const IoUring = @This();
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const net = std.Io.net;
const posix = std.posix;
const linux = std.os.linux;
const testing = std.testing;
const is_linux = builtin.os.tag == .linux;
const page_size_min = std.heap.page_size_min;

fd: linux.fd_t = -1,
sq: SubmissionQueue,
cq: CompletionQueue,
flags: u32,
features: u32,

/// A friendly way to set up an io_uring, with default linux.io_uring_params.
/// `entries` must be a power of two between 1 and 32768, although the kernel will make the final
/// call on how many entries the submission and completion queues will ultimately have,
/// see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L8027-L8050.
/// Matches the interface of io_uring_queue_init() in liburing.
pub fn init(entries: u16, flags: u32) !IoUring {
    var params = mem.zeroInit(linux.io_uring_params, .{
        .flags = flags,
        .sq_thread_idle = 1000,
    });
    return try IoUring.init_params(entries, &params);
}

/// A powerful way to set up an io_uring, if you want to tweak linux.io_uring_params such as submission
/// queue thread cpu affinity or thread idle timeout (the kernel and our default is 1 second).
/// `params` is passed by reference because the kernel needs to modify the parameters.
/// Matches the interface of io_uring_queue_init_params() in liburing.
pub fn init_params(entries: u16, p: *linux.io_uring_params) !IoUring {
    if (entries == 0) return error.EntriesZero;
    if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;

    assert(p.sq_entries == 0);
    assert(p.cq_entries == 0 or p.flags & linux.IORING_SETUP_CQSIZE != 0);
    assert(p.features == 0);
    assert(p.wq_fd == 0 or p.flags & linux.IORING_SETUP_ATTACH_WQ != 0);
    assert(p.resv[0] == 0);
    assert(p.resv[1] == 0);
    assert(p.resv[2] == 0);

    const res = linux.io_uring_setup(entries, p);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .FAULT => return error.ParamsOutsideAccessibleAddressSpace,
        // The resv array contains non-zero data, p.flags contains an unsupported flag,
        // entries out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
        // or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
        .INVAL => return error.ArgumentsInvalid,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NOMEM => return error.SystemResources,
        // IORING_SETUP_SQPOLL was specified but effective user ID lacks sufficient privileges,
        // or a container seccomp policy prohibits io_uring syscalls:
        .PERM => return error.PermissionDenied,
        .NOSYS => return error.SystemOutdated,
        else => |errno| return posix.unexpectedErrno(errno),
    }
    const fd = @as(linux.fd_t, @intCast(res));
    assert(fd >= 0);
    errdefer posix.close(fd);

    // Kernel versions 5.4 and up use only one mmap() for the submission and completion queues.
    // This is not an optional feature for us... if the kernel does it, we have to do it.
    // The thinking on this by the kernel developers was that both the submission and the
    // completion queue rings have sizes just over a power of two, but the submission queue ring
    // is significantly smaller with u32 slots. By bundling both in a single mmap, the kernel
    // gets the submission queue ring for free.
    // See https://patchwork.kernel.org/patch/11115257 for the kernel patch.
    // We do not support the double mmap() done before 5.4, because we want to keep the
    // init/deinit mmap paths simple and because io_uring has had many bug fixes even since 5.4.
    if ((p.features & linux.IORING_FEAT_SINGLE_MMAP) == 0) {
        return error.SystemOutdated;
    }

    // Check that the kernel has actually set params and that "impossible is nothing".
    assert(p.sq_entries != 0);
    assert(p.cq_entries != 0);
    assert(p.cq_entries >= p.sq_entries);

    // From here on, we only need to read from params, so pass `p` by value as immutable.
    // The completion queue shares the mmap with the submission queue, so pass `sq` there too.
    var sq = try SubmissionQueue.init(fd, p.*);
    errdefer sq.deinit();
    var cq = try CompletionQueue.init(fd, p.*, sq);
    errdefer cq.deinit();

    // Check that our starting state is as we expect.
    assert(sq.head.* == 0);
    assert(sq.tail.* == 0);
    assert(sq.mask == p.sq_entries - 1);
    // Allow flags.* to be non-zero, since the kernel may set IORING_SQ_NEED_WAKEUP at any time.
    assert(sq.dropped.* == 0);
    assert(sq.array.len == p.sq_entries);
    assert(sq.sqes.len == p.sq_entries);
    assert(sq.sqe_head == 0);
    assert(sq.sqe_tail == 0);

    assert(cq.head.* == 0);
    assert(cq.tail.* == 0);
    assert(cq.mask == p.cq_entries - 1);
    assert(cq.overflow.* == 0);
    assert(cq.cqes.len == p.cq_entries);

    return IoUring{
        .fd = fd,
        .sq = sq,
        .cq = cq,
        .flags = p.flags,
        .features = p.features,
    };
}

pub fn deinit(self: *IoUring) void {
    assert(self.fd >= 0);
    // The mmaps depend on the fd, so the order of these calls is important:
    self.cq.deinit();
    self.sq.deinit();
    posix.close(self.fd);
    self.fd = -1;
}
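
// Example (editor's sketch, not part of the original API surface): the basic
// lifecycle is init() followed by a deferred deinit(). On kernels older than 5.4,
// or where io_uring is restricted, init() fails and the test is skipped.
test "example: init and deinit" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    try testing.expect(ring.fd >= 0);
}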

/// Returns a pointer to a vacant SQE, or an error if the submission queue is full.
/// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly.
/// However, instead of a null we return an error to force safe handling.
/// Any situation where the submission queue is full tends more towards a control flow error,
/// and the null return in liburing is more a C idiom than anything else, for lack of a better
/// alternative. In Zig, we have first-class error handling... so let's use it.
/// Matches the implementation of io_uring_get_sqe() in liburing.
pub fn get_sqe(self: *IoUring) !*linux.io_uring_sqe {
    const head = @atomicLoad(u32, self.sq.head, .acquire);
    // Remember that these head and tail offsets wrap around every four billion operations.
    // We must therefore use wrapping addition and subtraction to avoid a runtime crash.
    const next = self.sq.sqe_tail +% 1;
    if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull;
    const sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask];
    self.sq.sqe_tail = next;
    return sqe;
}
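
// Example (editor's sketch): one way to handle error.SubmissionQueueFull is to
// flush queued SQEs to the kernel with submit() and then retry. This uses only
// functions defined in this file; the queue size of 2 is arbitrary.
test "example: recovering from SubmissionQueueFull" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    var queued: u32 = 0;
    while (queued < 4) {
        const sqe = ring.get_sqe() catch {
            // The ring only holds 2 SQEs: hand them to the kernel, then retry.
            _ = try ring.submit();
            continue;
        };
        sqe.prep_nop();
        sqe.user_data = queued;
        queued += 1;
    }
    _ = try ring.submit_and_wait(4);
}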

/// Submits the SQEs acquired via get_sqe() to the kernel. You can call this once after you have
/// called get_sqe() multiple times to set up multiple I/O requests.
/// Returns the number of SQEs submitted, if not used alongside IORING_SETUP_SQPOLL.
/// If the io_uring instance uses IORING_SETUP_SQPOLL, the value returned on success is not
/// guaranteed to match the number of SQEs actually submitted during this call. A value higher
/// or lower, including 0, may be returned.
/// Matches the implementation of io_uring_submit() in liburing.
pub fn submit(self: *IoUring) !u32 {
    return self.submit_and_wait(0);
}

/// Like submit(), but allows waiting for events as well.
/// Returns the number of SQEs submitted.
/// Matches the implementation of io_uring_submit_and_wait() in liburing.
pub fn submit_and_wait(self: *IoUring, wait_nr: u32) !u32 {
    const submitted = self.flush_sq();
    var flags: u32 = 0;
    if (self.sq_ring_needs_enter(&flags) or wait_nr > 0) {
        if (wait_nr > 0 or (self.flags & linux.IORING_SETUP_IOPOLL) != 0) {
            flags |= linux.IORING_ENTER_GETEVENTS;
        }
        return try self.enter(submitted, wait_nr, flags);
    }
    return submitted;
}

/// Tell the kernel we have submitted SQEs and/or want to wait for CQEs.
/// Returns the number of SQEs submitted.
pub fn enter(self: *IoUring, to_submit: u32, min_complete: u32, flags: u32) !u32 {
    assert(self.fd >= 0);
    const res = linux.io_uring_enter(self.fd, to_submit, min_complete, flags, null);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        // The kernel was unable to allocate memory or ran out of resources for the request.
        // The application should wait for some completions and try again:
        .AGAIN => return error.SystemResources,
        // The SQE `fd` is invalid, or IOSQE_FIXED_FILE was set but no files were registered:
        .BADF => return error.FileDescriptorInvalid,
        // The file descriptor is valid, but the ring is not in the right state.
        // See io_uring_register(2) for how to enable the ring.
        .BADFD => return error.FileDescriptorInBadState,
        // The application attempted to overcommit the number of requests it can have pending.
        // The application should wait for some completions and try again:
        .BUSY => return error.CompletionQueueOvercommitted,
        // The SQE is invalid, or valid but the ring was setup with IORING_SETUP_IOPOLL:
        .INVAL => return error.SubmissionQueueEntryInvalid,
        // The buffer is outside the process' accessible address space, or IORING_OP_READ_FIXED
        // or IORING_OP_WRITE_FIXED was specified but no buffers were registered, or the range
        // described by `addr` and `len` is not within the buffer registered at `buf_index`:
        .FAULT => return error.BufferInvalid,
        .NXIO => return error.RingShuttingDown,
        // The kernel believes our `self.fd` does not refer to an io_uring instance,
        // or the opcode is valid but not supported by this kernel (more likely):
        .OPNOTSUPP => return error.OpcodeNotSupported,
        // The operation was interrupted by a delivery of a signal before it could complete.
        // This can happen while waiting for events with IORING_ENTER_GETEVENTS:
        .INTR => return error.SignalInterrupt,
        else => |errno| return posix.unexpectedErrno(errno),
    }
    return @as(u32, @intCast(res));
}

/// Sync internal state with kernel ring state on the SQ side.
/// Returns the number of pending events in the SQ ring, for the shared ring.
/// This return value includes previously flushed SQEs, as per liburing.
/// The rationale is to suggest that an io_uring_enter() call is needed rather than not.
/// Matches the implementation of __io_uring_flush_sq() in liburing.
pub fn flush_sq(self: *IoUring) u32 {
    if (self.sq.sqe_head != self.sq.sqe_tail) {
        // Fill in SQEs that we have queued up, adding them to the kernel ring.
        const to_submit = self.sq.sqe_tail -% self.sq.sqe_head;
        var tail = self.sq.tail.*;
        var i: usize = 0;
        while (i < to_submit) : (i += 1) {
            self.sq.array[tail & self.sq.mask] = self.sq.sqe_head & self.sq.mask;
            tail +%= 1;
            self.sq.sqe_head +%= 1;
        }
        // Ensure that the kernel can actually see the SQE updates when it sees the tail update.
        @atomicStore(u32, self.sq.tail, tail, .release);
    }
    return self.sq_ready();
}

/// Returns true if we are not using an SQ thread (thus nobody submits but us),
/// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
/// For the latter case, we set the SQ thread wakeup flag.
/// Matches the implementation of sq_ring_needs_enter() in liburing.
pub fn sq_ring_needs_enter(self: *IoUring, flags: *u32) bool {
    assert(flags.* == 0);
    if ((self.flags & linux.IORING_SETUP_SQPOLL) == 0) return true;
    if ((@atomicLoad(u32, self.sq.flags, .unordered) & linux.IORING_SQ_NEED_WAKEUP) != 0) {
        flags.* |= linux.IORING_ENTER_SQ_WAKEUP;
        return true;
    }
    return false;
}

/// Returns the number of flushed and unflushed SQEs pending in the submission queue.
/// In other words, this is the number of SQEs in the submission queue, i.e. its length.
/// These are SQEs that the kernel is yet to consume.
/// Matches the implementation of io_uring_sq_ready in liburing.
pub fn sq_ready(self: *IoUring) u32 {
    // Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
    // see https://github.com/axboe/liburing/issues/92.
    return self.sq.sqe_tail -% @atomicLoad(u32, self.sq.head, .acquire);
}

/// Returns the number of CQEs in the completion queue, i.e. its length.
/// These are CQEs that the application is yet to consume.
/// Matches the implementation of io_uring_cq_ready in liburing.
pub fn cq_ready(self: *IoUring) u32 {
    return @atomicLoad(u32, self.cq.tail, .acquire) -% self.cq.head.*;
}

/// Copies as many CQEs as are ready, and that can fit into the destination `cqes` slice.
/// If none are available, enters into the kernel to wait for at most `wait_nr` CQEs.
/// Returns the number of CQEs copied, advancing the CQ ring.
/// Provides all the wait/peek methods found in liburing, but with batching and a single method.
/// The rationale for copying CQEs rather than copying pointers is that pointers are 8 bytes
/// whereas CQEs are not much more at only 16 bytes, and this provides a safer, faster interface.
/// Safer, because you no longer need to call cqe_seen(), avoiding idempotency bugs.
/// Faster, because we can now amortize the atomic store release to `cq.head` across the batch.
/// See https://github.com/axboe/liburing/issues/103#issuecomment-686665007.
/// Matches the implementation of io_uring_peek_batch_cqe() in liburing, but supports waiting.
pub fn copy_cqes(self: *IoUring, cqes: []linux.io_uring_cqe, wait_nr: u32) !u32 {
    const count = self.copy_cqes_ready(cqes);
    if (count > 0) return count;
    if (self.cq_ring_needs_flush() or wait_nr > 0) {
        _ = try self.enter(0, wait_nr, linux.IORING_ENTER_GETEVENTS);
        return self.copy_cqes_ready(cqes);
    }
    return 0;
}

fn copy_cqes_ready(self: *IoUring, cqes: []linux.io_uring_cqe) u32 {
    const ready = self.cq_ready();
    const count = @min(cqes.len, ready);
    const head = self.cq.head.* & self.cq.mask;

    // Copy the contiguous run up to the point where the ring wraps.
    const n = @min(self.cq.cqes.len - head, count);
    @memcpy(cqes[0..n], self.cq.cqes[head..][0..n]);

    if (count > n) {
        // Wrap around and copy the remainder from the start of self.cq.cqes.
        const w = count - n;
        @memcpy(cqes[n..][0..w], self.cq.cqes[0..w]);
    }

    self.cq_advance(count);
    return count;
}

/// Returns a copy of an I/O completion, waiting for it if necessary, and advancing the CQ ring.
/// A convenience method for `copy_cqes()` for when you don't need to batch or peek.
pub fn copy_cqe(ring: *IoUring) !linux.io_uring_cqe {
    var cqes: [1]linux.io_uring_cqe = undefined;
    while (true) {
        const count = try ring.copy_cqes(&cqes, 1);
        if (count > 0) return cqes[0];
    }
}
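
// Example (editor's sketch): a full submit/complete round trip using a no-op.
// The nop() helper is defined later in this file; Zig's top-level declarations
// are order-independent, so the example can use it here.
test "example: nop round trip" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    _ = try ring.nop(0xaaaaaaaa);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    const cqe = try ring.copy_cqe();
    try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe.user_data);
    try testing.expectEqual(@as(i32, 0), cqe.res);
}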

/// Matches the implementation of cq_ring_needs_flush() in liburing.
pub fn cq_ring_needs_flush(self: *IoUring) bool {
    return (@atomicLoad(u32, self.sq.flags, .unordered) & linux.IORING_SQ_CQ_OVERFLOW) != 0;
}

/// For advanced use cases only, where you implement custom completion queue methods.
/// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
/// Must be called exactly once after a zero-copy CQE has been processed by your application.
/// Not idempotent, calling more than once will result in other CQEs being lost.
/// Matches the implementation of cqe_seen() in liburing.
pub fn cqe_seen(self: *IoUring, cqe: *linux.io_uring_cqe) void {
    _ = cqe;
    self.cq_advance(1);
}

/// For advanced use cases only, where you implement custom completion queue methods.
/// Matches the implementation of cq_advance() in liburing.
pub fn cq_advance(self: *IoUring, count: u32) void {
    if (count > 0) {
        // Ensure the kernel only sees the new head value after the CQEs have been read.
        @atomicStore(u32, self.cq.head, self.cq.head.* +% count, .release);
    }
}

/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`.
/// N.B. While SQEs are initiated in the order in which they appear in the submission queue,
/// operations execute in parallel and completions are unordered. Therefore, an application that
/// submits a write followed by an fsync in the submission queue cannot expect the fsync to
/// apply to the write, since the fsync may complete before the write is issued to the disk.
/// You should preferably use `link_with_next_sqe()` on a write's SQE to link it with an fsync,
/// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync.
pub fn fsync(self: *IoUring, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_fsync(fd, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a no-op.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// A no-op is more useful than it may appear at first glance.
/// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
/// know when the ring is idle before acting on a kill signal.
pub fn nop(self: *IoUring, user_data: u64) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_nop();
    sqe.user_data = user_data;
    return sqe;
}

/// Used to select how the read should be handled.
pub const ReadBuffer = union(enum) {
    /// io_uring will read directly into this buffer
    buffer: []u8,

    /// io_uring will read directly into these buffers using readv.
    iovecs: []const posix.iovec,

    /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
    /// The buffer group referenced by `group_id` must contain at least one buffer for the read to work.
    /// `len` controls the number of bytes to read into the selected buffer.
    buffer_selection: struct {
        group_id: u16,
        len: usize,
    },
};

/// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)` depending on the buffer type.
/// * Reading into a `ReadBuffer.buffer` uses `read(2)`
/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
///   If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE. See https://man7.org/linux/man-pages/man2/preadv2.2.html
///
/// Returns a pointer to the SQE.
pub fn read(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: ReadBuffer,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    switch (buffer) {
        .buffer => |slice| sqe.prep_read(fd, slice, offset),
        .iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
        .buffer_selection => |selection| {
            sqe.prep_rw(.READ, fd, 0, selection.len, offset);
            sqe.flags |= linux.IOSQE_BUFFER_SELECT;
            sqe.buf_index = selection.group_id;
        },
    }
    sqe.user_data = user_data;
    return sqe;
}
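
// Example (editor's sketch): reading into a plain `.buffer` from a pipe. The
// offset is 0 because pipes are not seekable. Everything here uses std.posix
// plus the helpers in this file; the user_data value is arbitrary.
test "example: read into a buffer" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);
    _ = try posix.write(fds[1], "hello");
    var buf: [16]u8 = undefined;
    _ = try ring.read(0x11, fds[0], .{ .buffer = &buf }, 0);
    _ = try ring.submit_and_wait(1);
    const cqe = try ring.copy_cqe();
    // IORING_OP_READ is available since kernel 5.6; skip on older kernels.
    switch (cqe.err()) {
        .SUCCESS => {},
        .INVAL => return error.SkipZigTest,
        else => |e| return posix.unexpectedErrno(e),
    }
    try testing.expectEqual(@as(i32, 5), cqe.res);
    try testing.expectEqualStrings("hello", buf[0..5]);
}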

/// Queues (but does not submit) an SQE to perform a `write(2)`.
/// Returns a pointer to the SQE.
pub fn write(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_write(fd, buffer, offset);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `splice(2)`.
/// Either `fd_in` or `fd_out` must be a pipe.
/// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to std.math.maxInt(u64).
/// If `fd_in` does not refer to a pipe and `off_in` is maxInt(u64), then `len` bytes are read
/// from `fd_in` starting from the file offset, which is incremented by the number of bytes read.
/// If `fd_in` does not refer to a pipe and `off_in` is not maxInt(u64), then the starting offset of `fd_in` will be `off_in`.
/// This splice operation can be used to implement sendfile by splicing to an intermediate pipe first,
/// then splicing to the final destination. In fact, the kernel's implementation of sendfile uses splice internally.
///
/// NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with EINVAL if one of the
/// fds doesn't explicitly support the splice operation, e.g. reading from a terminal is unsupported from kernel 5.7 to 5.11.
/// See https://github.com/axboe/liburing/issues/291
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn splice(self: *IoUring, user_data: u64, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_splice(fd_in, off_in, fd_out, off_out, len);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an IORING_OP_READ_FIXED.
/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn read_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: *posix.iovec,
    offset: u64,
    buffer_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_read_fixed(fd, buffer, offset, buffer_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `pwritev()`.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE.
/// See https://linux.die.net/man/2/pwritev.
pub fn writev(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    iovecs: []const posix.iovec_const,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_writev(fd, iovecs, offset);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an IORING_OP_WRITE_FIXED.
/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn write_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: *posix.iovec,
    offset: u64,
    buffer_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_write_fixed(fd, buffer, offset, buffer_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 5.5
pub fn accept(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_accept(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a multishot accept on a socket.
///
/// The multishot variant allows an application to issue a single accept request,
/// which repeatedly triggers a CQE each time a connection request comes in.
/// While the IORING_CQE_F_MORE flag is set in the CQE flags, accept will generate
/// further CQEs.
///
/// Available since 5.19
pub fn accept_multishot(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_multishot_accept(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues an accept using direct (registered) file descriptors.
///
/// To use an accept direct variant, the application must first have registered
/// a file table (with register_files). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
///
/// Available since 5.19
pub fn accept_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_accept_direct(fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a multishot accept using direct (registered) file descriptors.
/// Available since 5.19
pub fn accept_multishot_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_multishot_accept_direct(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
/// Returns a pointer to the SQE.
pub fn connect(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: *const posix.sockaddr,
    addrlen: posix.socklen_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_connect(fd, addr, addrlen);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `epoll_ctl(2)`.
/// Returns a pointer to the SQE.
pub fn epoll_ctl(
    self: *IoUring,
    user_data: u64,
    epfd: linux.fd_t,
    fd: linux.fd_t,
    op: u32,
    ev: ?*linux.epoll_event,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_epoll_ctl(epfd, fd, op, ev);
    sqe.user_data = user_data;
    return sqe;
}

/// Used to select how the recv call should be handled.
pub const RecvBuffer = union(enum) {
    /// io_uring will recv directly into this buffer
    buffer: []u8,

    /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
    /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work.
    /// `len` controls the number of bytes to read into the selected buffer.
    buffer_selection: struct {
        group_id: u16,
        len: usize,
    },
};

/// Queues (but does not submit) an SQE to perform a `recv(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn recv(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: RecvBuffer,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    switch (buffer) {
        .buffer => |slice| sqe.prep_recv(fd, slice, flags),
        .buffer_selection => |selection| {
            sqe.prep_rw(.RECV, fd, 0, selection.len, 0);
            sqe.rw_flags = flags;
            sqe.flags |= linux.IOSQE_BUFFER_SELECT;
            sqe.buf_index = selection.group_id;
        },
    }
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn send(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send(fd, buffer, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
///
/// This operation will most likely produce two CQEs. The flags field of the
/// first cqe will likely contain IORING_CQE_F_MORE, which means that there will
/// be a second cqe with the user_data field set to the same value. The user
/// must not modify the data buffer until the notification is posted. The first
/// cqe follows the usual rules and so its res field will contain the number of
/// bytes sent or a negative error code. The notification's res field will be
/// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The
/// two-step model is needed because the kernel may hold on to buffers for a
/// long time, e.g. waiting for a TCP ACK. Notifications are responsible for
/// controlling the lifetime of the buffers. Even errored requests may generate
/// a notification.
///
/// Available since 6.0
pub fn send_zc(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send_zc(fd, buffer, send_flags, zc_flags);
    sqe.user_data = user_data;
    return sqe;
}
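
// Editor's sketch of the two-CQE completion flow described above, as comments
// (a runnable version needs a connected socket and a 6.0+ kernel). `sock` and
// `data` are hypothetical stand-ins:
//
//     _ = try ring.send_zc(0x5a, sock, data, 0, 0);
//     _ = try ring.submit();
//     const first = try ring.copy_cqe(); // res = bytes sent (or negative error)
//     if (first.flags & linux.IORING_CQE_F_MORE != 0) {
//         // The buffer is still owned by the kernel; wait for the notification.
//         const notif = try ring.copy_cqe(); // flags contain IORING_CQE_F_NOTIF
//         assert(notif.user_data == first.user_data);
//     }
//     // Only now is it safe to reuse `data`.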

/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)` using a registered buffer.
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
    buf_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send_zc_fixed(fd, buffer, send_flags, zc_flags, buf_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn recvmsg(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *linux.msghdr,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_recvmsg(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn sendmsg(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *const linux.msghdr_const,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_sendmsg(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
pub fn sendmsg_zc(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *const linux.msghdr_const,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_sendmsg_zc(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `openat(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn openat(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [*:0]const u8,
    flags: linux.O,
    mode: posix.mode_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_openat(fd, path, flags, mode);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues an openat using direct (registered) file descriptors.
///
/// To use the direct variant, the application must first have registered
/// a file table (with register_files). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
///
/// Available since 5.15
pub fn openat_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [*:0]const u8,
    flags: linux.O,
    mode: posix.mode_t,
    file_index: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_openat_direct(fd, path, flags, mode, file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `close(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn close(self: *IoUring, user_data: u64, fd: linux.fd_t) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_close(fd);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a close of a registered (direct) file descriptor.
/// Available since 5.15
pub fn close_direct(self: *IoUring, user_data: u64, file_index: u32) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_close_direct(file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to register a timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout will complete when either the timeout expires, or after the specified number of
/// events complete (if `count` is greater than `0`).
///
/// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
///
/// The completion event result will be `-ETIME` if the timeout completed through expiration,
/// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
/// timeout was removed before it expired.
///
/// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
pub fn timeout(
    self: *IoUring,
    user_data: u64,
    ts: *const linux.kernel_timespec,
    count: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_timeout(ts, count, flags);
    sqe.user_data = user_data;
    return sqe;
}
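
// Example (editor's sketch): a relative 10ms timeout that completes through
// expiration, so its CQE carries -ETIME as documented above.
test "example: timeout expiration" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    const ts: linux.kernel_timespec = .{ .sec = 0, .nsec = 10 * std.time.ns_per_ms };
    _ = try ring.timeout(0x55, &ts, 0, 0);
    _ = try ring.submit_and_wait(1);
    const cqe = try ring.copy_cqe();
    try testing.expectEqual(@as(u64, 0x55), cqe.user_data);
    try testing.expectEqual(linux.E.TIME, cqe.err());
}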

/// Queues (but does not submit) an SQE to remove an existing timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout is identified by its `user_data`.
///
/// The completion event result will be `0` if the timeout was found and canceled successfully,
/// `-EBUSY` if the timeout was found but expiration was already in progress, or
/// `-ENOENT` if the timeout was not found.
pub fn timeout_remove(
    self: *IoUring,
    user_data: u64,
    timeout_user_data: u64,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_timeout_remove(timeout_user_data, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to add a link timeout operation.
/// Returns a pointer to the SQE.
///
/// You need to set linux.IOSQE_IO_LINK in the flags of the target operation
/// and then call this method right after the target operation.
/// See https://lwn.net/Articles/803932/ for details.
///
/// If the dependent request finishes before the linked timeout, the timeout
/// is canceled. If the timeout finishes before the dependent request, the
/// dependent request will be canceled.
///
/// The completion event result of the link_timeout will be
/// `-ETIME` if the timeout finishes before the dependent request
/// (in this case, the completion event result of the dependent request will
/// be `-ECANCELED`), or
/// `-EALREADY` if the dependent request finishes before the linked timeout.
pub fn link_timeout(
    self: *IoUring,
    user_data: u64,
    ts: *const linux.kernel_timespec,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_link_timeout(ts, flags);
    sqe.user_data = user_data;
    return sqe;
}
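
// Example (editor's sketch): linking a read that will never complete (an empty
// pipe) to a 10ms link_timeout. Per the doc comment above, the typical outcome
// is -ECANCELED for the read and -ETIME for the timeout, but the exact results
// can race, so this sketch only drains the two CQEs.
test "example: link_timeout cancels a stalled read" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);
    var buf: [8]u8 = undefined;
    const read_sqe = try ring.read(0x1, fds[0], .{ .buffer = &buf }, 0);
    read_sqe.flags |= linux.IOSQE_IO_LINK;
    const ts: linux.kernel_timespec = .{ .sec = 0, .nsec = 10 * std.time.ns_per_ms };
    _ = try ring.link_timeout(0x2, &ts, 0);
    _ = try ring.submit_and_wait(2);
    var seen: usize = 0;
    while (seen < 2) : (seen += 1) {
        _ = try ring.copy_cqe();
    }
}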

/// Queues (but does not submit) an SQE to perform a `poll(2)`.
/// Returns a pointer to the SQE.
pub fn poll_add(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    poll_mask: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_add(fd, poll_mask);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to remove an existing poll operation.
/// Returns a pointer to the SQE.
pub fn poll_remove(
    self: *IoUring,
    user_data: u64,
    target_user_data: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_remove(target_user_data);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to update the user data of an existing poll
/// operation. Returns a pointer to the SQE.
pub fn poll_update(
    self: *IoUring,
    user_data: u64,
    old_user_data: u64,
    new_user_data: u64,
    poll_mask: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_update(old_user_data, new_user_data, poll_mask, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
/// Returns a pointer to the SQE.
pub fn fallocate(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    mode: i32,
    offset: u64,
    len: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_fallocate(fd, mode, offset, len);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `statx(2)`.
/// Returns a pointer to the SQE.
pub fn statx(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [:0]const u8,
    flags: u32,
    mask: u32,
    buf: *linux.Statx,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_statx(fd, path, flags, mask, buf);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to remove an existing operation.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
///
/// The completion event result will be `0` if the operation was found and canceled successfully,
/// `-EALREADY` if the operation was found but was already in progress, or
/// `-ENOENT` if the operation was not found.
pub fn cancel(
    self: *IoUring,
    user_data: u64,
    cancel_user_data: u64,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_cancel(cancel_user_data, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
/// Returns a pointer to the SQE.
pub fn shutdown(
    self: *IoUring,
    user_data: u64,
    sockfd: posix.socket_t,
    how: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_shutdown(sockfd, how);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
/// Returns a pointer to the SQE.
pub fn renameat(
    self: *IoUring,
    user_data: u64,
    old_dir_fd: linux.fd_t,
    old_path: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    new_path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_renameat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `unlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn unlinkat(
    self: *IoUring,
    user_data: u64,
    dir_fd: linux.fd_t,
    path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_unlinkat(dir_fd, path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
/// Returns a pointer to the SQE.
pub fn mkdirat(
    self: *IoUring,
    user_data: u64,
    dir_fd: linux.fd_t,
    path: [*:0]const u8,
    mode: posix.mode_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_mkdirat(dir_fd, path, mode);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn symlinkat(
    self: *IoUring,
    user_data: u64,
    target: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    link_path: [*:0]const u8,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_symlinkat(target, new_dir_fd, link_path);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `linkat(2)`.
/// Returns a pointer to the SQE.
pub fn linkat(
    self: *IoUring,
    user_data: u64,
    old_dir_fd: linux.fd_t,
    old_path: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    new_path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_linkat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data.
/// Returns a pointer to the SQE.
///
/// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection.
///
/// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size).
pub fn provide_buffers(
    self: *IoUring,
    user_data: u64,
    buffers: [*]u8,
    buffer_size: usize,
    buffers_count: usize,
    group_id: usize,
    buffer_id: usize,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_provide_buffers(buffers, buffer_size, buffers_count, group_id, buffer_id);
    sqe.user_data = user_data;
    return sqe;
}
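
// Example (editor's sketch): registering a small group of provided buffers and
// then removing them. Group id 7 and the sizes are arbitrary; kernels before
// 5.7 lack IORING_OP_PROVIDE_BUFFERS and complete with -EINVAL, so we skip there.
test "example: provide and remove buffers" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    var buffers: [4][32]u8 = undefined;
    _ = try ring.provide_buffers(0x1, @ptrCast(&buffers), 32, 4, 7, 0);
    _ = try ring.submit_and_wait(1);
    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        .INVAL => return error.SkipZigTest,
        else => |e| return posix.unexpectedErrno(e),
    }
    _ = try ring.remove_buffers(0x2, 4, 7);
    _ = try ring.submit_and_wait(1);
    const removed = try ring.copy_cqe();
    // The res field of a remove_buffers CQE is the number of buffers removed.
    try testing.expectEqual(@as(i32, 4), removed.res);
}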

/// Queues (but does not submit) an SQE to remove a group of provided buffers.
/// Returns a pointer to the SQE.
pub fn remove_buffers(
    self: *IoUring,
    user_data: u64,
    buffers_count: usize,
    group_id: usize,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_remove_buffers(buffers_count, group_id);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `waitid(2)`.
/// Returns a pointer to the SQE.
pub fn waitid(
    self: *IoUring,
    user_data: u64,
    id_type: linux.P,
    id: i32,
    infop: *linux.siginfo_t,
    options: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_waitid(id_type, id, infop, options, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Registers an array of file descriptors.
/// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must
/// retrieve a reference to the file, and once I/O has completed the file reference must be
/// dropped. The atomic nature of this file reference can be a slowdown for high IOPS workloads.
/// This slowdown can be avoided by pre-registering file descriptors.
/// To refer to a registered file descriptor, IOSQE_FIXED_FILE must be set in the SQE's flags,
/// and the SQE's fd must be set to the index of the file descriptor in the registered array.
/// Registering file descriptors will wait for the ring to idle.
/// Files are automatically unregistered by the kernel when the ring is torn down.
/// An application need only unregister if it wants to register a new array of file descriptors.
pub fn register_files(self: *IoUring, fds: []const linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES,
        @as(*const anyopaque, @ptrCast(fds.ptr)),
        @as(u32, @intCast(fds.len)),
    );
    try handle_registration_result(res);
}

/// Updates registered file descriptors.
///
/// Updates are applied starting at the provided offset in the original file descriptors slice.
/// There are three kinds of updates:
/// * turning a sparse entry (where the fd is -1) into a real one
/// * removing an existing entry (set the fd to -1)
/// * replacing an existing entry with a new fd
/// Adding new file descriptors must be done with `register_files`.
pub fn register_files_update(self: *IoUring, offset: u32, fds: []const linux.fd_t) !void {
    assert(self.fd >= 0);

    const FilesUpdate = extern struct {
        offset: u32,
        resv: u32,
        fds: u64 align(8),
    };
    var update = FilesUpdate{
        .offset = offset,
        .resv = @as(u32, 0),
        .fds = @as(u64, @intFromPtr(fds.ptr)),
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES_UPDATE,
        @as(*const anyopaque, @ptrCast(&update)),
        @as(u32, @intCast(fds.len)),
    );
    try handle_registration_result(res);
}

/// Registers a sparse file table of `nr_files` file descriptors, each initialized to -1.
pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void {
    assert(self.fd >= 0);

    const reg = &linux.io_uring_rsrc_register{
        .nr = nr_files,
        .flags = linux.IORING_RSRC_REGISTER_SPARSE,
        .resv2 = 0,
        .data = 0,
        .tags = 0,
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES2,
        @ptrCast(reg),
        @as(u32, @sizeOf(linux.io_uring_rsrc_register)),
    );

    return handle_registration_result(res);
}

/// Registers a range for fixed file allocations.
/// Available since 6.0
pub fn register_file_alloc_range(self: *IoUring, offset: u32, len: u32) !void {
    assert(self.fd >= 0);

    const range = &linux.io_uring_file_index_range{
        .off = offset,
        .len = len,
        .resv = 0,
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILE_ALLOC_RANGE,
        @ptrCast(range),
        @as(u32, @sizeOf(linux.io_uring_file_index_range)),
    );

    return handle_registration_result(res);
}

/// Registers the file descriptor for an eventfd that will be notified of completion events on
/// an io_uring instance.
/// Only a single eventfd can be registered at any given point in time.
pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_EVENTFD,
        @as(*const anyopaque, @ptrCast(&fd)),
        1,
    );
    try handle_registration_result(res);
}

/// Registers the file descriptor for an eventfd that will be notified of completion events on
/// an io_uring instance. Notifications are only posted for events that complete in an async manner.
/// This means that events that complete inline while being submitted do not trigger a notification event.
/// Only a single eventfd can be registered at any given point in time.
pub fn register_eventfd_async(self: *IoUring, fd: linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_EVENTFD_ASYNC,
        @as(*const anyopaque, @ptrCast(&fd)),
        1,
    );
    try handle_registration_result(res);
}

/// Unregisters the registered eventfd file descriptor.
pub fn unregister_eventfd(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .UNREGISTER_EVENTFD,
        null,
        0,
    );
    try handle_registration_result(res);
}

pub fn register_napi(self: *IoUring, napi: *linux.io_uring_napi) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .REGISTER_NAPI, napi, 1);
    try handle_registration_result(res);
}

pub fn unregister_napi(self: *IoUring, napi: *linux.io_uring_napi) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_NAPI, napi, 1);
    try handle_registration_result(res);
}

/// Registers an array of buffers for use with `read_fixed` and `write_fixed`.
pub fn register_buffers(self: *IoUring, buffers: []const posix.iovec) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_BUFFERS,
        buffers.ptr,
        @as(u32, @intCast(buffers.len)),
    );
    try handle_registration_result(res);
}

/// Unregisters the registered buffers.
pub fn unregister_buffers(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_BUFFERS, null, 0);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .NXIO => return error.BuffersNotRegistered,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}

/// Returns an io_uring_probe which is used to probe the capabilities of the
/// io_uring subsystem of the running kernel. The io_uring_probe contains the
/// list of supported operations.
pub fn get_probe(self: *IoUring) !linux.io_uring_probe {
    var probe = mem.zeroInit(linux.io_uring_probe, .{});
    const res = linux.io_uring_register(self.fd, .REGISTER_PROBE, &probe, probe.ops.len);
    try handle_register_buf_ring_result(res);
    return probe;
}

fn handle_registration_result(res: usize) !void {
    switch (linux.errno(res)) {
        .SUCCESS => {},
        // One or more fds in the array are invalid, or the kernel does not support sparse sets:
        .BADF => return error.FileDescriptorInvalid,
        .BUSY => return error.FilesAlreadyRegistered,
        .INVAL => return error.FilesEmpty,
        // Adding `nr_args` file references would exceed the maximum allowed number of files the
        // user is allowed to have according to the per-user RLIMIT_NOFILE resource limit and
        // the CAP_SYS_RESOURCE capability is not set, or `nr_args` exceeds the maximum allowed
        // for a fixed file set (older kernels have a limit of 1024 files vs 64K files):
        .MFILE => return error.UserFdQuotaExceeded,
        // Insufficient kernel resources, or the caller had a non-zero RLIMIT_MEMLOCK soft
        // resource limit but tried to lock more memory than the limit permitted (not enforced
        // when the process is privileged with CAP_IPC_LOCK):
        .NOMEM => return error.SystemResources,
        // Attempt to register files on a ring already registering files or being torn down:
        .NXIO => return error.RingShuttingDownOrAlreadyRegisteringFiles,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}

/// Unregisters all registered file descriptors previously associated with the ring.
pub fn unregister_files(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_FILES, null, 0);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .NXIO => return error.FilesNotRegistered,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}
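
// Example (editor's sketch): registering both ends of a pipe as fixed files and
// unregistering them again. Registration can fail in sandboxes or on kernels
// without file registration, so any registration error just skips the test.
test "example: register and unregister files" {
    if (!is_linux) return error.SkipZigTest;
    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated, error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);
    ring.register_files(&.{ fds[0], fds[1] }) catch return error.SkipZigTest;
    try ring.unregister_files();
}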

/// Prepares a socket creation request.
/// The new socket fd will be returned in the completion result.
/// Available since 5.19
pub fn socket(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket(domain, socket_type, protocol, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a socket creation request for the registered file at index `file_index`.
/// Available since 5.19
pub fn socket_direct(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
    file_index: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket_direct(domain, socket_type, protocol, flags, file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a socket creation request for a registered file, with the index chosen by the
/// kernel (file index alloc). The file index will be returned in the CQE res field.
/// Available since 5.19
pub fn socket_direct_alloc(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket_direct_alloc(domain, socket_type, protocol, flags);
    sqe.user_data = user_data;
    return sqe;
}
1401
1402/// Queues (but does not submit) an SQE to perform an `bind(2)` on a socket.
1403/// Returns a pointer to the SQE.
1404/// Available since 6.11
1405pub fn bind(
1406    self: *IoUring,
1407    user_data: u64,
1408    fd: linux.fd_t,
1409    addr: *const posix.sockaddr,
1410    addrlen: posix.socklen_t,
1411    flags: u32,
1412) !*linux.io_uring_sqe {
1413    const sqe = try self.get_sqe();
1414    sqe.prep_bind(fd, addr, addrlen, flags);
1415    sqe.user_data = user_data;
1416    return sqe;
1417}
1418
/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
1420/// Returns a pointer to the SQE.
1421/// Available since 6.11
1422pub fn listen(
1423    self: *IoUring,
1424    user_data: u64,
1425    fd: linux.fd_t,
1426    backlog: usize,
1427    flags: u32,
1428) !*linux.io_uring_sqe {
1429    const sqe = try self.get_sqe();
1430    sqe.prep_listen(fd, backlog, flags);
1431    sqe.user_data = user_data;
1432    return sqe;
1433}
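
// Usage sketch: linking bind and listen so that the listen runs only if the
// bind succeeds. Assumes `fd` is an unbound socket and `addr`/`addrlen`
// describe a prepared local address (`addr: *const posix.sockaddr`):
//
//     const sqe_bind = try ring.bind(0x01, fd, addr, addrlen, 0);
//     sqe_bind.flags |= linux.IOSQE_IO_LINK;
//     _ = try ring.listen(0x02, fd, 128, 0);
//     _ = try ring.submit();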
1434
/// Prepares a command request for a socket.
1436/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
1437/// Available since 6.7.
1438pub fn cmd_sock(
1439    self: *IoUring,
1440    user_data: u64,
1441    cmd_op: linux.IO_URING_SOCKET_OP,
1442    fd: linux.fd_t,
1443    level: u32, // linux.SOL
1444    optname: u32, // linux.SO
1445    optval: u64, // pointer to the option value
1446    optlen: u32, // size of the option value
1447) !*linux.io_uring_sqe {
1448    const sqe = try self.get_sqe();
1449    sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
1450    sqe.user_data = user_data;
1451    return sqe;
1452}
1453
/// Prepares a set socket option request for the option specified by the
/// `optname` argument, at the protocol level specified by the `level` argument.
/// Available since 6.7.
1457pub fn setsockopt(
1458    self: *IoUring,
1459    user_data: u64,
1460    fd: linux.fd_t,
1461    level: u32, // linux.SOL
1462    optname: u32, // linux.SO
1463    opt: []const u8,
1464) !*linux.io_uring_sqe {
1465    return try self.cmd_sock(
1466        user_data,
1467        .SETSOCKOPT,
1468        fd,
1469        level,
1470        optname,
1471        @intFromPtr(opt.ptr),
1472        @intCast(opt.len),
1473    );
1474}
1475
/// Prepares a get socket option request to retrieve the value of the option
/// specified by the `optname` argument for the socket specified by the `fd` argument.
/// Available since 6.7.
1479pub fn getsockopt(
1480    self: *IoUring,
1481    user_data: u64,
1482    fd: linux.fd_t,
1483    level: u32, // linux.SOL
1484    optname: u32, // linux.SO
1485    opt: []u8,
1486) !*linux.io_uring_sqe {
1487    return try self.cmd_sock(
1488        user_data,
1489        .GETSOCKOPT,
1490        fd,
1491        level,
1492        optname,
1493        @intFromPtr(opt.ptr),
1494        @intCast(opt.len),
1495    );
1496}
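
// Usage sketch for the two wrappers above (assumes an initialized `ring` and a
// socket `fd`; the user_data values are arbitrary):
//
//     const enable = mem.toBytes(@as(c_int, 1));
//     _ = try ring.setsockopt(0x01, fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, &enable);
//     var out: [4]u8 = undefined;
//     _ = try ring.getsockopt(0x02, fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, &out);
//     _ = try ring.submit();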
1497
1498pub const SubmissionQueue = struct {
1499    head: *u32,
1500    tail: *u32,
1501    mask: u32,
1502    flags: *u32,
1503    dropped: *u32,
1504    array: []u32,
1505    sqes: []linux.io_uring_sqe,
1506    mmap: []align(page_size_min) u8,
1507    mmap_sqes: []align(page_size_min) u8,
1508
1509    // We use `sqe_head` and `sqe_tail` in the same way as liburing:
1510    // We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
1511    // We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
1512    // This allows us to amortize the cost of the @atomicStore to `tail` across multiple SQEs.
1513    sqe_head: u32 = 0,
1514    sqe_tail: u32 = 0,
1515
1516    pub fn init(fd: linux.fd_t, p: linux.io_uring_params) !SubmissionQueue {
1517        assert(fd >= 0);
1518        assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
1519        const size = @max(
1520            p.sq_off.array + p.sq_entries * @sizeOf(u32),
1521            p.cq_off.cqes + p.cq_entries * @sizeOf(linux.io_uring_cqe),
1522        );
1523        const mmap = try posix.mmap(
1524            null,
1525            size,
1526            posix.PROT.READ | posix.PROT.WRITE,
1527            .{ .TYPE = .SHARED, .POPULATE = true },
1528            fd,
1529            linux.IORING_OFF_SQ_RING,
1530        );
1531        errdefer posix.munmap(mmap);
1532        assert(mmap.len == size);
1533
1534        // The motivation for the `sqes` and `array` indirection is to make it possible for the
1535        // application to preallocate static linux.io_uring_sqe entries and then replay them when needed.
1536        const size_sqes = p.sq_entries * @sizeOf(linux.io_uring_sqe);
1537        const mmap_sqes = try posix.mmap(
1538            null,
1539            size_sqes,
1540            posix.PROT.READ | posix.PROT.WRITE,
1541            .{ .TYPE = .SHARED, .POPULATE = true },
1542            fd,
1543            linux.IORING_OFF_SQES,
1544        );
1545        errdefer posix.munmap(mmap_sqes);
1546        assert(mmap_sqes.len == size_sqes);
1547
1548        const array: [*]u32 = @ptrCast(@alignCast(&mmap[p.sq_off.array]));
1549        const sqes: [*]linux.io_uring_sqe = @ptrCast(@alignCast(&mmap_sqes[0]));
1550        // We expect the kernel copies p.sq_entries to the u32 pointed to by p.sq_off.ring_entries,
1551        // see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L7843-L7844.
1552        assert(p.sq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_entries]))).*);
1553        return SubmissionQueue{
1554            .head = @ptrCast(@alignCast(&mmap[p.sq_off.head])),
1555            .tail = @ptrCast(@alignCast(&mmap[p.sq_off.tail])),
1556            .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_mask]))).*,
1557            .flags = @ptrCast(@alignCast(&mmap[p.sq_off.flags])),
1558            .dropped = @ptrCast(@alignCast(&mmap[p.sq_off.dropped])),
1559            .array = array[0..p.sq_entries],
1560            .sqes = sqes[0..p.sq_entries],
1561            .mmap = mmap,
1562            .mmap_sqes = mmap_sqes,
1563        };
1564    }
1565
1566    pub fn deinit(self: *SubmissionQueue) void {
1567        posix.munmap(self.mmap_sqes);
1568        posix.munmap(self.mmap);
1569    }
1570};
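
// Sketch of the amortization described in `SubmissionQueue`: each `get_sqe()`
// bumps only `sqe_tail`; a single `submit()` then publishes all pending SQEs
// with one atomic store to `tail`:
//
//     _ = try ring.nop(0x01);
//     _ = try ring.nop(0x02);
//     _ = try ring.nop(0x03);
//     _ = try ring.submit(); // one tail store covers all three SQEs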
1571
1572pub const CompletionQueue = struct {
1573    head: *u32,
1574    tail: *u32,
1575    mask: u32,
1576    overflow: *u32,
1577    cqes: []linux.io_uring_cqe,
1578
1579    pub fn init(fd: linux.fd_t, p: linux.io_uring_params, sq: SubmissionQueue) !CompletionQueue {
1580        assert(fd >= 0);
1581        assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
1582        const mmap = sq.mmap;
1583        const cqes: [*]linux.io_uring_cqe = @ptrCast(@alignCast(&mmap[p.cq_off.cqes]));
1584        assert(p.cq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_entries]))).*);
1585        return CompletionQueue{
1586            .head = @ptrCast(@alignCast(&mmap[p.cq_off.head])),
1587            .tail = @ptrCast(@alignCast(&mmap[p.cq_off.tail])),
1588            .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_mask]))).*,
1589            .overflow = @ptrCast(@alignCast(&mmap[p.cq_off.overflow])),
1590            .cqes = cqes[0..p.cq_entries],
1591        };
1592    }
1593
1594    pub fn deinit(self: *CompletionQueue) void {
1595        _ = self;
1596        // A no-op since we now share the mmap with the submission queue.
1597        // Here for symmetry with the submission queue, and for any future feature support.
1598    }
1599};
1600
/// Group of application-provided buffers. Uses the newer type, called ring
/// mapped buffers, supported since kernel 5.19. Buffers are identified by a
/// buffer group ID, and within that group, a buffer ID. An IoUring can have
/// multiple buffer groups, each with a unique group ID.
///
/// In `init` the application provides a contiguous block of memory `buffers`
/// for `buffers_count` buffers of size `buffer_size`. The application can then
/// submit a `recv` operation without providing a buffer upfront. Once the
/// operation is ready to receive data, a buffer is picked automatically and
/// the resulting CQE will contain the buffer ID in `cqe.buffer_id()`. Use the
/// `get` method to get the buffer for the buffer ID identified by a CQE. Once
/// the application has processed the buffer, it may hand ownership back to the
/// kernel by calling `put`, allowing the cycle to repeat.
///
/// Depending on the rate of arrival of data, it is possible that a given
/// buffer group will run out of buffers before those held by CQEs can be put
/// back to the kernel. If this happens, `cqe.err()` will return ENOBUFS.
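///
/// Usage sketch (a sketch only, assuming an initialized `ring`, an `allocator`,
/// and a connected socket `fd`; the group ID and sizes are illustrative):
///
///     var group = try BufferGroup.init(&ring, allocator, 1, 4096, 8);
///     defer group.deinit(allocator);
///     _ = try group.recv(0xaa, fd, 0);
///     _ = try ring.submit();
///     const cqe = try ring.copy_cqe();
///     const data = try group.get(cqe); // bytes received into the picked buffer
///     // ... process `data` ...
///     try group.put(cqe); // hand the buffer back to the kernel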
1618///
1619pub const BufferGroup = struct {
1620    /// Parent ring for which this group is registered.
1621    ring: *IoUring,
    /// Pointer to the memory shared with the kernel.
    /// `buffers_count` `io_uring_buf` structures are shared with the kernel.
    /// The first `io_uring_buf` is overlaid by the `io_uring_buf_ring` struct.
1625    br: *align(page_size_min) linux.io_uring_buf_ring,
1626    /// Contiguous block of memory of size (buffers_count * buffer_size).
1627    buffers: []u8,
1628    /// Size of each buffer in buffers.
1629    buffer_size: u32,
    /// Number of buffers in `buffers`, and the number of `io_uring_buf` structures in `br`.
1631    buffers_count: u16,
    /// Head of the unconsumed part of each buffer, used when incremental consumption is enabled.
1633    heads: []u32,
    /// ID of this group; must be unique within the ring.
1635    group_id: u16,
1636
1637    pub fn init(
1638        ring: *IoUring,
1639        allocator: mem.Allocator,
1640        group_id: u16,
1641        buffer_size: u32,
1642        buffers_count: u16,
1643    ) !BufferGroup {
1644        const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
1645        errdefer allocator.free(buffers);
1646        const heads = try allocator.alloc(u32, buffers_count);
1647        errdefer allocator.free(heads);
1648
1649        const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
1650        buf_ring_init(br);
1651
1652        const mask = buf_ring_mask(buffers_count);
1653        var i: u16 = 0;
1654        while (i < buffers_count) : (i += 1) {
1655            const pos = buffer_size * i;
1656            const buf = buffers[pos .. pos + buffer_size];
1657            heads[i] = 0;
1658            buf_ring_add(br, buf, i, mask, i);
1659        }
1660        buf_ring_advance(br, buffers_count);
1661
1662        return BufferGroup{
1663            .ring = ring,
1664            .group_id = group_id,
1665            .br = br,
1666            .buffers = buffers,
1667            .heads = heads,
1668            .buffer_size = buffer_size,
1669            .buffers_count = buffers_count,
1670        };
1671    }
1672
1673    pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
1674        free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
1675        allocator.free(self.buffers);
1676        allocator.free(self.heads);
1677    }
1678
    // Prepares a recv operation which will select a buffer from this group.
1680    pub fn recv(self: *BufferGroup, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
1681        var sqe = try self.ring.get_sqe();
1682        sqe.prep_rw(.RECV, fd, 0, 0, 0);
1683        sqe.rw_flags = flags;
1684        sqe.flags |= linux.IOSQE_BUFFER_SELECT;
1685        sqe.buf_index = self.group_id;
1686        sqe.user_data = user_data;
1687        return sqe;
1688    }
1689
    // Prepares a multishot recv operation which will select a buffer from this group.
1691    pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
1692        var sqe = try self.recv(user_data, fd, flags);
1693        sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
1694        return sqe;
1695    }
1696
1697    // Get buffer by id.
1698    fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
1699        const pos = self.buffer_size * buffer_id;
1700        return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
1701    }
1702
1703    // Get buffer by CQE.
1704    pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
1705        const buffer_id = try cqe.buffer_id();
1706        const used_len = @as(usize, @intCast(cqe.res));
1707        return self.get_by_id(buffer_id)[0..used_len];
1708    }
1709
    // Releases the buffer referenced by a CQE back to the kernel.
1711    pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
1712        const buffer_id = try cqe.buffer_id();
1713        if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
            // Incremental consumption is active; the kernel will write to this buffer again.
1715            const used_len = @as(u32, @intCast(cqe.res));
1716            // Track what part of the buffer is used
1717            self.heads[buffer_id] += used_len;
1718            return;
1719        }
1720        self.heads[buffer_id] = 0;
1721
        // Release buffer to the kernel.
1723        const mask = buf_ring_mask(self.buffers_count);
1724        buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
1725        buf_ring_advance(self.br, 1);
1726    }
1727};
1728
/// Registers a shared buffer ring to be used with provided buffers.
/// `entries` `io_uring_buf` structures are memory mapped and shared with the kernel.
/// `fd` is the `IoUring.fd` for which the provided buffer ring is being registered.
/// `entries` is the number of entries requested in the buffer ring; must be a power of two.
/// `group_id` is the chosen buffer group ID, unique within the ring.
1734pub fn setup_buf_ring(
1735    fd: linux.fd_t,
1736    entries: u16,
1737    group_id: u16,
1738    flags: linux.io_uring_buf_reg.Flags,
1739) !*align(page_size_min) linux.io_uring_buf_ring {
1740    if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
1741    if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
1742
1743    const mmap_size = @as(usize, entries) * @sizeOf(linux.io_uring_buf);
1744    const mmap = try posix.mmap(
1745        null,
1746        mmap_size,
1747        posix.PROT.READ | posix.PROT.WRITE,
1748        .{ .TYPE = .PRIVATE, .ANONYMOUS = true },
1749        -1,
1750        0,
1751    );
1752    errdefer posix.munmap(mmap);
1753    assert(mmap.len == mmap_size);
1754
1755    const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
1756    try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
1757    return br;
1758}
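
// Low-level usage sketch (the `BufferGroup` wrapper above packages these
// steps); the entry count and group ID are illustrative:
//
//     const br = try setup_buf_ring(ring.fd, 8, 0, .{ .inc = false });
//     defer free_buf_ring(ring.fd, br, 8, 0);
//     buf_ring_init(br);
//     // ... buf_ring_add() and buf_ring_advance() to publish buffers ...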
1759
1760fn register_buf_ring(
1761    fd: linux.fd_t,
1762    addr: u64,
1763    entries: u32,
1764    group_id: u16,
1765    flags: linux.io_uring_buf_reg.Flags,
1766) !void {
1767    var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
1768        .ring_addr = addr,
1769        .ring_entries = entries,
1770        .bgid = group_id,
1771        .flags = flags,
1772    });
1773    var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
1774    if (linux.errno(res) == .INVAL and reg.flags.inc) {
        // Retry without incremental buffer consumption: it is available since
        // kernel 6.12, and older kernels return INVAL when it is requested.
1777        reg.flags.inc = false;
1778        res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(&reg)), 1);
1779    }
1780    try handle_register_buf_ring_result(res);
1781}
1782
1783fn unregister_buf_ring(fd: linux.fd_t, group_id: u16) !void {
1784    var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
1785        .bgid = group_id,
1786    });
1787    const res = linux.io_uring_register(
1788        fd,
1789        .UNREGISTER_PBUF_RING,
1790        @as(*const anyopaque, @ptrCast(&reg)),
1791        1,
1792    );
1793    try handle_register_buf_ring_result(res);
1794}
1795
1796fn handle_register_buf_ring_result(res: usize) !void {
1797    switch (linux.errno(res)) {
1798        .SUCCESS => {},
1799        .INVAL => return error.ArgumentsInvalid,
1800        else => |errno| return posix.unexpectedErrno(errno),
1801    }
1802}
1803
1804// Unregisters a previously registered shared buffer ring, returned from io_uring_setup_buf_ring.
1805pub fn free_buf_ring(fd: linux.fd_t, br: *align(page_size_min) linux.io_uring_buf_ring, entries: u32, group_id: u16) void {
1806    unregister_buf_ring(fd, group_id) catch {};
1807    var mmap: []align(page_size_min) u8 = undefined;
1808    mmap.ptr = @ptrCast(br);
1809    mmap.len = entries * @sizeOf(linux.io_uring_buf);
1810    posix.munmap(mmap);
1811}
1812
1813/// Initialises `br` so that it is ready to be used.
1814pub fn buf_ring_init(br: *linux.io_uring_buf_ring) void {
1815    br.tail = 0;
1816}
1817
1818/// Calculates the appropriate size mask for a buffer ring.
1819/// `entries` is the ring entries as specified in io_uring_register_buf_ring.
1820pub fn buf_ring_mask(entries: u16) u16 {
1821    return entries - 1;
1822}
1823
/// Assigns `buffer` to the `br` buffer ring.
/// `buffer_id` is an identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with `buf_ring_advance`, then the offset should be 0.
/// If buffers are provided in a loop before being committed, the offset must be incremented by one for each buffer added.
1829pub fn buf_ring_add(
1830    br: *linux.io_uring_buf_ring,
1831    buffer: []u8,
1832    buffer_id: u16,
1833    mask: u16,
1834    buffer_offset: u16,
1835) void {
1836    const bufs: [*]linux.io_uring_buf = @ptrCast(br);
1837    const buf: *linux.io_uring_buf = &bufs[(br.tail +% buffer_offset) & mask];
1838
1839    buf.addr = @intFromPtr(buffer.ptr);
1840    buf.len = @intCast(buffer.len);
1841    buf.bid = buffer_id;
1842}
1843
/// Makes `count` new buffers visible to the kernel. Called after
/// `buf_ring_add` has been called `count` times to fill in new buffers.
1846pub fn buf_ring_advance(br: *linux.io_uring_buf_ring, count: u16) void {
1847    const tail: u16 = br.tail +% count;
1848    @atomicStore(u16, &br.tail, tail, .release);
1849}
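
// Sketch of the add/advance protocol: provide a batch of buffers, bumping the
// offset for each one, then publish them to the kernel with a single advance.
// Assumes `br` from `setup_buf_ring` and `bufs`, a slice of []u8 buffers:
//
//     const mask = buf_ring_mask(@intCast(bufs.len));
//     for (bufs, 0..) |buf, i| {
//         buf_ring_add(br, buf, @intCast(i), mask, @intCast(i));
//     }
//     buf_ring_advance(br, @intCast(bufs.len));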
1850
1851test "structs/offsets/entries" {
1852    if (!is_linux) return error.SkipZigTest;
1853
1854    try testing.expectEqual(@as(usize, 120), @sizeOf(linux.io_uring_params));
1855    try testing.expectEqual(@as(usize, 64), @sizeOf(linux.io_uring_sqe));
1856    try testing.expectEqual(@as(usize, 16), @sizeOf(linux.io_uring_cqe));
1857
1858    try testing.expectEqual(0, linux.IORING_OFF_SQ_RING);
1859    try testing.expectEqual(0x8000000, linux.IORING_OFF_CQ_RING);
1860    try testing.expectEqual(0x10000000, linux.IORING_OFF_SQES);
1861
1862    try testing.expectError(error.EntriesZero, IoUring.init(0, 0));
1863    try testing.expectError(error.EntriesNotPowerOfTwo, IoUring.init(3, 0));
1864}
1865
1866test "nop" {
1867    if (!is_linux) return error.SkipZigTest;
1868
1869    var ring = IoUring.init(1, 0) catch |err| switch (err) {
1870        error.SystemOutdated => return error.SkipZigTest,
1871        error.PermissionDenied => return error.SkipZigTest,
1872        else => return err,
1873    };
1874    defer {
1875        ring.deinit();
1876        testing.expectEqual(@as(linux.fd_t, -1), ring.fd) catch @panic("test failed");
1877    }
1878
1879    const sqe = try ring.nop(0xaaaaaaaa);
1880    try testing.expectEqual(linux.io_uring_sqe{
1881        .opcode = .NOP,
1882        .flags = 0,
1883        .ioprio = 0,
1884        .fd = 0,
1885        .off = 0,
1886        .addr = 0,
1887        .len = 0,
1888        .rw_flags = 0,
1889        .user_data = 0xaaaaaaaa,
1890        .buf_index = 0,
1891        .personality = 0,
1892        .splice_fd_in = 0,
1893        .addr3 = 0,
1894        .resv = 0,
1895    }, sqe.*);
1896
1897    try testing.expectEqual(@as(u32, 0), ring.sq.sqe_head);
1898    try testing.expectEqual(@as(u32, 1), ring.sq.sqe_tail);
1899    try testing.expectEqual(@as(u32, 0), ring.sq.tail.*);
1900    try testing.expectEqual(@as(u32, 0), ring.cq.head.*);
1901    try testing.expectEqual(@as(u32, 1), ring.sq_ready());
1902    try testing.expectEqual(@as(u32, 0), ring.cq_ready());
1903
1904    try testing.expectEqual(@as(u32, 1), try ring.submit());
1905    try testing.expectEqual(@as(u32, 1), ring.sq.sqe_head);
1906    try testing.expectEqual(@as(u32, 1), ring.sq.sqe_tail);
1907    try testing.expectEqual(@as(u32, 1), ring.sq.tail.*);
1908    try testing.expectEqual(@as(u32, 0), ring.cq.head.*);
1909    try testing.expectEqual(@as(u32, 0), ring.sq_ready());
1910
1911    try testing.expectEqual(linux.io_uring_cqe{
1912        .user_data = 0xaaaaaaaa,
1913        .res = 0,
1914        .flags = 0,
1915    }, try ring.copy_cqe());
1916    try testing.expectEqual(@as(u32, 1), ring.cq.head.*);
1917    try testing.expectEqual(@as(u32, 0), ring.cq_ready());
1918
1919    const sqe_barrier = try ring.nop(0xbbbbbbbb);
1920    sqe_barrier.flags |= linux.IOSQE_IO_DRAIN;
1921    try testing.expectEqual(@as(u32, 1), try ring.submit());
1922    try testing.expectEqual(linux.io_uring_cqe{
1923        .user_data = 0xbbbbbbbb,
1924        .res = 0,
1925        .flags = 0,
1926    }, try ring.copy_cqe());
1927    try testing.expectEqual(@as(u32, 2), ring.sq.sqe_head);
1928    try testing.expectEqual(@as(u32, 2), ring.sq.sqe_tail);
1929    try testing.expectEqual(@as(u32, 2), ring.sq.tail.*);
1930    try testing.expectEqual(@as(u32, 2), ring.cq.head.*);
1931}
1932
1933test "readv" {
1934    if (!is_linux) return error.SkipZigTest;
1935
1936    var ring = IoUring.init(1, 0) catch |err| switch (err) {
1937        error.SystemOutdated => return error.SkipZigTest,
1938        error.PermissionDenied => return error.SkipZigTest,
1939        else => return err,
1940    };
1941    defer ring.deinit();
1942
1943    const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
1944    defer posix.close(fd);
1945
1946    // Linux Kernel 5.4 supports IORING_REGISTER_FILES but not sparse fd sets (i.e. an fd of -1).
1947    // Linux Kernel 5.5 adds support for sparse fd sets.
1948    // Compare:
1949    // https://github.com/torvalds/linux/blob/v5.4/fs/io_uring.c#L3119-L3124 vs
1950    // https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L6687-L6691
1951    // We therefore avoid stressing sparse fd sets here:
1952    var registered_fds = [_]linux.fd_t{0} ** 1;
1953    const fd_index = 0;
1954    registered_fds[fd_index] = fd;
1955    try ring.register_files(registered_fds[0..]);
1956
1957    var buffer = [_]u8{42} ** 128;
1958    var iovecs = [_]posix.iovec{posix.iovec{ .base = &buffer, .len = buffer.len }};
1959    const sqe = try ring.read(0xcccccccc, fd_index, .{ .iovecs = iovecs[0..] }, 0);
1960    try testing.expectEqual(linux.IORING_OP.READV, sqe.opcode);
1961    sqe.flags |= linux.IOSQE_FIXED_FILE;
1962
1963    try testing.expectError(error.SubmissionQueueFull, ring.nop(0));
1964    try testing.expectEqual(@as(u32, 1), try ring.submit());
1965    try testing.expectEqual(linux.io_uring_cqe{
1966        .user_data = 0xcccccccc,
1967        .res = buffer.len,
1968        .flags = 0,
1969    }, try ring.copy_cqe());
1970    try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
1971
1972    try ring.unregister_files();
1973}
1974
1975test "writev/fsync/readv" {
1976    if (!is_linux) return error.SkipZigTest;
1977
1978    var ring = IoUring.init(4, 0) catch |err| switch (err) {
1979        error.SystemOutdated => return error.SkipZigTest,
1980        error.PermissionDenied => return error.SkipZigTest,
1981        else => return err,
1982    };
1983    defer ring.deinit();
1984
1985    var tmp = std.testing.tmpDir(.{});
1986    defer tmp.cleanup();
1987
1988    const path = "test_io_uring_writev_fsync_readv";
1989    const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
1990    defer file.close();
1991    const fd = file.handle;
1992
1993    const buffer_write = [_]u8{42} ** 128;
1994    const iovecs_write = [_]posix.iovec_const{
1995        posix.iovec_const{ .base = &buffer_write, .len = buffer_write.len },
1996    };
1997    var buffer_read = [_]u8{0} ** 128;
1998    var iovecs_read = [_]posix.iovec{
1999        posix.iovec{ .base = &buffer_read, .len = buffer_read.len },
2000    };
2001
2002    const sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17);
2003    try testing.expectEqual(linux.IORING_OP.WRITEV, sqe_writev.opcode);
2004    try testing.expectEqual(@as(u64, 17), sqe_writev.off);
2005    sqe_writev.flags |= linux.IOSQE_IO_LINK;
2006
2007    const sqe_fsync = try ring.fsync(0xeeeeeeee, fd, 0);
2008    try testing.expectEqual(linux.IORING_OP.FSYNC, sqe_fsync.opcode);
2009    try testing.expectEqual(fd, sqe_fsync.fd);
2010    sqe_fsync.flags |= linux.IOSQE_IO_LINK;
2011
2012    const sqe_readv = try ring.read(0xffffffff, fd, .{ .iovecs = iovecs_read[0..] }, 17);
2013    try testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode);
2014    try testing.expectEqual(@as(u64, 17), sqe_readv.off);
2015
2016    try testing.expectEqual(@as(u32, 3), ring.sq_ready());
2017    try testing.expectEqual(@as(u32, 3), try ring.submit_and_wait(3));
2018    try testing.expectEqual(@as(u32, 0), ring.sq_ready());
2019    try testing.expectEqual(@as(u32, 3), ring.cq_ready());
2020
2021    try testing.expectEqual(linux.io_uring_cqe{
2022        .user_data = 0xdddddddd,
2023        .res = buffer_write.len,
2024        .flags = 0,
2025    }, try ring.copy_cqe());
2026    try testing.expectEqual(@as(u32, 2), ring.cq_ready());
2027
2028    try testing.expectEqual(linux.io_uring_cqe{
2029        .user_data = 0xeeeeeeee,
2030        .res = 0,
2031        .flags = 0,
2032    }, try ring.copy_cqe());
2033    try testing.expectEqual(@as(u32, 1), ring.cq_ready());
2034
2035    try testing.expectEqual(linux.io_uring_cqe{
2036        .user_data = 0xffffffff,
2037        .res = buffer_read.len,
2038        .flags = 0,
2039    }, try ring.copy_cqe());
2040    try testing.expectEqual(@as(u32, 0), ring.cq_ready());
2041
2042    try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2043}
2044
2045test "write/read" {
2046    if (!is_linux) return error.SkipZigTest;
2047
2048    var ring = IoUring.init(2, 0) catch |err| switch (err) {
2049        error.SystemOutdated => return error.SkipZigTest,
2050        error.PermissionDenied => return error.SkipZigTest,
2051        else => return err,
2052    };
2053    defer ring.deinit();
2054
2055    var tmp = std.testing.tmpDir(.{});
2056    defer tmp.cleanup();
2057    const path = "test_io_uring_write_read";
2058    const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
2059    defer file.close();
2060    const fd = file.handle;
2061
2062    const buffer_write = [_]u8{97} ** 20;
2063    var buffer_read = [_]u8{98} ** 20;
2064    const sqe_write = try ring.write(0x11111111, fd, buffer_write[0..], 10);
2065    try testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode);
2066    try testing.expectEqual(@as(u64, 10), sqe_write.off);
2067    sqe_write.flags |= linux.IOSQE_IO_LINK;
2068    const sqe_read = try ring.read(0x22222222, fd, .{ .buffer = buffer_read[0..] }, 10);
2069    try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode);
2070    try testing.expectEqual(@as(u64, 10), sqe_read.off);
2071    try testing.expectEqual(@as(u32, 2), try ring.submit());
2072
2073    const cqe_write = try ring.copy_cqe();
2074    const cqe_read = try ring.copy_cqe();
2075    // Prior to Linux Kernel 5.6 this is the only way to test for read/write support:
2076    // https://lwn.net/Articles/809820/
2077    if (cqe_write.err() == .INVAL) return error.SkipZigTest;
2078    if (cqe_read.err() == .INVAL) return error.SkipZigTest;
2079    try testing.expectEqual(linux.io_uring_cqe{
2080        .user_data = 0x11111111,
2081        .res = buffer_write.len,
2082        .flags = 0,
2083    }, cqe_write);
2084    try testing.expectEqual(linux.io_uring_cqe{
2085        .user_data = 0x22222222,
2086        .res = buffer_read.len,
2087        .flags = 0,
2088    }, cqe_read);
2089    try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2090}
2091
2092test "splice/read" {
2093    if (!is_linux) return error.SkipZigTest;
2094
2095    var ring = IoUring.init(4, 0) catch |err| switch (err) {
2096        error.SystemOutdated => return error.SkipZigTest,
2097        error.PermissionDenied => return error.SkipZigTest,
2098        else => return err,
2099    };
2100    defer ring.deinit();
2101
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
2103    const path_src = "test_io_uring_splice_src";
2104    const file_src = try tmp.dir.createFile(path_src, .{ .read = true, .truncate = true });
2105    defer file_src.close();
2106    const fd_src = file_src.handle;
2107
2108    const path_dst = "test_io_uring_splice_dst";
2109    const file_dst = try tmp.dir.createFile(path_dst, .{ .read = true, .truncate = true });
2110    defer file_dst.close();
2111    const fd_dst = file_dst.handle;
2112
2113    const buffer_write = [_]u8{97} ** 20;
2114    var buffer_read = [_]u8{98} ** 20;
2115    _ = try file_src.write(&buffer_write);
2116
    const fds = try posix.pipe();
    defer posix.close(fds[0]);
    defer posix.close(fds[1]);
2118    const pipe_offset: u64 = std.math.maxInt(u64);
2119
2120    const sqe_splice_to_pipe = try ring.splice(0x11111111, fd_src, 0, fds[1], pipe_offset, buffer_write.len);
2121    try testing.expectEqual(linux.IORING_OP.SPLICE, sqe_splice_to_pipe.opcode);
2122    try testing.expectEqual(@as(u64, 0), sqe_splice_to_pipe.addr);
2123    try testing.expectEqual(pipe_offset, sqe_splice_to_pipe.off);
2124    sqe_splice_to_pipe.flags |= linux.IOSQE_IO_LINK;
2125
2126    const sqe_splice_from_pipe = try ring.splice(0x22222222, fds[0], pipe_offset, fd_dst, 10, buffer_write.len);
2127    try testing.expectEqual(linux.IORING_OP.SPLICE, sqe_splice_from_pipe.opcode);
2128    try testing.expectEqual(pipe_offset, sqe_splice_from_pipe.addr);
2129    try testing.expectEqual(@as(u64, 10), sqe_splice_from_pipe.off);
2130    sqe_splice_from_pipe.flags |= linux.IOSQE_IO_LINK;
2131
2132    const sqe_read = try ring.read(0x33333333, fd_dst, .{ .buffer = buffer_read[0..] }, 10);
2133    try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode);
2134    try testing.expectEqual(@as(u64, 10), sqe_read.off);
2135    try testing.expectEqual(@as(u32, 3), try ring.submit());
2136
2137    const cqe_splice_to_pipe = try ring.copy_cqe();
2138    const cqe_splice_from_pipe = try ring.copy_cqe();
2139    const cqe_read = try ring.copy_cqe();
2140    // Prior to Linux Kernel 5.6 this is the only way to test for splice/read support:
2141    // https://lwn.net/Articles/809820/
2142    if (cqe_splice_to_pipe.err() == .INVAL) return error.SkipZigTest;
2143    if (cqe_splice_from_pipe.err() == .INVAL) return error.SkipZigTest;
2144    if (cqe_read.err() == .INVAL) return error.SkipZigTest;
2145    try testing.expectEqual(linux.io_uring_cqe{
2146        .user_data = 0x11111111,
2147        .res = buffer_write.len,
2148        .flags = 0,
2149    }, cqe_splice_to_pipe);
2150    try testing.expectEqual(linux.io_uring_cqe{
2151        .user_data = 0x22222222,
2152        .res = buffer_write.len,
2153        .flags = 0,
2154    }, cqe_splice_from_pipe);
2155    try testing.expectEqual(linux.io_uring_cqe{
2156        .user_data = 0x33333333,
2157        .res = buffer_read.len,
2158        .flags = 0,
2159    }, cqe_read);
2160    try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2161}
2162
2163test "write_fixed/read_fixed" {
2164    if (!is_linux) return error.SkipZigTest;
2165
2166    var ring = IoUring.init(2, 0) catch |err| switch (err) {
2167        error.SystemOutdated => return error.SkipZigTest,
2168        error.PermissionDenied => return error.SkipZigTest,
2169        else => return err,
2170    };
2171    defer ring.deinit();
2172
2173    var tmp = std.testing.tmpDir(.{});
2174    defer tmp.cleanup();
2175
2176    const path = "test_io_uring_write_read_fixed";
2177    const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
2178    defer file.close();
2179    const fd = file.handle;
2180
2181    var raw_buffers: [2][11]u8 = undefined;
2182    // First buffer will be written to the file.
2183    @memset(&raw_buffers[0], 'z');
2184    raw_buffers[0][0.."foobar".len].* = "foobar".*;
2185
2186    var buffers = [2]posix.iovec{
2187        .{ .base = &raw_buffers[0], .len = raw_buffers[0].len },
2188        .{ .base = &raw_buffers[1], .len = raw_buffers[1].len },
2189    };
2190    ring.register_buffers(&buffers) catch |err| switch (err) {
2191        error.SystemResources => {
2192            // See https://github.com/ziglang/zig/issues/15362
2193            return error.SkipZigTest;
2194        },
2195        else => |e| return e,
2196    };
2197
2198    const sqe_write = try ring.write_fixed(0x45454545, fd, &buffers[0], 3, 0);
2199    try testing.expectEqual(linux.IORING_OP.WRITE_FIXED, sqe_write.opcode);
2200    try testing.expectEqual(@as(u64, 3), sqe_write.off);
2201    sqe_write.flags |= linux.IOSQE_IO_LINK;
2202
2203    const sqe_read = try ring.read_fixed(0x12121212, fd, &buffers[1], 0, 1);
2204    try testing.expectEqual(linux.IORING_OP.READ_FIXED, sqe_read.opcode);
2205    try testing.expectEqual(@as(u64, 0), sqe_read.off);
2206
2207    try testing.expectEqual(@as(u32, 2), try ring.submit());
2208
2209    const cqe_write = try ring.copy_cqe();
2210    const cqe_read = try ring.copy_cqe();
2211
2212    try testing.expectEqual(linux.io_uring_cqe{
2213        .user_data = 0x45454545,
2214        .res = @as(i32, @intCast(buffers[0].len)),
2215        .flags = 0,
2216    }, cqe_write);
2217    try testing.expectEqual(linux.io_uring_cqe{
2218        .user_data = 0x12121212,
2219        .res = @as(i32, @intCast(buffers[1].len)),
2220        .flags = 0,
2221    }, cqe_read);
2222
2223    try testing.expectEqualSlices(u8, "\x00\x00\x00", buffers[1].base[0..3]);
2224    try testing.expectEqualSlices(u8, "foobar", buffers[1].base[3..9]);
2225    try testing.expectEqualSlices(u8, "zz", buffers[1].base[9..11]);
2226}
2227
2228test "openat" {
2229    if (!is_linux) return error.SkipZigTest;
2230
2231    var ring = IoUring.init(1, 0) catch |err| switch (err) {
2232        error.SystemOutdated => return error.SkipZigTest,
2233        error.PermissionDenied => return error.SkipZigTest,
2234        else => return err,
2235    };
2236    defer ring.deinit();
2237
2238    var tmp = std.testing.tmpDir(.{});
2239    defer tmp.cleanup();
2240
2241    const path = "test_io_uring_openat";
2242
2243    // Workaround for LLVM bug: https://github.com/ziglang/zig/issues/12014
2244    const path_addr = if (builtin.zig_backend == .stage2_llvm) p: {
2245        var workaround = path;
2246        _ = &workaround;
2247        break :p @intFromPtr(workaround);
2248    } else @intFromPtr(path);
2249
2250    const flags: linux.O = .{ .CLOEXEC = true, .ACCMODE = .RDWR, .CREAT = true };
2251    const mode: posix.mode_t = 0o666;
2252    const sqe_openat = try ring.openat(0x33333333, tmp.dir.fd, path, flags, mode);
2253    try testing.expectEqual(linux.io_uring_sqe{
2254        .opcode = .OPENAT,
2255        .flags = 0,
2256        .ioprio = 0,
2257        .fd = tmp.dir.fd,
2258        .off = 0,
2259        .addr = path_addr,
2260        .len = mode,
2261        .rw_flags = @bitCast(flags),
2262        .user_data = 0x33333333,
2263        .buf_index = 0,
2264        .personality = 0,
2265        .splice_fd_in = 0,
2266        .addr3 = 0,
2267        .resv = 0,
2268    }, sqe_openat.*);
2269    try testing.expectEqual(@as(u32, 1), try ring.submit());
2270
2271    const cqe_openat = try ring.copy_cqe();
2272    try testing.expectEqual(@as(u64, 0x33333333), cqe_openat.user_data);
2273    if (cqe_openat.err() == .INVAL) return error.SkipZigTest;
2274    if (cqe_openat.err() == .BADF) return error.SkipZigTest;
2275    if (cqe_openat.res <= 0) std.debug.print("\ncqe_openat.res={}\n", .{cqe_openat.res});
2276    try testing.expect(cqe_openat.res > 0);
2277    try testing.expectEqual(@as(u32, 0), cqe_openat.flags);
2278
2279    posix.close(cqe_openat.res);
2280}
2281
2282test "close" {
2283    if (!is_linux) return error.SkipZigTest;
2284
2285    var ring = IoUring.init(1, 0) catch |err| switch (err) {
2286        error.SystemOutdated => return error.SkipZigTest,
2287        error.PermissionDenied => return error.SkipZigTest,
2288        else => return err,
2289    };
2290    defer ring.deinit();
2291
2292    var tmp = std.testing.tmpDir(.{});
2293    defer tmp.cleanup();
2294
2295    const path = "test_io_uring_close";
2296    const file = try tmp.dir.createFile(path, .{});
2297    errdefer file.close();
2298
2299    const sqe_close = try ring.close(0x44444444, file.handle);
2300    try testing.expectEqual(linux.IORING_OP.CLOSE, sqe_close.opcode);
2301    try testing.expectEqual(file.handle, sqe_close.fd);
2302    try testing.expectEqual(@as(u32, 1), try ring.submit());
2303
2304    const cqe_close = try ring.copy_cqe();
2305    if (cqe_close.err() == .INVAL) return error.SkipZigTest;
2306    try testing.expectEqual(linux.io_uring_cqe{
2307        .user_data = 0x44444444,
2308        .res = 0,
2309        .flags = 0,
2310    }, cqe_close);
2311}
2312
2313test "accept/connect/send/recv" {
2314    if (!is_linux) return error.SkipZigTest;
2315
2316    var ring = IoUring.init(16, 0) catch |err| switch (err) {
2317        error.SystemOutdated => return error.SkipZigTest,
2318        error.PermissionDenied => return error.SkipZigTest,
2319        else => return err,
2320    };
2321    defer ring.deinit();
2322
2323    const socket_test_harness = try createSocketTestHarness(&ring);
2324    defer socket_test_harness.close();
2325
2326    const buffer_send = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 };
2327    var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2328
2329    const sqe_send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0);
2330    sqe_send.flags |= linux.IOSQE_IO_LINK;
2331    _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2332    try testing.expectEqual(@as(u32, 2), try ring.submit());
2333
2334    const cqe_send = try ring.copy_cqe();
2335    if (cqe_send.err() == .INVAL) return error.SkipZigTest;
2336    try testing.expectEqual(linux.io_uring_cqe{
2337        .user_data = 0xeeeeeeee,
2338        .res = buffer_send.len,
2339        .flags = 0,
2340    }, cqe_send);
2341
2342    const cqe_recv = try ring.copy_cqe();
2343    if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
2344    try testing.expectEqual(linux.io_uring_cqe{
2345        .user_data = 0xffffffff,
2346        .res = buffer_recv.len,
2347        // ignore IORING_CQE_F_SOCK_NONEMPTY since it is only set on some systems
2348        .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
2349    }, cqe_recv);
2350
2351    try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
2352}
2353
2354test "sendmsg/recvmsg" {
2355    if (!is_linux) return error.SkipZigTest;
2356
2357    var ring = IoUring.init(2, 0) catch |err| switch (err) {
2358        error.SystemOutdated => return error.SkipZigTest,
2359        error.PermissionDenied => return error.SkipZigTest,
2360        else => return err,
2361    };
2362    defer ring.deinit();
2363
2364    var address_server: linux.sockaddr.in = .{
2365        .port = 0,
2366        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
2367    };
2368
2369    const server = try posix.socket(address_server.family, posix.SOCK.DGRAM, 0);
2370    defer posix.close(server);
2371    try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEPORT, &mem.toBytes(@as(c_int, 1)));
2372    try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
2373    try posix.bind(server, addrAny(&address_server), @sizeOf(linux.sockaddr.in));
2374
2375    // set address_server to the OS-chosen IP/port.
2376    var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
2377    try posix.getsockname(server, addrAny(&address_server), &slen);
2378
2379    const client = try posix.socket(address_server.family, posix.SOCK.DGRAM, 0);
2380    defer posix.close(client);
2381
2382    const buffer_send = [_]u8{42} ** 128;
2383    const iovecs_send = [_]posix.iovec_const{
2384        posix.iovec_const{ .base = &buffer_send, .len = buffer_send.len },
2385    };
2386    const msg_send: linux.msghdr_const = .{
2387        .name = addrAny(&address_server),
2388        .namelen = @sizeOf(linux.sockaddr.in),
2389        .iov = &iovecs_send,
2390        .iovlen = 1,
2391        .control = null,
2392        .controllen = 0,
2393        .flags = 0,
2394    };
2395    const sqe_sendmsg = try ring.sendmsg(0x11111111, client, &msg_send, 0);
2396    sqe_sendmsg.flags |= linux.IOSQE_IO_LINK;
2397    try testing.expectEqual(linux.IORING_OP.SENDMSG, sqe_sendmsg.opcode);
2398    try testing.expectEqual(client, sqe_sendmsg.fd);
2399
2400    var buffer_recv = [_]u8{0} ** 128;
2401    var iovecs_recv = [_]posix.iovec{
2402        posix.iovec{ .base = &buffer_recv, .len = buffer_recv.len },
2403    };
2404    var address_recv: linux.sockaddr.in = .{
2405        .port = 0,
2406        .addr = 0,
2407    };
2408    var msg_recv: linux.msghdr = .{
2409        .name = addrAny(&address_recv),
2410        .namelen = @sizeOf(linux.sockaddr.in),
2411        .iov = &iovecs_recv,
2412        .iovlen = 1,
2413        .control = null,
2414        .controllen = 0,
2415        .flags = 0,
2416    };
2417    const sqe_recvmsg = try ring.recvmsg(0x22222222, server, &msg_recv, 0);
2418    try testing.expectEqual(linux.IORING_OP.RECVMSG, sqe_recvmsg.opcode);
2419    try testing.expectEqual(server, sqe_recvmsg.fd);
2420
2421    try testing.expectEqual(@as(u32, 2), ring.sq_ready());
2422    try testing.expectEqual(@as(u32, 2), try ring.submit_and_wait(2));
2423    try testing.expectEqual(@as(u32, 0), ring.sq_ready());
2424    try testing.expectEqual(@as(u32, 2), ring.cq_ready());
2425
2426    const cqe_sendmsg = try ring.copy_cqe();
2427    if (cqe_sendmsg.res == -@as(i32, @intFromEnum(linux.E.INVAL))) return error.SkipZigTest;
2428    try testing.expectEqual(linux.io_uring_cqe{
2429        .user_data = 0x11111111,
2430        .res = buffer_send.len,
2431        .flags = 0,
2432    }, cqe_sendmsg);
2433
2434    const cqe_recvmsg = try ring.copy_cqe();
2435    if (cqe_recvmsg.res == -@as(i32, @intFromEnum(linux.E.INVAL))) return error.SkipZigTest;
2436    try testing.expectEqual(linux.io_uring_cqe{
2437        .user_data = 0x22222222,
2438        .res = buffer_recv.len,
2439        // ignore IORING_CQE_F_SOCK_NONEMPTY since it is set non-deterministically
2440        .flags = cqe_recvmsg.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
2441    }, cqe_recvmsg);
2442
2443    try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
2444}
2445
2446test "timeout (after a relative time)" {
2447    if (!is_linux) return error.SkipZigTest;
2448
2449    const io = testing.io;
2450
2451    var ring = IoUring.init(1, 0) catch |err| switch (err) {
2452        error.SystemOutdated => return error.SkipZigTest,
2453        error.PermissionDenied => return error.SkipZigTest,
2454        else => return err,
2455    };
2456    defer ring.deinit();
2457
2458    const ms = 10;
2459    const margin = 5;
2460    const ts: linux.kernel_timespec = .{ .sec = 0, .nsec = ms * 1000000 };
2461
2462    const started = try std.Io.Clock.awake.now(io);
2463    const sqe = try ring.timeout(0x55555555, &ts, 0, 0);
2464    try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe.opcode);
2465    try testing.expectEqual(@as(u32, 1), try ring.submit());
2466    const cqe = try ring.copy_cqe();
2467    const stopped = try std.Io.Clock.awake.now(io);
2468
2469    try testing.expectEqual(linux.io_uring_cqe{
2470        .user_data = 0x55555555,
2471        .res = -@as(i32, @intFromEnum(linux.E.TIME)),
2472        .flags = 0,
2473    }, cqe);
2474
2475    // Tests should not depend on timings: skip test if outside margin.
2476    const ms_elapsed = started.durationTo(stopped).toMilliseconds();
    if (ms_elapsed < ms - margin or ms_elapsed > ms + margin) return error.SkipZigTest;
2478}
2479
2480test "timeout (after a number of completions)" {
2481    if (!is_linux) return error.SkipZigTest;
2482
2483    var ring = IoUring.init(2, 0) catch |err| switch (err) {
2484        error.SystemOutdated => return error.SkipZigTest,
2485        error.PermissionDenied => return error.SkipZigTest,
2486        else => return err,
2487    };
2488    defer ring.deinit();
2489
2490    const ts: linux.kernel_timespec = .{ .sec = 3, .nsec = 0 };
2491    const count_completions: u64 = 1;
2492    const sqe_timeout = try ring.timeout(0x66666666, &ts, count_completions, 0);
2493    try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
2494    try testing.expectEqual(count_completions, sqe_timeout.off);
2495    _ = try ring.nop(0x77777777);
2496    try testing.expectEqual(@as(u32, 2), try ring.submit());
2497
2498    const cqe_nop = try ring.copy_cqe();
2499    try testing.expectEqual(linux.io_uring_cqe{
2500        .user_data = 0x77777777,
2501        .res = 0,
2502        .flags = 0,
2503    }, cqe_nop);
2504
2505    const cqe_timeout = try ring.copy_cqe();
2506    try testing.expectEqual(linux.io_uring_cqe{
2507        .user_data = 0x66666666,
2508        .res = 0,
2509        .flags = 0,
2510    }, cqe_timeout);
2511}
2512
2513test "timeout_remove" {
2514    if (!is_linux) return error.SkipZigTest;
2515
2516    var ring = IoUring.init(2, 0) catch |err| switch (err) {
2517        error.SystemOutdated => return error.SkipZigTest,
2518        error.PermissionDenied => return error.SkipZigTest,
2519        else => return err,
2520    };
2521    defer ring.deinit();
2522
2523    const ts: linux.kernel_timespec = .{ .sec = 3, .nsec = 0 };
2524    const sqe_timeout = try ring.timeout(0x88888888, &ts, 0, 0);
2525    try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
2526    try testing.expectEqual(@as(u64, 0x88888888), sqe_timeout.user_data);
2527
2528    const sqe_timeout_remove = try ring.timeout_remove(0x99999999, 0x88888888, 0);
2529    try testing.expectEqual(linux.IORING_OP.TIMEOUT_REMOVE, sqe_timeout_remove.opcode);
2530    try testing.expectEqual(@as(u64, 0x88888888), sqe_timeout_remove.addr);
2531    try testing.expectEqual(@as(u64, 0x99999999), sqe_timeout_remove.user_data);
2532
2533    try testing.expectEqual(@as(u32, 2), try ring.submit());
2534
2535    // The order in which the CQE arrive is not clearly documented and it changed with kernel 5.18:
2536    // * kernel 5.10 gives user data 0x88888888 first, 0x99999999 second
2537    // * kernel 5.18 gives user data 0x99999999 first, 0x88888888 second
2538
2539    var cqes: [2]linux.io_uring_cqe = undefined;
2540    cqes[0] = try ring.copy_cqe();
2541    cqes[1] = try ring.copy_cqe();
2542
2543    for (cqes) |cqe| {
2544        // IORING_OP_TIMEOUT_REMOVE is not supported by this kernel version:
2545        // Timeout remove operations set the fd to -1, which results in EBADF before EINVAL.
        // We use the absence of IORING_FEAT_RW_CUR_POS (introduced in Linux 5.6) as a safety
        // check here to make sure we are actually pre-5.6.
2547        // We don't want to skip this test for newer kernels.
2548        if (cqe.user_data == 0x99999999 and
2549            cqe.err() == .BADF and
2550            (ring.features & linux.IORING_FEAT_RW_CUR_POS) == 0)
2551        {
2552            return error.SkipZigTest;
2553        }
2554
2555        try testing.expect(cqe.user_data == 0x88888888 or cqe.user_data == 0x99999999);
2556
2557        if (cqe.user_data == 0x88888888) {
2558            try testing.expectEqual(linux.io_uring_cqe{
2559                .user_data = 0x88888888,
2560                .res = -@as(i32, @intFromEnum(linux.E.CANCELED)),
2561                .flags = 0,
2562            }, cqe);
2563        } else if (cqe.user_data == 0x99999999) {
2564            try testing.expectEqual(linux.io_uring_cqe{
2565                .user_data = 0x99999999,
2566                .res = 0,
2567                .flags = 0,
2568            }, cqe);
2569        }
2570    }
2571}
2572
2573test "accept/connect/recv/link_timeout" {
2574    if (!is_linux) return error.SkipZigTest;
2575
2576    var ring = IoUring.init(16, 0) catch |err| switch (err) {
2577        error.SystemOutdated => return error.SkipZigTest,
2578        error.PermissionDenied => return error.SkipZigTest,
2579        else => return err,
2580    };
2581    defer ring.deinit();
2582
2583    const socket_test_harness = try createSocketTestHarness(&ring);
2584    defer socket_test_harness.close();
2585
2586    var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2587
2588    const sqe_recv = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2589    sqe_recv.flags |= linux.IOSQE_IO_LINK;
2590
2591    const ts = linux.kernel_timespec{ .sec = 0, .nsec = 1000000 };
2592    _ = try ring.link_timeout(0x22222222, &ts, 0);
2593
2594    const nr_wait = try ring.submit();
2595    try testing.expectEqual(@as(u32, 2), nr_wait);
2596
2597    var i: usize = 0;
2598    while (i < nr_wait) : (i += 1) {
2599        const cqe = try ring.copy_cqe();
2600        switch (cqe.user_data) {
2601            0xffffffff => {
2602                if (cqe.res != -@as(i32, @intFromEnum(linux.E.INTR)) and
2603                    cqe.res != -@as(i32, @intFromEnum(linux.E.CANCELED)))
2604                {
2605                    std.debug.print("Req 0x{x} got {d}\n", .{ cqe.user_data, cqe.res });
2606                    try testing.expect(false);
2607                }
2608            },
2609            0x22222222 => {
2610                if (cqe.res != -@as(i32, @intFromEnum(linux.E.ALREADY)) and
2611                    cqe.res != -@as(i32, @intFromEnum(linux.E.TIME)))
2612                {
2613                    std.debug.print("Req 0x{x} got {d}\n", .{ cqe.user_data, cqe.res });
2614                    try testing.expect(false);
2615                }
2616            },
2617            else => @panic("should not happen"),
2618        }
2619    }
2620}
2621
2622test "fallocate" {
2623    if (!is_linux) return error.SkipZigTest;
2624
2625    var ring = IoUring.init(1, 0) catch |err| switch (err) {
2626        error.SystemOutdated => return error.SkipZigTest,
2627        error.PermissionDenied => return error.SkipZigTest,
2628        else => return err,
2629    };
2630    defer ring.deinit();
2631
2632    var tmp = std.testing.tmpDir(.{});
2633    defer tmp.cleanup();
2634
2635    const path = "test_io_uring_fallocate";
2636    const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
2637    defer file.close();
2638
2639    try testing.expectEqual(@as(u64, 0), (try file.stat()).size);
2640
2641    const len: u64 = 65536;
2642    const sqe = try ring.fallocate(0xaaaaaaaa, file.handle, 0, 0, len);
2643    try testing.expectEqual(linux.IORING_OP.FALLOCATE, sqe.opcode);
2644    try testing.expectEqual(file.handle, sqe.fd);
2645    try testing.expectEqual(@as(u32, 1), try ring.submit());
2646
2647    const cqe = try ring.copy_cqe();
2648    switch (cqe.err()) {
2649        .SUCCESS => {},
2650        // This kernel's io_uring does not yet implement fallocate():
2651        .INVAL => return error.SkipZigTest,
2652        // This kernel does not implement fallocate():
2653        .NOSYS => return error.SkipZigTest,
2654        // The filesystem containing the file referred to by fd does not support this operation;
2655        // or the mode is not supported by the filesystem containing the file referred to by fd:
2656        .OPNOTSUPP => return error.SkipZigTest,
2657        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2658    }
2659    try testing.expectEqual(linux.io_uring_cqe{
2660        .user_data = 0xaaaaaaaa,
2661        .res = 0,
2662        .flags = 0,
2663    }, cqe);
2664
2665    try testing.expectEqual(len, (try file.stat()).size);
2666}
2667
2668test "statx" {
2669    if (!is_linux) return error.SkipZigTest;
2670
2671    var ring = IoUring.init(1, 0) catch |err| switch (err) {
2672        error.SystemOutdated => return error.SkipZigTest,
2673        error.PermissionDenied => return error.SkipZigTest,
2674        else => return err,
2675    };
2676    defer ring.deinit();
2677
2678    var tmp = std.testing.tmpDir(.{});
2679    defer tmp.cleanup();
2680    const path = "test_io_uring_statx";
2681    const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
2682    defer file.close();
2683
2684    try testing.expectEqual(@as(u64, 0), (try file.stat()).size);
2685
2686    try file.writeAll("foobar");
2687
2688    var buf: linux.Statx = undefined;
2689    const sqe = try ring.statx(
2690        0xaaaaaaaa,
2691        tmp.dir.fd,
2692        path,
2693        0,
2694        linux.STATX_SIZE,
2695        &buf,
2696    );
2697    try testing.expectEqual(linux.IORING_OP.STATX, sqe.opcode);
2698    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
2699    try testing.expectEqual(@as(u32, 1), try ring.submit());
2700
2701    const cqe = try ring.copy_cqe();
2702    switch (cqe.err()) {
2703        .SUCCESS => {},
2704        // This kernel's io_uring does not yet implement statx():
2705        .INVAL => return error.SkipZigTest,
2706        // This kernel does not implement statx():
2707        .NOSYS => return error.SkipZigTest,
2708        // The filesystem containing the file referred to by fd does not support this operation;
2709        // or the mode is not supported by the filesystem containing the file referred to by fd:
2710        .OPNOTSUPP => return error.SkipZigTest,
2711        // not supported on older kernels (5.4)
2712        .BADF => return error.SkipZigTest,
2713        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2714    }
2715    try testing.expectEqual(linux.io_uring_cqe{
2716        .user_data = 0xaaaaaaaa,
2717        .res = 0,
2718        .flags = 0,
2719    }, cqe);
2720
2721    try testing.expect(buf.mask & linux.STATX_SIZE == linux.STATX_SIZE);
2722    try testing.expectEqual(@as(u64, 6), buf.size);
2723}
2724
2725test "accept/connect/recv/cancel" {
2726    if (!is_linux) return error.SkipZigTest;
2727
2728    var ring = IoUring.init(16, 0) catch |err| switch (err) {
2729        error.SystemOutdated => return error.SkipZigTest,
2730        error.PermissionDenied => return error.SkipZigTest,
2731        else => return err,
2732    };
2733    defer ring.deinit();
2734
2735    const socket_test_harness = try createSocketTestHarness(&ring);
2736    defer socket_test_harness.close();
2737
2738    var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2739
2740    _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2741    try testing.expectEqual(@as(u32, 1), try ring.submit());
2742
2743    const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0);
2744    try testing.expectEqual(linux.IORING_OP.ASYNC_CANCEL, sqe_cancel.opcode);
2745    try testing.expectEqual(@as(u64, 0xffffffff), sqe_cancel.addr);
2746    try testing.expectEqual(@as(u64, 0x99999999), sqe_cancel.user_data);
2747    try testing.expectEqual(@as(u32, 1), try ring.submit());
2748
2749    var cqe_recv = try ring.copy_cqe();
2750    if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
2751    var cqe_cancel = try ring.copy_cqe();
2752    if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
2753
2754    // The recv/cancel CQEs may arrive in any order, the recv CQE will sometimes come first:
2755    if (cqe_recv.user_data == 0x99999999 and cqe_cancel.user_data == 0xffffffff) {
2756        const a = cqe_recv;
2757        const b = cqe_cancel;
2758        cqe_recv = b;
2759        cqe_cancel = a;
2760    }
2761
2762    try testing.expectEqual(linux.io_uring_cqe{
2763        .user_data = 0xffffffff,
2764        .res = -@as(i32, @intFromEnum(linux.E.CANCELED)),
2765        .flags = 0,
2766    }, cqe_recv);
2767
2768    try testing.expectEqual(linux.io_uring_cqe{
2769        .user_data = 0x99999999,
2770        .res = 0,
2771        .flags = 0,
2772    }, cqe_cancel);
2773}

test "register_files_update" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
    defer posix.close(fd);

    var registered_fds = [_]linux.fd_t{0} ** 2;
    const fd_index = 0;
    const fd_index2 = 1;
    registered_fds[fd_index] = fd;
    registered_fds[fd_index2] = -1;

    ring.register_files(registered_fds[0..]) catch |err| switch (err) {
        // Happens when the kernel doesn't support sparse entry (-1) in the file descriptors array.
        error.FileDescriptorInvalid => return error.SkipZigTest,
        else => |e| std.debug.panic("unhandled error: {}", .{e}),
    };

    // Test IORING_REGISTER_FILES_UPDATE
    // Only available since Linux 5.5

    const fd2 = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
    defer posix.close(fd2);

    registered_fds[fd_index] = fd2;
    registered_fds[fd_index2] = -1;
    try ring.register_files_update(0, registered_fds[0..]);

    var buffer = [_]u8{42} ** 128;
    {
        const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        sqe.flags |= linux.IOSQE_FIXED_FILE;

        try testing.expectEqual(@as(u32, 1), try ring.submit());
        try testing.expectEqual(linux.io_uring_cqe{
            .user_data = 0xcccccccc,
            .res = buffer.len,
            .flags = 0,
        }, try ring.copy_cqe());
        try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
    }

    // Test with a non-zero offset

    registered_fds[fd_index] = -1;
    registered_fds[fd_index2] = -1;
    try ring.register_files_update(1, registered_fds[1..]);

    {
        // Next read should still work since fd_index in the registered file descriptors hasn't been updated yet.
        const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        sqe.flags |= linux.IOSQE_FIXED_FILE;

        try testing.expectEqual(@as(u32, 1), try ring.submit());
        try testing.expectEqual(linux.io_uring_cqe{
            .user_data = 0xcccccccc,
            .res = buffer.len,
            .flags = 0,
        }, try ring.copy_cqe());
        try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
    }

    try ring.register_files_update(0, registered_fds[0..]);

    {
        // Now this should fail since both fds are sparse (-1)
        const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        sqe.flags |= linux.IOSQE_FIXED_FILE;

        try testing.expectEqual(@as(u32, 1), try ring.submit());
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(linux.E.BADF, cqe.err());
    }

    try ring.unregister_files();
}
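
// A minimal sketch (hypothetical helper, mirroring the test above): once file
// descriptors are registered, an SQE can address a registered slot by index
// instead of a raw fd, provided IOSQE_FIXED_FILE is set in sqe.flags.
fn exampleFixedFileRead(ring: *IoUring, buffer: []u8) !void {
    const fds = [_]linux.fd_t{try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0)};
    defer posix.close(fds[0]);
    try ring.register_files(fds[0..]);
    defer ring.unregister_files() catch {};

    // The `fd` argument is the index into the registered file table, not an fd.
    const sqe = try ring.read(0x01, 0, .{ .buffer = buffer }, 0);
    sqe.flags |= linux.IOSQE_FIXED_FILE;
    _ = try ring.submit();
    _ = try ring.copy_cqe();
}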

test "shutdown" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };

    // Socket bound, expect shutdown to work
    {
        const server = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
        defer posix.close(server);
        try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
        try posix.bind(server, addrAny(&address), @sizeOf(linux.sockaddr.in));
        try posix.listen(server, 1);

        // set address to the OS-chosen IP/port.
        var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
        try posix.getsockname(server, addrAny(&address), &slen);

        const shutdown_sqe = try ring.shutdown(0x445445445, server, linux.SHUT.RD);
        try testing.expectEqual(linux.IORING_OP.SHUTDOWN, shutdown_sqe.opcode);
        try testing.expectEqual(@as(i32, server), shutdown_sqe.fd);

        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            // This kernel's io_uring does not yet implement shutdown (kernel version < 5.11)
            .INVAL => return error.SkipZigTest,
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expectEqual(linux.io_uring_cqe{
            .user_data = 0x445445445,
            .res = 0,
            .flags = 0,
        }, cqe);
    }

    // Socket not bound, expect to fail with ENOTCONN
    {
        const server = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
        defer posix.close(server);

        const shutdown_sqe = ring.shutdown(0x445445445, server, linux.SHUT.RD) catch |err| switch (err) {
            else => |e| std.debug.panic("unhandled error: {}", .{e}),
        };
        try testing.expectEqual(linux.IORING_OP.SHUTDOWN, shutdown_sqe.opcode);
        try testing.expectEqual(@as(i32, server), shutdown_sqe.fd);

        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        try testing.expectEqual(@as(u64, 0x445445445), cqe.user_data);
        try testing.expectEqual(linux.E.NOTCONN, cqe.err());
    }
}

test "renameat" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const old_path = "test_io_uring_renameat_old";
    const new_path = "test_io_uring_renameat_new";

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    // Write old file with data

    const old_file = try tmp.dir.createFile(old_path, .{ .truncate = true, .mode = 0o666 });
    defer old_file.close();
    try old_file.writeAll("hello");

    // Submit renameat

    const sqe = try ring.renameat(
        0x12121212,
        tmp.dir.fd,
        old_path,
        tmp.dir.fd,
        new_path,
        0,
    );
    try testing.expectEqual(linux.IORING_OP.RENAMEAT, sqe.opcode);
    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
    try testing.expectEqual(@as(i32, tmp.dir.fd), @as(i32, @bitCast(sqe.len)));
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        // This kernel's io_uring does not yet implement renameat (kernel version < 5.11)
        .BADF, .INVAL => return error.SkipZigTest,
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0x12121212,
        .res = 0,
        .flags = 0,
    }, cqe);

    // Validate that the old file doesn't exist anymore
    try testing.expectError(error.FileNotFound, tmp.dir.openFile(old_path, .{}));

    // Validate that the new file exists with the proper content
    var new_file_data: [16]u8 = undefined;
    try testing.expectEqualStrings("hello", try tmp.dir.readFile(new_path, &new_file_data));
}

test "unlinkat" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const path = "test_io_uring_unlinkat";

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    // Write a file with data

    const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
    defer file.close();

    // Submit unlinkat

    const sqe = try ring.unlinkat(
        0x12121212,
        tmp.dir.fd,
        path,
        0,
    );
    try testing.expectEqual(linux.IORING_OP.UNLINKAT, sqe.opcode);
    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        // This kernel's io_uring does not yet implement unlinkat (kernel version < 5.11)
        .BADF, .INVAL => return error.SkipZigTest,
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0x12121212,
        .res = 0,
        .flags = 0,
    }, cqe);

    // Validate that the file doesn't exist anymore
    _ = tmp.dir.openFile(path, .{}) catch |err| switch (err) {
        error.FileNotFound => {},
        else => std.debug.panic("unexpected error: {}", .{err}),
    };
}

test "mkdirat" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const path = "test_io_uring_mkdirat";

    // Submit mkdirat

    const sqe = try ring.mkdirat(
        0x12121212,
        tmp.dir.fd,
        path,
        0o0755,
    );
    try testing.expectEqual(linux.IORING_OP.MKDIRAT, sqe.opcode);
    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        // This kernel's io_uring does not yet implement mkdirat (kernel version < 5.15)
        .BADF, .INVAL => return error.SkipZigTest,
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0x12121212,
        .res = 0,
        .flags = 0,
    }, cqe);

    // Validate that the directory exists
    _ = try tmp.dir.openDir(path, .{});
}

test "symlinkat" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const path = "test_io_uring_symlinkat";
    const link_path = "test_io_uring_symlinkat_link";

    const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
    defer file.close();

    // Submit symlinkat

    const sqe = try ring.symlinkat(
        0x12121212,
        path,
        tmp.dir.fd,
        link_path,
    );
    try testing.expectEqual(linux.IORING_OP.SYMLINKAT, sqe.opcode);
    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        // This kernel's io_uring does not yet implement symlinkat (kernel version < 5.15)
        .BADF, .INVAL => return error.SkipZigTest,
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0x12121212,
        .res = 0,
        .flags = 0,
    }, cqe);

    // Validate that the symlink exists
    _ = try tmp.dir.openFile(link_path, .{});
}

test "linkat" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();

    const first_path = "test_io_uring_linkat_first";
    const second_path = "test_io_uring_linkat_second";

    // Write file with data

    const first_file = try tmp.dir.createFile(first_path, .{ .truncate = true, .mode = 0o666 });
    defer first_file.close();
    try first_file.writeAll("hello");

    // Submit linkat

    const sqe = try ring.linkat(
        0x12121212,
        tmp.dir.fd,
        first_path,
        tmp.dir.fd,
        second_path,
        0,
    );
    try testing.expectEqual(linux.IORING_OP.LINKAT, sqe.opcode);
    try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
    try testing.expectEqual(@as(i32, tmp.dir.fd), @as(i32, @bitCast(sqe.len)));
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    const cqe = try ring.copy_cqe();
    switch (cqe.err()) {
        .SUCCESS => {},
        // This kernel's io_uring does not yet implement linkat (kernel version < 5.15)
        .BADF, .INVAL => return error.SkipZigTest,
        else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
    }
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0x12121212,
        .res = 0,
        .flags = 0,
    }, cqe);

    // Validate the second file
    var second_file_data: [16]u8 = undefined;
    try testing.expectEqualStrings("hello", try tmp.dir.readFile(second_path, &second_file_data));
}

test "provide_buffers: read" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
    defer posix.close(fd);

    const group_id = 1337;
    const buffer_id = 0;

    const buffer_len = 128;

    var buffers: [4][buffer_len]u8 = undefined;

    // Provide 4 buffers

    {
        const sqe = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
        try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
        try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
        try testing.expectEqual(@as(u32, buffers[0].len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            // Happens when the kernel is < 5.7
            .INVAL, .BADF => return error.SkipZigTest,
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
    }

    // Do 4 reads which should consume all buffers

    var i: usize = 0;
    while (i < buffers.len) : (i += 1) {
        const sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        try testing.expectEqual(@as(i32, fd), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
        const used_buffer_id = cqe.flags >> 16;
        try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
        try testing.expectEqual(@as(i32, buffer_len), cqe.res);

        try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
        try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
    }

    // This read should fail

    {
        const sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        try testing.expectEqual(@as(i32, fd), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            // Expected
            .NOBUFS => {},
            .SUCCESS => std.debug.panic("unexpected success", .{}),
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
    }

    // Provide 1 buffer again

    // Deliberately put something we don't expect in the buffers
    @memset(mem.sliceAsBytes(&buffers), 42);

    const reprovided_buffer_id = 2;

    {
        _ = try ring.provide_buffers(0xabababab, @as([*]u8, @ptrCast(&buffers[reprovided_buffer_id])), buffer_len, 1, group_id, reprovided_buffer_id);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
    }

    // Final read which should work

    {
        const sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
        try testing.expectEqual(@as(i32, fd), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
        const used_buffer_id = cqe.flags >> 16;
        try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
        try testing.expectEqual(@as(i32, buffer_len), cqe.res);
        try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
        try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
    }
}
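
// Hypothetical sketch of the buffer-selection mechanism exercised above: the
// kernel picks one of the provided buffers, sets IORING_CQE_F_BUFFER in
// cqe.flags, and reports the chosen buffer id in the upper 16 bits of flags.
fn exampleBufferSelectRead(ring: *IoUring, fd: linux.fd_t, buffers: *[4][128]u8) ![]u8 {
    // Hand the 4 buffers to the kernel under group id 7, starting at buffer id 0.
    _ = try ring.provide_buffers(0x01, @as([*]u8, @ptrCast(buffers)), 128, buffers.len, 7, 0);
    _ = try ring.submit();
    _ = try ring.copy_cqe();

    // Read without supplying a buffer; the kernel selects one from group 7.
    _ = try ring.read(0x02, fd, .{ .buffer_selection = .{ .group_id = 7, .len = 128 } }, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    assert(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
    const used_buffer_id = cqe.flags >> 16;
    return buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))];
}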

test "remove_buffers" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
    defer posix.close(fd);

    const group_id = 1337;
    const buffer_id = 0;

    const buffer_len = 128;

    var buffers: [4][buffer_len]u8 = undefined;

    // Provide 4 buffers

    {
        _ = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .INVAL, .BADF => return error.SkipZigTest,
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
    }

    // Remove 3 buffers

    {
        const sqe = try ring.remove_buffers(0xbababababa, 3, group_id);
        try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode);
        try testing.expectEqual(@as(i32, 3), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data);
    }

    // This read should work

    {
        _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
        const used_buffer_id = cqe.flags >> 16;
        try testing.expect(used_buffer_id >= 0 and used_buffer_id < 4);
        try testing.expectEqual(@as(i32, buffer_len), cqe.res);
        try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
        try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
    }

    // Final read should _not_ work

    {
        _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            // Expected
            .NOBUFS => {},
            .SUCCESS => std.debug.panic("unexpected success", .{}),
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
    }
}

test "provide_buffers: accept/connect/send/recv" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const group_id = 1337;
    const buffer_id = 0;

    const buffer_len = 128;
    var buffers: [4][buffer_len]u8 = undefined;

    // Provide 4 buffers

    {
        const sqe = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
        try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
        try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            // Happens when the kernel is < 5.7
            .INVAL => return error.SkipZigTest,
            // Happens on the kernel 5.4
            .BADF => return error.SkipZigTest,
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
    }

    const socket_test_harness = try createSocketTestHarness(&ring);
    defer socket_test_harness.close();

    // Do 4 sends on the socket

    {
        var i: usize = 0;
        while (i < buffers.len) : (i += 1) {
            _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
        }

        var cqes: [4]linux.io_uring_cqe = undefined;
        try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4));
    }

    // Do 4 recvs which should consume all buffers

    // Deliberately put something we don't expect in the buffers
    @memset(mem.sliceAsBytes(&buffers), 1);

    var i: usize = 0;
    while (i < buffers.len) : (i += 1) {
        const sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
        try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
        try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
        const used_buffer_id = cqe.flags >> 16;
        try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
        try testing.expectEqual(@as(i32, buffer_len), cqe.res);

        try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
        const buffer = buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))];
        try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer);
    }

    // This recv should fail

    {
        const sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
        try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
        try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            // Expected
            .NOBUFS => {},
            .SUCCESS => std.debug.panic("unexpected success", .{}),
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
        try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
    }

    // Provide 1 buffer again

    const reprovided_buffer_id = 2;

    {
        _ = try ring.provide_buffers(0xabababab, @as([*]u8, @ptrCast(&buffers[reprovided_buffer_id])), buffer_len, 1, group_id, reprovided_buffer_id);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }
    }

    // Redo 1 send on the server socket

    {
        _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        _ = try ring.copy_cqe();
    }

    // Final recv which should work

    // Deliberately put something we don't expect in the buffers
    @memset(mem.sliceAsBytes(&buffers), 1);

    {
        const sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
        try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
        try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
        try testing.expectEqual(@as(u64, 0), sqe.addr);
        try testing.expectEqual(@as(u32, buffer_len), sqe.len);
        try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
        try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
        try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        const cqe = try ring.copy_cqe();
        switch (cqe.err()) {
            .SUCCESS => {},
            else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
        }

        try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
        const used_buffer_id = cqe.flags >> 16;
        try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
        try testing.expectEqual(@as(i32, buffer_len), cqe.res);
        try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
        const buffer = buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))];
        try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer);
    }
}

/// Used for testing server/client interactions.
const SocketTestHarness = struct {
    listener: posix.socket_t,
    server: posix.socket_t,
    client: posix.socket_t,

    fn close(self: SocketTestHarness) void {
        posix.close(self.client);
        posix.close(self.listener);
    }
};

fn createSocketTestHarness(ring: *IoUring) !SocketTestHarness {
    // Create a TCP server socket
    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };
    const listener_socket = try createListenerSocket(&address);
    errdefer posix.close(listener_socket);

    // Submit 1 accept
    var accept_addr: posix.sockaddr = undefined;
    var accept_addr_len: posix.socklen_t = @sizeOf(@TypeOf(accept_addr));
    _ = try ring.accept(0xaaaaaaaa, listener_socket, &accept_addr, &accept_addr_len, 0);

    // Create a TCP client socket
    const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
    errdefer posix.close(client);
    _ = try ring.connect(0xcccccccc, client, addrAny(&address), @sizeOf(linux.sockaddr.in));

    try testing.expectEqual(@as(u32, 2), try ring.submit());

    var cqe_accept = try ring.copy_cqe();
    if (cqe_accept.err() == .INVAL) return error.SkipZigTest;
    var cqe_connect = try ring.copy_cqe();
    if (cqe_connect.err() == .INVAL) return error.SkipZigTest;

    // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first:
    if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa) {
        const a = cqe_accept;
        const b = cqe_connect;
        cqe_accept = b;
        cqe_connect = a;
    }

    try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data);
    if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res});
    try testing.expect(cqe_accept.res > 0);
    try testing.expectEqual(@as(u32, 0), cqe_accept.flags);
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xcccccccc,
        .res = 0,
        .flags = 0,
    }, cqe_connect);

    // All good

    return SocketTestHarness{
        .listener = listener_socket,
        .server = cqe_accept.res,
        .client = client,
    };
}

fn createListenerSocket(address: *linux.sockaddr.in) !posix.socket_t {
    const kernel_backlog = 1;
    const listener_socket = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
    errdefer posix.close(listener_socket);

    try posix.setsockopt(listener_socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener_socket, addrAny(address), @sizeOf(linux.sockaddr.in));
    try posix.listen(listener_socket, kernel_backlog);

    // set address to the OS-chosen IP/port.
    var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
    try posix.getsockname(listener_socket, addrAny(address), &slen);

    return listener_socket;
}

test "accept multishot" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };
    const listener_socket = try createListenerSocket(&address);
    defer posix.close(listener_socket);

    // submit multishot accept operation
    var addr: posix.sockaddr = undefined;
    var addr_len: posix.socklen_t = @sizeOf(@TypeOf(addr));
    const userdata: u64 = 0xaaaaaaaa;
    _ = try ring.accept_multishot(userdata, listener_socket, &addr, &addr_len, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    var nr: usize = 4; // number of clients to connect
    while (nr > 0) : (nr -= 1) {
        // connect client
        const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
        errdefer posix.close(client);
        try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));

        // test accept completion
        var cqe = try ring.copy_cqe();
        if (cqe.err() == .INVAL) return error.SkipZigTest;
        try testing.expect(cqe.res > 0);
        try testing.expect(cqe.user_data == userdata);
        try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE > 0); // more flag is set

        posix.close(client);
    }
}
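
// Sketch (hypothetical helper, not used by the tests): a single multishot
// accept SQE keeps producing one CQE per incoming connection for as long as
// IORING_CQE_F_MORE is set; once the flag is absent the operation has
// terminated and must be re-armed with a new SQE.
fn exampleMultishotAcceptLoop(ring: *IoUring, listener: posix.socket_t) !void {
    _ = try ring.accept_multishot(0x01, listener, null, null, 0);
    _ = try ring.submit();
    while (true) {
        const cqe = try ring.copy_cqe();
        if (cqe.err() != .SUCCESS) return error.AcceptFailed;
        posix.close(cqe.res); // cqe.res is the accepted socket; just close it here
        if (cqe.flags & linux.IORING_CQE_F_MORE == 0) break; // must re-arm
    }
}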

test "accept/connect/send_zc/recv" {
    try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 });

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const socket_test_harness = try createSocketTestHarness(&ring);
    defer socket_test_harness.close();

    const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
    var buffer_recv = [_]u8{0} ** 10;

    // zero-copy send
    const sqe_send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0);
    sqe_send.flags |= linux.IOSQE_IO_LINK;
    _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
    try testing.expectEqual(@as(u32, 2), try ring.submit());

    var cqe_send = try ring.copy_cqe();
    // First completion of the zero-copy send.
    // IORING_CQE_F_MORE means that there will be a second completion
    // event (a notification) for the request, with the user_data field
    // set to the same value. buffer_send must be kept alive until the
    // second CQE arrives.
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xeeeeeeee,
        .res = buffer_send.len,
        .flags = linux.IORING_CQE_F_MORE,
    }, cqe_send);

    cqe_send, const cqe_recv = brk: {
        const cqe1 = try ring.copy_cqe();
        const cqe2 = try ring.copy_cqe();
        break :brk if (cqe1.user_data == 0xeeeeeeee) .{ cqe1, cqe2 } else .{ cqe2, cqe1 };
    };

    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xffffffff,
        .res = buffer_recv.len,
        .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
    }, cqe_recv);
    try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);

    // Second completion of the zero-copy send.
    // IORING_CQE_F_NOTIF in flags signals that the kernel is done with buffer_send.
    try testing.expectEqual(linux.io_uring_cqe{
        .user_data = 0xeeeeeeee,
        .res = 0,
        .flags = linux.IORING_CQE_F_NOTIF,
    }, cqe_send);
}
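
// A minimal sketch (hypothetical helper) of the zero-copy send lifetime rule
// the test above demonstrates: the payload must stay valid until the second
// CQE, the notification carrying IORING_CQE_F_NOTIF, has been reaped.
fn exampleSendZc(ring: *IoUring, fd: posix.socket_t, buffer: []const u8) !void {
    _ = try ring.send_zc(0x01, fd, buffer, 0, 0);
    _ = try ring.submit();

    const first = try ring.copy_cqe(); // result of the send itself
    if (first.flags & linux.IORING_CQE_F_MORE != 0) {
        // The kernel still references `buffer`; wait for the notification.
        const second = try ring.copy_cqe();
        assert(second.flags & linux.IORING_CQE_F_NOTIF != 0);
    }
    // Only now is it safe to free or reuse `buffer`.
}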

test "accept_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();
    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };

    // register direct file descriptors
    var registered_fds = [_]linux.fd_t{-1} ** 2;
    try ring.register_files(registered_fds[0..]);

    const listener_socket = try createListenerSocket(&address);
    defer posix.close(listener_socket);

    const accept_userdata: u64 = 0xaaaaaaaa;
    const read_userdata: u64 = 0xbbbbbbbb;
    const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };

    for (0..2) |_| {
        for (registered_fds, 0..) |_, i| {
            var buffer_recv = [_]u8{0} ** 16;
            const buffer_send: []const u8 = data[0 .. data.len - i]; // make it different at each loop

            // submit accept; it will choose a registered fd and return its index in the CQE
            _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());

            // connect
            const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
            try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
            defer posix.close(client);

            // accept completion
            const cqe_accept = try ring.copy_cqe();
            try testing.expectEqual(posix.E.SUCCESS, cqe_accept.err());
            const fd_index = cqe_accept.res;
            try testing.expect(fd_index < registered_fds.len);
            try testing.expect(cqe_accept.user_data == accept_userdata);

            // send data
            _ = try posix.send(client, buffer_send, 0);

            // Example of how to use a registered fd:
            // submit a receive on the fixed file returned by accept (fd_index).
            // The fd field is set to the registered file index returned by accept,
            // and the linux.IOSQE_FIXED_FILE flag must be set.
            const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0);
            recv_sqe.flags |= linux.IOSQE_FIXED_FILE;
            try testing.expectEqual(@as(u32, 1), try ring.submit());

            // receive completion
            const recv_cqe = try ring.copy_cqe();
            try testing.expect(recv_cqe.user_data == read_userdata);
            try testing.expect(recv_cqe.res == buffer_send.len);
            try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
        }
        // no more available fds, accept will get NFILE error
        {
            // submit accept
            _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
            // connect
            const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
            try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
            defer posix.close(client);
            // completion with error
            const cqe_accept = try ring.copy_cqe();
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expectEqual(posix.E.NFILE, cqe_accept.err());
        }
        // return file descriptors to kernel
        try ring.register_files_update(0, registered_fds[0..]);
    }
    try ring.unregister_files();
}

test "accept_multishot_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    if (builtin.cpu.arch == .riscv64) {
        // https://github.com/ziglang/zig/issues/25734
        return error.SkipZigTest;
    }

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };

    var registered_fds = [_]linux.fd_t{-1} ** 2;
    try ring.register_files(registered_fds[0..]);

    const listener_socket = try createListenerSocket(&address);
    defer posix.close(listener_socket);

    const accept_userdata: u64 = 0xaaaaaaaa;

    for (0..2) |_| {
        // submit multishot accept
        // It will choose a registered fd and return the index of the selected registered file in the CQE.
        _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit());

        for (registered_fds) |_| {
            // connect
            const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
            try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
            defer posix.close(client);

            // accept completion
            const cqe_accept = try ring.copy_cqe();
            const fd_index = cqe_accept.res;
            try testing.expect(fd_index < registered_fds.len);
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // more flag is set
        }
        // No more available fds, accept will get NFILE error.
        // Multishot is terminated (more flag is not set).
        {
            // connect
            const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
            try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
            defer posix.close(client);
            // completion with error
            const cqe_accept = try ring.copy_cqe();
            try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expectEqual(posix.E.NFILE, cqe_accept.err());
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // more flag is not set
        }
        // return file descriptors to kernel
        try ring.register_files_update(0, registered_fds[0..]);
    }
    try ring.unregister_files();
}

test "socket" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IoUring.init(1, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    // prepare, submit socket operation
    _ = try ring.socket(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());

    // test completion
    var cqe = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
    const fd: linux.fd_t = @intCast(cqe.res);
    try testing.expect(fd > 2);

    posix.close(fd);
}

test "socket_direct/socket_direct_alloc/close_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var registered_fds = [_]linux.fd_t{-1} ** 3;
    try ring.register_files(registered_fds[0..]);

    // create socket in registered file descriptor at index 0 (last param)
    _ = try ring.socket_direct(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    var cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 0);

    // create socket in registered file descriptor at index 1 (last param)
    _ = try ring.socket_direct(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0, 1);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 0); // res is 0 when index is specified

    // create socket in a kernel-chosen registered file index (_alloc version);
    // the completion's res holds the index into the registered files
    _ = try ring.socket_direct_alloc(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe_socket = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
    try testing.expect(cqe_socket.res == 2); // returns registered file index

    // use sockets from registered_fds in connect operation
    var address: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };
    const listener_socket = try createListenerSocket(&address);
    defer posix.close(listener_socket);
    const accept_userdata: u64 = 0xaaaaaaaa;
    const connect_userdata: u64 = 0xbbbbbbbb;
    const close_userdata: u64 = 0xcccccccc;
    for (registered_fds, 0..) |_, fd_index| {
        // prepare accept
        _ = try ring.accept(accept_userdata, listener_socket, null, null, 0);
        // prepare connect with fixed socket
        const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), addrAny(&address), @sizeOf(linux.sockaddr.in));
        connect_sqe.flags |= linux.IOSQE_FIXED_FILE; // fd is fixed file index
        // submit both
        try testing.expectEqual(@as(u32, 2), try ring.submit());
        // get completions
        var cqe_connect = try ring.copy_cqe();
        var cqe_accept = try ring.copy_cqe();
        // ignore order
        if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata) {
            const a = cqe_accept;
            const b = cqe_connect;
            cqe_accept = b;
            cqe_connect = a;
        }
        // test connect completion
        try testing.expect(cqe_connect.user_data == connect_userdata);
        try testing.expectEqual(posix.E.SUCCESS, cqe_connect.err());
        // test accept completion
        try testing.expect(cqe_accept.user_data == accept_userdata);
        try testing.expectEqual(posix.E.SUCCESS, cqe_accept.err());

        // submit and test close_direct
        _ = try ring.close_direct(close_userdata, @intCast(fd_index));
        try testing.expectEqual(@as(u32, 1), try ring.submit());
        var cqe_close = try ring.copy_cqe();
        try testing.expect(cqe_close.user_data == close_userdata);
        try testing.expectEqual(posix.E.SUCCESS, cqe_close.err());
    }

    try ring.unregister_files();
}
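
// Hypothetical sketch of the direct-descriptor allocation pattern above:
// socket_direct_alloc asks the kernel to pick a free slot in the registered
// file table and returns that slot's index in cqe.res; later SQEs then
// address the socket by index with IOSQE_FIXED_FILE set.
fn exampleDirectSocket(ring: *IoUring) !u32 {
    _ = try ring.socket_direct_alloc(0x01, linux.AF.INET, posix.SOCK.STREAM, 0, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    if (cqe.err() != .SUCCESS) return error.SocketFailed;
    return @intCast(cqe.res); // index into the registered file table, not an fd
}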

test "openat_direct/close_direct" {
    try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });

    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    var registered_fds = [_]linux.fd_t{-1} ** 3;
    try ring.register_files(registered_fds[0..]);

    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
    const path = "test_io_uring_close_direct";
    const flags: linux.O = .{ .ACCMODE = .RDWR, .CREAT = true };
    const mode: posix.mode_t = 0o666;
    const user_data: u64 = 0;

    // use registered file at index 0 (last param)
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    var cqe = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 0);

    // use registered file at index 1
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 0); // res is 0 when we specify index

    // let kernel choose registered file index
    _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC);
    try testing.expectEqual(@as(u32, 1), try ring.submit());
    cqe = try ring.copy_cqe();
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
    try testing.expect(cqe.res == 2); // chosen index is in res

    // close all open file descriptors
    for (registered_fds, 0..) |_, fd_index| {
        _ = try ring.close_direct(user_data, @intCast(fd_index));
        try testing.expectEqual(@as(u32, 1), try ring.submit());
        var cqe_close = try ring.copy_cqe();
        try testing.expectEqual(posix.E.SUCCESS, cqe_close.err());
    }
    try ring.unregister_files();
}

test "waitid" {
    try skipKernelLessThan(.{ .major = 6, .minor = 7, .patch = 0 });

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const pid = try posix.fork();
    if (pid == 0) {
        posix.exit(7);
    }

    var siginfo: posix.siginfo_t = undefined;
    _ = try ring.waitid(0, .PID, pid, &siginfo, posix.W.EXITED, 0);

    try testing.expectEqual(1, try ring.submit());

    const cqe_waitid = try ring.copy_cqe();
    try testing.expectEqual(0, cqe_waitid.res);
    try testing.expectEqual(pid, siginfo.fields.common.first.piduid.pid);
    try testing.expectEqual(7, siginfo.fields.common.second.sigchld.status);
}

/// For use in tests. Returns SkipZigTest if kernel version is less than required.
inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
    if (!is_linux) return error.SkipZigTest;

    var uts: linux.utsname = undefined;
    const res = linux.uname(&uts);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        else => |errno| return posix.unexpectedErrno(errno),
    }

    const release = mem.sliceTo(&uts.release, 0);
    // Strip the potential extra part, as the kernel version might not be semver compliant, e.g. "6.8.9-300.fc40.x86_64".
    const extra_index = std.mem.indexOfAny(u8, release, "-+");
    const stripped = release[0..(extra_index orelse release.len)];
    // Make sure the required version doesn't rely on the extra part we just stripped.
    try testing.expect(required.pre == null and required.build == null);

    var current = try std.SemanticVersion.parse(stripped);
    current.pre = null; // don't check pre field
    if (required.order(current) == .gt) return error.SkipZigTest;
}

test BufferGroup {
    if (!is_linux) return error.SkipZigTest;

    // Init IoUring
    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    // Init buffer group for ring
    const group_id: u16 = 1; // buffers group id
    const buffers_count: u16 = 1; // number of buffers in buffer group
    const buffer_size: usize = 128; // size of each buffer in group
    var buf_grp = BufferGroup.init(
        &ring,
        testing.allocator,
        group_id,
        buffer_size,
        buffers_count,
    ) catch |err| switch (err) {
        // kernel older than 5.19
        error.ArgumentsInvalid => return error.SkipZigTest,
        else => return err,
    };
    defer buf_grp.deinit(testing.allocator);

    // Create client/server fds
    const fds = try createSocketTestHarness(&ring);
    defer fds.close();
    const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };

    // Client sends data
    {
        _ = try ring.send(1, fds.client, data[0..], 0);
        const submitted = try ring.submit();
        try testing.expectEqual(1, submitted);
        const cqe_send = try ring.copy_cqe();
        if (cqe_send.err() == .INVAL) return error.SkipZigTest;
        try testing.expectEqual(linux.io_uring_cqe{ .user_data = 1, .res = data.len, .flags = 0 }, cqe_send);
    }

    // Server uses buffer group receive
    {
        // Submit recv operation, buffer will be chosen from buffer group
        _ = try buf_grp.recv(2, fds.server, 0);
        const submitted = try ring.submit();
        try testing.expectEqual(1, submitted);

4155        // ... when we have completion for recv operation
4156        const cqe = try ring.copy_cqe();
4157        try testing.expectEqual(2, cqe.user_data); // matches submitted user_data
4158        try testing.expect(cqe.res >= 0); // success
4159        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4160        try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
4161
4162        // Get buffer from pool
4163        const buf = try buf_grp.get(cqe);
4164        try testing.expectEqualSlices(u8, &data, buf);
4165        // Release buffer to the kernel when application is done with it
4166        try buf_grp.put(cqe);
4167    }
4168}

test "ring mapped buffers recv" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    // init buffer group
    const group_id: u16 = 1; // buffer group id
    const buffers_count: u16 = 2; // number of buffers in the group
    const buffer_size: usize = 4; // size of each buffer in the group
    var buf_grp = BufferGroup.init(
        &ring,
        testing.allocator,
        group_id,
        buffer_size,
        buffers_count,
    ) catch |err| switch (err) {
        // kernel older than 5.19
        error.ArgumentsInvalid => return error.SkipZigTest,
        else => return err,
    };
    defer buf_grp.deinit(testing.allocator);

    // create client/server fds
    const fds = try createSocketTestHarness(&ring);
    defer fds.close();

    // for random user_data in sqe/cqe
    var Rnd = std.Random.DefaultPrng.init(std.testing.random_seed);
    var rnd = Rnd.random();

    var round: usize = 4; // repeat the send/recv cycle this many times
    while (round > 0) : (round -= 1) {
        // client sends data
        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
        {
            const user_data = rnd.int(u64);
            _ = try ring.send(user_data, fds.client, data[0..], 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
            const cqe_send = try ring.copy_cqe();
            if (cqe_send.err() == .INVAL) return error.SkipZigTest;
            try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
        }
        var pos: usize = 0;

        // read first chunk
        const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
        var buf = try buf_grp.get(cqe1);
        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
        pos += buf.len;
        // second chunk
        const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
        buf = try buf_grp.get(cqe2);
        try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
        pos += buf.len;

        // Both buffers provided to the kernel are now in use, so recv fails
        // with 'no more buffers' until we return buffers to the kernel.
        {
            const user_data = rnd.int(u64);
            _ = try buf_grp.recv(user_data, fds.server, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
            const cqe = try ring.copy_cqe();
            try testing.expectEqual(user_data, cqe.user_data);
            try testing.expect(cqe.res < 0); // fail
            try testing.expectEqual(posix.E.NOBUFS, cqe.err());
            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // the IORING_CQE_F_BUFFER flag is set on success only
            try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
        }

        // put the buffers back to the kernel
        try buf_grp.put(cqe1);
        try buf_grp.put(cqe2);

        // read the remaining data
        while (pos < data.len) {
            const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
            buf = try buf_grp.get(cqe);
            try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
            pos += buf.len;
            try buf_grp.put(cqe);
        }
    }
}

test "ring mapped buffers multishot recv" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(16, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    // init buffer group
    const group_id: u16 = 1; // buffer group id
    const buffers_count: u16 = 2; // number of buffers in the group
    const buffer_size: usize = 4; // size of each buffer in the group
    var buf_grp = BufferGroup.init(
        &ring,
        testing.allocator,
        group_id,
        buffer_size,
        buffers_count,
    ) catch |err| switch (err) {
        // kernel older than 5.19
        error.ArgumentsInvalid => return error.SkipZigTest,
        else => return err,
    };
    defer buf_grp.deinit(testing.allocator);

    // create client/server fds
    const fds = try createSocketTestHarness(&ring);
    defer fds.close();

    // for random user_data in sqe/cqe
    var Rnd = std.Random.DefaultPrng.init(std.testing.random_seed);
    var rnd = Rnd.random();

    var round: usize = 4; // repeat the send/recv cycle this many times
    while (round > 0) : (round -= 1) {
        // client sends data
        const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
        {
            const user_data = rnd.int(u64);
            _ = try ring.send(user_data, fds.client, data[0..], 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());
            const cqe_send = try ring.copy_cqe();
            if (cqe_send.err() == .INVAL) return error.SkipZigTest;
            try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
        }

        // start multishot recv
        var recv_user_data = rnd.int(u64);
        _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit

        // The server reads data into the provided buffers: there are 2 buffers
        // of size 4 and 16 bytes of data, so each read gets only a chunk of
        // the data, and it arrives as four 4-byte chunks.
        var chunk: []const u8 = data[0..buffer_size]; // first chunk
        const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
        try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);

        chunk = data[buffer_size .. buffer_size * 2]; // second chunk
        const cqe2 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
        try testing.expect(cqe2.flags & linux.IORING_CQE_F_MORE > 0);

        // Both buffers provided to the kernel are now in use, so we get a
        // 'no more buffers' error until we return buffers to the kernel.
        {
            const cqe = try ring.copy_cqe();
            try testing.expectEqual(recv_user_data, cqe.user_data);
            try testing.expect(cqe.res < 0); // fail
            try testing.expectEqual(posix.E.NOBUFS, cqe.err());
            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // the IORING_CQE_F_BUFFER flag is set on success only
            // IORING_CQE_F_MORE is not set, indicating that the multishot
            // operation has finished
            try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE == 0);
            try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
        }

        // put the buffers back to the kernel
        try buf_grp.put(cqe1);
        try buf_grp.put(cqe2);

        // restart multishot
        recv_user_data = rnd.int(u64);
        _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
        try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit

        chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
        const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
        try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
        try buf_grp.put(cqe3);

        chunk = data[buffer_size * 3 ..]; // last chunk
        const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
        try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
        try buf_grp.put(cqe4);

        // cancel the pending multishot recv operation
        {
            const cancel_user_data = rnd.int(u64);
            _ = try ring.cancel(cancel_user_data, recv_user_data, 0);
            try testing.expectEqual(@as(u32, 1), try ring.submit());

            // expect a completion for the cancel operation and one for the recv operation
            var cqe_cancel = try ring.copy_cqe();
            if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
            var cqe_recv = try ring.copy_cqe();
            if (cqe_recv.err() == .INVAL) return error.SkipZigTest;

            // don't depend on the order of completions
            if (cqe_cancel.user_data == recv_user_data and cqe_recv.user_data == cancel_user_data) {
                mem.swap(linux.io_uring_cqe, &cqe_cancel, &cqe_recv);
            }

            // Note on different kernel results:
            // on older kernels (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
            //   cqe_cancel.err() == .NOENT
            //   cqe_recv.err() == .NOBUFS
            // on newer kernels (tested with v6.5.0, v6.5.7)
            //   cqe_cancel.err() == .SUCCESS
            //   cqe_recv.err() == .CANCELED
            // Upstream reference: https://github.com/axboe/liburing/issues/984

            // the cancel operation succeeds (or returns NOENT on older kernels)
            try testing.expectEqual(cancel_user_data, cqe_cancel.user_data);
            try testing.expect(cqe_cancel.err() == .NOENT or cqe_cancel.err() == .SUCCESS);

            // the recv operation fails with CANCELED (or NOBUFS on older kernels)
            try testing.expectEqual(recv_user_data, cqe_recv.user_data);
            try testing.expect(cqe_recv.res < 0);
            try testing.expect(cqe_recv.err() == .NOBUFS or cqe_recv.err() == .CANCELED);
            try testing.expect(cqe_recv.flags & linux.IORING_CQE_F_MORE == 0);
        }
    }
}
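
// Note: a multishot recv stays armed only while each CQE carries
// IORING_CQE_F_MORE; once a CQE arrives without it (NOBUFS above, but also
// EOF or cancellation), the operation is finished and must be re-submitted,
// as the test does after returning the buffers. A minimal re-arm loop might
// look like this (a sketch; `process` stands in for application code and
// error handling is elided):
//
//     while (true) {
//         const cqe = try ring.copy_cqe();
//         if (cqe.flags & linux.IORING_CQE_F_MORE == 0) {
//             // multishot terminated: return any held buffers, then re-arm
//             _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
//             _ = try ring.submit();
//             continue;
//         }
//         process(try buf_grp.get(cqe));
//         try buf_grp.put(cqe);
//     }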

// Prepare and submit a recv using the buffer group, then fetch its cqe.
fn buf_grp_recv_submit_get_cqe(
    ring: *IoUring,
    buf_grp: *BufferGroup,
    fd: linux.fd_t,
    user_data: u64,
) !linux.io_uring_cqe {
    // prepare and submit recv
    const sqe = try buf_grp.recv(user_data, fd, 0);
    try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
    try testing.expect(sqe.buf_index == buf_grp.group_id);
    try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
    // get the cqe, expect success
    const cqe = try ring.copy_cqe();
    try testing.expectEqual(user_data, cqe.user_data);
    try testing.expect(cqe.res >= 0); // success
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());
    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // the IORING_CQE_F_BUFFER flag is set

    return cqe;
}

fn expect_buf_grp_cqe(
    ring: *IoUring,
    buf_grp: *BufferGroup,
    user_data: u64,
    expected: []const u8,
) !linux.io_uring_cqe {
    // get cqe
    const cqe = try ring.copy_cqe();
    try testing.expectEqual(user_data, cqe.user_data);
    try testing.expect(cqe.res >= 0); // success
    try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
    try testing.expectEqual(expected.len, @as(usize, @intCast(cqe.res)));
    try testing.expectEqual(posix.E.SUCCESS, cqe.err());

    // get buffer from pool
    const buffer_id = try cqe.buffer_id();
    const len = @as(usize, @intCast(cqe.res));
    const buf = buf_grp.get_by_id(buffer_id)[0..len];
    try testing.expectEqualSlices(u8, expected, buf);

    return cqe;
}
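
// Note: when IORING_CQE_F_BUFFER is set, the kernel stores the id of the
// selected buffer in the upper 16 bits of cqe.flags; cqe.buffer_id() wraps
// that encoding, conceptually (a sketch, error handling elided):
//
//     const buffer_id: u16 = @intCast(cqe.flags >> linux.IORING_CQE_BUFFER_SHIFT);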

test "copy_cqes with wrapping cq.cqes buffer" {
    if (!is_linux) return error.SkipZigTest;

    var ring = IoUring.init(2, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    try testing.expectEqual(2, ring.sq.sqes.len);
    try testing.expectEqual(4, ring.cq.cqes.len);

    // submit 2 entries, receive 2 completions
    var cqes: [8]linux.io_uring_cqe = undefined;
    {
        for (0..2) |_| {
            const sqe = try ring.get_sqe();
            sqe.prep_timeout(&.{ .sec = 0, .nsec = 10000 }, 0, 0);
            try testing.expect(try ring.submit() == 1);
        }
        var cqe_count: u32 = 0;
        while (cqe_count < 2) {
            cqe_count += try ring.copy_cqes(&cqes, 2 - cqe_count);
        }
    }

    try testing.expectEqual(2, ring.cq.head.*);

    // cq.cqes len is 4 and the head starts at position 2, so every batch of
    // 4 submissions wraps the completion buffer: we read ring.cq.cqes at
    // indexes 2, 3, 0, 1.
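    // In other words, the index into cq.cqes is head & (cq.cqes.len - 1):
    // after the first batch head == 2, and each iteration below advances it
    // by 4, so every batch repeatedly covers indexes 2, 3, 0, 1.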
    for (1..1024) |i| {
        for (0..4) |_| {
            const sqe = try ring.get_sqe();
            sqe.prep_timeout(&.{ .sec = 0, .nsec = 10000 }, 0, 0);
            try testing.expect(try ring.submit() == 1);
        }
        var cqe_count: u32 = 0;
        while (cqe_count < 4) {
            cqe_count += try ring.copy_cqes(&cqes, 4 - cqe_count);
        }
        try testing.expectEqual(4, cqe_count);
        try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
    }
}

test "bind/listen/connect" {
    if (builtin.cpu.arch == .s390x) return error.SkipZigTest; // https://github.com/ziglang/zig/issues/25956

    var ring = IoUring.init(4, 0) catch |err| switch (err) {
        error.SystemOutdated => return error.SkipZigTest,
        error.PermissionDenied => return error.SkipZigTest,
        else => return err,
    };
    defer ring.deinit();

    const probe = ring.get_probe() catch return error.SkipZigTest;
    // LISTEN has the highest kernel requirement of the operations used here,
    // so if it is supported, the others are too.
    if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;

    var addr: linux.sockaddr.in = .{
        .port = 0,
        .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
    };
    const proto: u32 = if (addr.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;

    const listen_fd = brk: {
        // Create the listening socket
        _ = try ring.socket(1, addr.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
        try testing.expectEqual(1, try ring.submit());
        var cqe = try ring.copy_cqe();
        try testing.expectEqual(1, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        const listen_fd: linux.fd_t = @intCast(cqe.res);
        try testing.expect(listen_fd > 2);

        // Prepare: set two socket options, bind, listen
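        // link_next() links each SQE to the next one (IOSQE_IO_LINK), so a
        // linked operation starts only after the previous one completes
        // successfully; a failure anywhere cancels the rest of the chain
        // with ECANCELED.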
        var optval: u32 = 1;
        (try ring.setsockopt(2, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
        (try ring.setsockopt(3, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, mem.asBytes(&optval))).link_next();
        (try ring.bind(4, listen_fd, addrAny(&addr), @sizeOf(linux.sockaddr.in), 0)).link_next();
        _ = try ring.listen(5, listen_fd, 1, 0);
        // Submit 4 operations
        try testing.expectEqual(4, try ring.submit());
        // Expect all to succeed
        for (2..6) |user_data| {
            cqe = try ring.copy_cqe();
            try testing.expectEqual(user_data, cqe.user_data);
            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        }

        // Check that socket option is set
        optval = 0;
        _ = try ring.getsockopt(5, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));
        try testing.expectEqual(1, try ring.submit());
        cqe = try ring.copy_cqe();
        try testing.expectEqual(5, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        try testing.expectEqual(1, optval);

        // Read system assigned port into addr
        var addr_len: posix.socklen_t = @sizeOf(linux.sockaddr.in);
        try posix.getsockname(listen_fd, addrAny(&addr), &addr_len);

        break :brk listen_fd;
    };

    const connect_fd = brk: {
        // Create the connect socket
        _ = try ring.socket(6, addr.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
        try testing.expectEqual(1, try ring.submit());
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(6, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        // Get the connect socket fd
        const connect_fd: linux.fd_t = @intCast(cqe.res);
        try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
        break :brk connect_fd;
    };

    // Prepare accept/connect operations
    _ = try ring.accept(7, listen_fd, null, null, 0);
    _ = try ring.connect(8, connect_fd, addrAny(&addr), @sizeOf(linux.sockaddr.in));
    try testing.expectEqual(2, try ring.submit());
    // Get the socket accepted by the listener
    var accept_fd: posix.socket_t = 0;
    for (0..2) |_| {
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        if (cqe.user_data == 7) {
            accept_fd = @intCast(cqe.res);
        } else {
            try testing.expectEqual(8, cqe.user_data);
        }
    }
    try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);

    // Communicate
    try testSendRecv(&ring, connect_fd, accept_fd);
    try testSendRecv(&ring, accept_fd, connect_fd);

    // Shutdown and close all sockets
    for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
        (try ring.shutdown(9, fd, posix.SHUT.RDWR)).link_next();
        _ = try ring.close(10, fd);
        try testing.expectEqual(2, try ring.submit());
        for (0..2) |i| {
            const cqe = try ring.copy_cqe();
            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
            try testing.expectEqual(9 + i, cqe.user_data);
        }
    }
}

fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
    const buffer_send = "0123456789abcdf" ** 10;
    var buffer_recv: [buffer_send.len * 2]u8 = undefined;

    // 2 sends
    _ = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
    _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
    try testing.expectEqual(2, try ring.submit());
    for (0..2) |i| {
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(1 + i, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
    }

    // receive; recv may return short reads, so loop until both sends are consumed
    var recv_len: usize = 0;
    while (recv_len < buffer_send.len * 2) {
        _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
        try testing.expectEqual(1, try ring.submit());
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(3, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        recv_len += @intCast(cqe.res);
    }

    // inspect the recv buffer
    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
}
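
/// Upcast a concrete *linux.sockaddr.in to the generic *linux.sockaddr that
/// the socket syscalls expect; sockaddr.in begins with the same `family`
/// field, so the cast is layout-safe.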
fn addrAny(addr: *linux.sockaddr.in) *linux.sockaddr {
    return @ptrCast(addr);
}