Commit 6a53f4be4b

Joran Dirk Greef <joran@ronomon.com>
2020-10-04 13:15:39
Add openat(), close(), connect(), send(), recv(), as well as tests
Removes non-essential .hardlink_with_next_sqe() and .drain_previous_sqes().
1 parent 3d2de6c
Changed files (1)
lib
std
os
lib/std/os/linux/io_uring.zig
@@ -441,22 +441,75 @@ pub const IO_Uring = struct {
         sqe.user_data = user_data;
         return sqe;
     }
-    
-    /// Like `link_with_next_sqe()` but stronger.
-    /// For when you don't want the chain to fail in the event of a completion result error.
-    /// For example, you may know that some commands will fail and may want the chain to continue.
-    /// Hard links are resilient to completion results, but are not resilient to submission errors.
-    pub fn hardlink_with_next_sqe(self: *IO_Uring, sqe: *io_uring_sqe) void {
-        sqe.flags |= linux.IOSQE_IO_HARDLINK;
+
+    /// Queue (but does not submit) an SQE to perform a `connect(2)` on a socket.
+    /// Returns a pointer to the SQE.
+    pub fn connect(
+        self: *IO_Uring,
+        user_data: u64,
+        fd: os.fd_t,
+        addr: *const os.sockaddr,
+        addrlen: os.socklen_t
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_connect(sqe, fd, addr, addrlen);
+        sqe.user_data = user_data;
+        return sqe;
     }
-    
-    /// This creates a full pipeline barrier in the submission queue.
-    /// This SQE will not be started until previous SQEs complete.
-    /// Subsequent SQEs will not be started until this SQE completes.
-    /// In other words, this stalls the entire submission queue.
-    /// You should first consider using link_with_next_sqe() for more granular SQE sequence control.
-    pub fn drain_previous_sqes(self: *IO_Uring, sqe: *io_uring_sqe) void {
-        sqe.flags |= linux.IOSQE_IO_DRAIN;
+
+    /// Queues (but does not submit) an SQE to perform a `recv(2)`.
+    /// Returns a pointer to the SQE.
+    pub fn recv(
+        self: *IO_Uring,
+        user_data: u64,
+        fd: os.fd_t,
+        buffer: []u8,
+        flags: u32
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_recv(sqe, fd, buffer, flags);
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
+    /// Queues (but does not submit) an SQE to perform a `send(2)`.
+    /// Returns a pointer to the SQE.
+    pub fn send(
+        self: *IO_Uring,
+        user_data: u64,
+        fd: os.fd_t,
+        buffer: []u8,
+        flags: u32
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_send(sqe, fd, buffer, flags);
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
+    /// Queues (but does not submit) an SQE to perform an `openat(2)`.
+    /// Returns a pointer to the SQE.
+    pub fn openat(
+        self: *IO_Uring,
+        user_data: u64,
+        fd: os.fd_t,
+        path: [*:0]const u8,
+        flags: u32,
+        mode: os.mode_t
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_openat(sqe, fd, path, flags, mode);
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
+    /// Queues (but does not submit) an SQE to perform a `close(2)`.
+    /// Returns a pointer to the SQE.
+    pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_close(sqe, fd);
+        sqe.user_data = user_data;
+        return sqe;
     }
 
     /// Registers an array of file descriptors.
@@ -1007,3 +1060,133 @@ test "write/read" {
     }, cqe_read);
     testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
 }
+
+test "openat/close" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err
+    };
+    defer ring.deinit();
+
+    const path = "test_io_uring_openat_close";
+    defer std.fs.cwd().deleteFile(path) catch {};
+
+    const flags: u32 = os.O_CLOEXEC | os.O_RDWR | os.O_CREAT;
+    const mode: os.mode_t = 0o666;
+    var sqe_openat = try ring.openat(789, linux.AT_FDCWD, path, flags, mode);
+    testing.expectEqual(io_uring_sqe {
+        .opcode = .OPENAT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = linux.AT_FDCWD,
+        .off = 0,
+        .addr = @ptrToInt(path),
+        .len = mode,
+        .rw_flags = flags,
+        .user_data = 789,
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .__pad2 = [2]u64{ 0, 0 }
+    }, sqe_openat.*);
+    testing.expectEqual(@as(u32, 1), try ring.submit());
+
+    var cqe_openat = try ring.copy_cqe();
+    if (cqe_openat.res == -linux.EINVAL) return error.SkipZigTest;
+    testing.expectEqual(@as(u64, 789), cqe_openat.user_data);
+    testing.expect(cqe_openat.res > 0);
+    testing.expectEqual(@as(u32, 0), cqe_openat.flags);
+
+    var sqe_close = try ring.close(1011, cqe_openat.res);
+    testing.expectEqual(linux.IORING_OP.CLOSE, sqe_close.opcode);
+    testing.expectEqual(cqe_openat.res, sqe_close.fd);
+    testing.expectEqual(@as(u32, 1), try ring.submit());
+
+    var cqe_close = try ring.copy_cqe();
+    if (cqe_close.res == -linux.EINVAL) return error.SkipZigTest;
+    testing.expectEqual(linux.io_uring_cqe {
+        .user_data = 1011,
+        .res = 0,
+        .flags = 0,
+    }, cqe_close);
+}
+
+test "accept/connect/send/recv" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err
+    };
+    defer ring.deinit();
+
+    var address = try net.Address.parseIp4("127.0.0.1", 3131);
+    const kernel_backlog = 1;
+    const server = try os.socket(address.any.family, os.SOCK_STREAM | os.SOCK_CLOEXEC, 0);
+    defer os.close(server);
+    try os.setsockopt(server, os.SOL_SOCKET, os.SO_REUSEADDR, &mem.toBytes(@as(c_int, 1)));
+    try os.bind(server, &address.any, address.getOsSockLen());
+    try os.listen(server, kernel_backlog);
+
+    var buffer_send = [_]u8{1,0,1,0,1,0,1,0,1,0};
+    var buffer_recv = [_]u8{0,1,0,1,0};
+
+    var accept_addr: os.sockaddr = undefined;
+    var accept_addr_len: os.socklen_t = @sizeOf(@TypeOf(accept_addr));
+    var accept = try ring.accept(0xaaaaaaaa, server, &accept_addr, &accept_addr_len, 0);
+    testing.expectEqual(@as(u32, 1), try ring.submit());
+
+    const client = try os.socket(address.any.family, os.SOCK_STREAM | os.SOCK_CLOEXEC, 0);
+    defer os.close(client);
+    var connect = try ring.connect(0xcccccccc, client, &address.any, address.getOsSockLen());
+    testing.expectEqual(@as(u32, 1), try ring.submit());
+
+    var cqe_accept = try ring.copy_cqe();
+    if (cqe_accept.res == -linux.EINVAL) return error.SkipZigTest;
+    var cqe_connect = try ring.copy_cqe();
+    if (cqe_connect.res == -linux.EINVAL) return error.SkipZigTest;
+
+    // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first:
+    if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) {
+        var a = cqe_accept;
+        var b = cqe_connect;
+        cqe_accept = b;
+        cqe_connect = a;
+    }
+
+    testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data);
+    testing.expect(cqe_accept.res > 0);
+    testing.expectEqual(@as(u32, 0), cqe_accept.flags);
+    testing.expectEqual(linux.io_uring_cqe {
+        .user_data = 0xcccccccc,
+        .res = 0,
+        .flags = 0,
+    }, cqe_connect);
+
+    var send = try ring.send(0xeeeeeeee, client, buffer_send[0..], 0);
+    send.flags |= linux.IOSQE_IO_LINK;
+    var recv = try ring.recv(0xffffffff, cqe_accept.res, buffer_recv[0..], 0);
+    testing.expectEqual(@as(u32, 2), try ring.submit());
+
+    var cqe_send = try ring.copy_cqe();
+    if (cqe_send.res == -linux.EINVAL) return error.SkipZigTest;
+    testing.expectEqual(linux.io_uring_cqe {
+        .user_data = 0xeeeeeeee,
+        .res = buffer_send.len,
+        .flags = 0,
+    }, cqe_send);
+
+    var cqe_recv = try ring.copy_cqe();
+    if (cqe_recv.res == -linux.EINVAL) return error.SkipZigTest;
+    testing.expectEqual(linux.io_uring_cqe {
+        .user_data = 0xffffffff,
+        .res = buffer_recv.len,
+        .flags = 0,
+    }, cqe_recv);
+
+    testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
+}