Commit 349d32dc2c

Joran Dirk Greef <joran@ronomon.com>
2020-12-09 18:06:28
Add io_uring TIMEOUT and TIMEOUT_REMOVE operations:
ring.timeout() to queue a IORING_OP_TIMEOUT operation ring.timeout_remove() to queue a IORING_OP_TIMEOUT_REMOVE operation io_uring_prep_timeout() to prep a IORING_OP_TIMEOUT sqe io_uring_prep_timeout_remove() to prep a IORING_OP_TIMEOUT_REMOVE sqe
1 parent 3599fb9
Changed files (1)
lib
std
os
lib/std/os/linux/io_uring.zig
@@ -512,6 +512,52 @@ pub const IO_Uring = struct {
         return sqe;
     }
 
+    /// Queues (but does not submit) an SQE to register a timeout operation.
+    /// Returns a pointer to the SQE.
+    ///
+    /// The timeout will complete when either the timeout expires, or after the specified number of
+    /// events complete (if `count` is greater than `0`).
+    ///
+    /// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
+    ///
+    /// The completion event result will be `-ETIME` if the timeout completed through expiration,
+    /// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
+    /// timeout was removed before it expired.
+    ///
+    /// io_uring timeouts use the `CLOCK_MONOTONIC` clock source.
+    pub fn timeout(
+        self: *IO_Uring,
+        user_data: u64,
+        ts: *const os.timespec,
+        count: u32,
+        flags: u32,
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_timeout(sqe, ts, count, flags);
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
+    /// Queues (but does not submit) an SQE to remove an existing timeout operation.
+    /// Returns a pointer to the SQE.
+    ///
+    /// The timeout is identified by its `user_data`.
+    ///
+    /// The completion event result will be `0` if the timeout was found and cancelled successfully,
+    /// `-EBUSY` if the timeout was found but expiration was already in progress, or
+    /// `-ENOENT` if the timeout was not found.
+    pub fn timeout_remove(
+        self: *IO_Uring,
+        user_data: u64,
+        timeout_user_data: u64,
+        flags: u32,
+    ) !*io_uring_sqe {
+        const sqe = try self.get_sqe();
+        io_uring_prep_timeout_remove(sqe, timeout_user_data, flags);
+        sqe.user_data = user_data;
+        return sqe;
+    }
+
     /// Registers an array of file descriptors.
     /// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must
     /// retrieve a reference to the file, and once I/O has completed the file reference must be
@@ -824,6 +870,34 @@ pub fn io_uring_prep_close(sqe: *io_uring_sqe, fd: os.fd_t) void {
     };
 }
 
+pub fn io_uring_prep_timeout(
+    sqe: *io_uring_sqe,
+    ts: *const os.timespec,
+    count: u32,
+    flags: u32,
+) void {
+    io_uring_prep_rw(.TIMEOUT, sqe, -1, ts, 1, count);
+    sqe.rw_flags = flags;
+}
+
+pub fn io_uring_prep_timeout_remove(sqe: *io_uring_sqe, timeout_user_data: u64, flags: u32) void {
+    sqe.* = .{
+        .opcode = .TIMEOUT_REMOVE,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = -1,
+        .off = 0,
+        .addr = timeout_user_data,
+        .len = 0,
+        .rw_flags = flags,
+        .user_data = 0,
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .__pad2 = [2]u64{ 0, 0 },
+    };
+}
+
 test "structs/offsets/entries" {
     if (builtin.os.tag != .linux) return error.SkipZigTest;
 
@@ -1216,3 +1290,105 @@ test "accept/connect/send/recv" {
 
     testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
 }
+
+test "timeout (after a relative time)" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    const ms = 10;
+    const margin = 5;
+    const ts = os.timespec{ .tv_sec = 0, .tv_nsec = ms * 1000000 };
+
+    const started = std.time.milliTimestamp();
+    const sqe = try ring.timeout(0x55555555, &ts, 0, 0);
+    testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe.opcode);
+    testing.expectEqual(@as(u32, 1), try ring.submit());
+    const cqe = try ring.copy_cqe();
+    const stopped = std.time.milliTimestamp();
+
+    testing.expectEqual(linux.io_uring_cqe{
+        .user_data = 0x55555555,
+        .res = -linux.ETIME,
+        .flags = 0,
+    }, cqe);
+    testing.expectWithinMargin(@intToFloat(f64, ms), @intToFloat(f64, stopped - started), margin);
+}
+
+test "timeout (after a number of completions)" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    const ts = os.timespec{ .tv_sec = 3, .tv_nsec = 0 };
+    const count_completions: u64 = 1;
+    const sqe_timeout = try ring.timeout(0x66666666, &ts, count_completions, 0);
+    testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
+    testing.expectEqual(count_completions, sqe_timeout.off);
+    _ = try ring.nop(0x77777777);
+    testing.expectEqual(@as(u32, 2), try ring.submit());
+
+    const cqe_nop = try ring.copy_cqe();
+    testing.expectEqual(linux.io_uring_cqe{
+        .user_data = 0x77777777,
+        .res = 0,
+        .flags = 0,
+    }, cqe_nop);
+
+    const cqe_timeout = try ring.copy_cqe();
+    testing.expectEqual(linux.io_uring_cqe{
+        .user_data = 0x66666666,
+        .res = 0,
+        .flags = 0,
+    }, cqe_timeout);
+}
+
+test "timeout_remove" {
+    if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+    var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
+    defer ring.deinit();
+
+    const ts = os.timespec{ .tv_sec = 3, .tv_nsec = 0 };
+    const sqe_timeout = try ring.timeout(0x88888888, &ts, 0, 0);
+    testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
+    testing.expectEqual(@as(u64, 0x88888888), sqe_timeout.user_data);
+
+    const sqe_timeout_remove = try ring.timeout_remove(0x99999999, 0x88888888, 0);
+    testing.expectEqual(linux.IORING_OP.TIMEOUT_REMOVE, sqe_timeout_remove.opcode);
+    testing.expectEqual(@as(u64, 0x88888888), sqe_timeout_remove.addr);
+    testing.expectEqual(@as(u64, 0x99999999), sqe_timeout_remove.user_data);
+
+    testing.expectEqual(@as(u32, 2), try ring.submit());
+
+    const cqe_timeout = try ring.copy_cqe();
+    if (cqe_timeout.user_data == 0x99999999 and cqe_timeout.res == -linux.EINVAL) {
+        return error.SkipZigTest;
+    }
+    testing.expectEqual(linux.io_uring_cqe{
+        .user_data = 0x88888888,
+        .res = -linux.ECANCELED,
+        .flags = 0,
+    }, cqe_timeout);
+
+    const cqe_timeout_remove = try ring.copy_cqe();
+    testing.expectEqual(linux.io_uring_cqe{
+        .user_data = 0x99999999,
+        .res = 0,
+        .flags = 0,
+    }, cqe_timeout_remove);
+}