Commit 265f42d472
Changed files (5)
lib/std/os/linux/io_uring_sqe.zig
@@ -0,0 +1,579 @@
+//! Contains only the definition of `io_uring_sqe`.
+//! Split into its own file to compartmentalize the initialization methods.
+
+const std = @import("../../std.zig");
+const os = std.os;
+const linux = os.linux;
+
+pub const io_uring_sqe = extern struct {
+ opcode: linux.IORING_OP,
+ flags: u8,
+ ioprio: u16,
+ fd: i32,
+ off: u64,
+ addr: u64,
+ len: u32,
+ rw_flags: u32,
+ user_data: u64,
+ buf_index: u16,
+ personality: u16,
+ splice_fd_in: i32,
+ addr3: u64,
+ resv: u64,
+
+ pub fn prep_nop(sqe: *linux.io_uring_sqe) void {
+ sqe.* = .{
+ .opcode = .NOP,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = 0,
+ .off = 0,
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_fsync(sqe: *linux.io_uring_sqe, fd: os.fd_t, flags: u32) void {
+ sqe.* = .{
+ .opcode = .FSYNC,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = fd,
+ .off = 0,
+ .addr = 0,
+ .len = 0,
+ .rw_flags = flags,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_rw(
+ sqe: *linux.io_uring_sqe,
+ op: linux.IORING_OP,
+ fd: os.fd_t,
+ addr: u64,
+ len: usize,
+ offset: u64,
+ ) void {
+ sqe.* = .{
+ .opcode = op,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = fd,
+ .off = offset,
+ .addr = addr,
+ .len = @intCast(len),
+ .rw_flags = 0,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_read(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []u8, offset: u64) void {
+ sqe.prep_rw(.READ, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
+ }
+
+ pub fn prep_write(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, offset: u64) void {
+ sqe.prep_rw(.WRITE, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
+ }
+
+ pub fn prep_splice(sqe: *linux.io_uring_sqe, fd_in: os.fd_t, off_in: u64, fd_out: os.fd_t, off_out: u64, len: usize) void {
+ sqe.prep_rw(.SPLICE, fd_out, undefined, len, off_out);
+ sqe.addr = off_in;
+ sqe.splice_fd_in = fd_in;
+ }
+
+ pub fn prep_readv(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ iovecs: []const os.iovec,
+ offset: u64,
+ ) void {
+ sqe.prep_rw(.READV, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
+ }
+
+ pub fn prep_writev(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ iovecs: []const os.iovec_const,
+ offset: u64,
+ ) void {
+ sqe.prep_rw(.WRITEV, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
+ }
+
+ pub fn prep_read_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: *os.iovec, offset: u64, buffer_index: u16) void {
+ sqe.prep_rw(.READ_FIXED, fd, @intFromPtr(buffer.iov_base), buffer.iov_len, offset);
+ sqe.buf_index = buffer_index;
+ }
+
+ pub fn prep_write_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: *os.iovec, offset: u64, buffer_index: u16) void {
+ sqe.prep_rw(.WRITE_FIXED, fd, @intFromPtr(buffer.iov_base), buffer.iov_len, offset);
+ sqe.buf_index = buffer_index;
+ }
+
+ pub fn prep_accept(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ flags: u32,
+ ) void {
+ // `addr` holds a pointer to `sockaddr`, and `addr2` holds a pointer to `socklen_t`.
+ // `addr2` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
+ sqe.prep_rw(.ACCEPT, fd, @intFromPtr(addr), 0, @intFromPtr(addrlen));
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_accept_direct(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ flags: u32,
+ file_index: u32,
+ ) void {
+ prep_accept(sqe, fd, addr, addrlen, flags);
+ __io_uring_set_target_fixed_file(sqe, file_index);
+ }
+
+ pub fn prep_multishot_accept_direct(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ flags: u32,
+ ) void {
+ prep_multishot_accept(sqe, fd, addr, addrlen, flags);
+ __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
+ }
+
+ fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void {
+ const sqe_file_index: u32 = if (file_index == linux.IORING_FILE_INDEX_ALLOC)
+ linux.IORING_FILE_INDEX_ALLOC
+ else
+ // 0 means no fixed files; indexes should be encoded as "index + 1".
+ file_index + 1;
+ // This field is overloaded in liburing:
+ // splice_fd_in: i32
+ // sqe_file_index: u32
+ sqe.splice_fd_in = @bitCast(sqe_file_index);
+ }
+
+ pub fn prep_connect(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ addr: *const os.sockaddr,
+ addrlen: os.socklen_t,
+ ) void {
+ // `addrlen` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
+ sqe.prep_rw(.CONNECT, fd, @intFromPtr(addr), 0, addrlen);
+ }
+
+ pub fn prep_epoll_ctl(
+ sqe: *linux.io_uring_sqe,
+ epfd: os.fd_t,
+ fd: os.fd_t,
+ op: u32,
+ ev: ?*linux.epoll_event,
+ ) void {
+ sqe.prep_rw(.EPOLL_CTL, epfd, @intFromPtr(ev), op, @intCast(fd));
+ }
+
+ pub fn prep_recv(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []u8, flags: u32) void {
+ sqe.prep_rw(.RECV, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void {
+ sqe.prep_rw(.SEND, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void {
+ sqe.prep_rw(.SEND_ZC, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
+ sqe.rw_flags = flags;
+ sqe.ioprio = zc_flags;
+ }
+
+ pub fn prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void {
+ prep_send_zc(sqe, fd, buffer, flags, zc_flags);
+ sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF;
+ sqe.buf_index = buf_index;
+ }
+
+ pub fn prep_sendmsg_zc(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ msg: *const os.msghdr_const,
+ flags: u32,
+ ) void {
+ prep_sendmsg(sqe, fd, msg, flags);
+ sqe.opcode = .SENDMSG_ZC;
+ }
+
+ pub fn prep_recvmsg(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ msg: *os.msghdr,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.RECVMSG, fd, @intFromPtr(msg), 1, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_sendmsg(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ msg: *const os.msghdr_const,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.SENDMSG, fd, @intFromPtr(msg), 1, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_openat(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ path: [*:0]const u8,
+ flags: linux.O,
+ mode: os.mode_t,
+ ) void {
+ sqe.prep_rw(.OPENAT, fd, @intFromPtr(path), mode, 0);
+ sqe.rw_flags = @bitCast(flags);
+ }
+
+ pub fn prep_openat_direct(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ path: [*:0]const u8,
+ flags: linux.O,
+ mode: os.mode_t,
+ file_index: u32,
+ ) void {
+ prep_openat(sqe, fd, path, flags, mode);
+ __io_uring_set_target_fixed_file(sqe, file_index);
+ }
+
+ pub fn prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void {
+ sqe.* = .{
+ .opcode = .CLOSE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = fd,
+ .off = 0,
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void {
+ prep_close(sqe, 0);
+ __io_uring_set_target_fixed_file(sqe, file_index);
+ }
+
+ pub fn prep_timeout(
+ sqe: *linux.io_uring_sqe,
+ ts: *const os.linux.kernel_timespec,
+ count: u32,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.TIMEOUT, -1, @intFromPtr(ts), 1, count);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_timeout_remove(sqe: *linux.io_uring_sqe, timeout_user_data: u64, flags: u32) void {
+ sqe.* = .{
+ .opcode = .TIMEOUT_REMOVE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = -1,
+ .off = 0,
+ .addr = timeout_user_data,
+ .len = 0,
+ .rw_flags = flags,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_link_timeout(
+ sqe: *linux.io_uring_sqe,
+ ts: *const os.linux.kernel_timespec,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.LINK_TIMEOUT, -1, @intFromPtr(ts), 1, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_poll_add(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ poll_mask: u32,
+ ) void {
+ sqe.prep_rw(.POLL_ADD, fd, @intFromPtr(@as(?*anyopaque, null)), 0, 0);
+ // Poll masks previously comprised 16 bits in the flags union of
+ // an SQE, but were then extended to 32 bits in order to make
+ // room for additional option flags. To ensure that the correct bits of
+ // poll masks are consistently and properly read across multiple kernel
+ // versions, poll masks are enforced to be little-endian.
+ // https://www.spinics.net/lists/io-uring/msg02848.html
+ sqe.rw_flags = std.mem.nativeToLittle(u32, poll_mask);
+ }
+
+ pub fn prep_poll_remove(
+ sqe: *linux.io_uring_sqe,
+ target_user_data: u64,
+ ) void {
+ sqe.prep_rw(.POLL_REMOVE, -1, target_user_data, 0, 0);
+ }
+
+ pub fn prep_poll_update(
+ sqe: *linux.io_uring_sqe,
+ old_user_data: u64,
+ new_user_data: u64,
+ poll_mask: u32,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.POLL_REMOVE, -1, old_user_data, flags, new_user_data);
+ // Poll masks previously comprised 16 bits in the flags union of
+ // an SQE, but were then extended to 32 bits in order to make
+ // room for additional option flags. To ensure that the correct bits of
+ // poll masks are consistently and properly read across multiple kernel
+ // versions, poll masks are enforced to be little-endian.
+ // https://www.spinics.net/lists/io-uring/msg02848.html
+ sqe.rw_flags = std.mem.nativeToLittle(u32, poll_mask);
+ }
+
+ pub fn prep_fallocate(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ mode: i32,
+ offset: u64,
+ len: u64,
+ ) void {
+ sqe.* = .{
+ .opcode = .FALLOCATE,
+ .flags = 0,
+ .ioprio = 0,
+ .fd = fd,
+ .off = offset,
+ .addr = len,
+ .len = @intCast(mode),
+ .rw_flags = 0,
+ .user_data = 0,
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+
+ pub fn prep_statx(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ path: [*:0]const u8,
+ flags: u32,
+ mask: u32,
+ buf: *linux.Statx,
+ ) void {
+ sqe.prep_rw(.STATX, fd, @intFromPtr(path), mask, @intFromPtr(buf));
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_cancel(
+ sqe: *linux.io_uring_sqe,
+ cancel_user_data: u64,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.ASYNC_CANCEL, -1, cancel_user_data, 0, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_shutdown(
+ sqe: *linux.io_uring_sqe,
+ sockfd: os.socket_t,
+ how: u32,
+ ) void {
+ sqe.prep_rw(.SHUTDOWN, sockfd, 0, how, 0);
+ }
+
+ pub fn prep_renameat(
+ sqe: *linux.io_uring_sqe,
+ old_dir_fd: os.fd_t,
+ old_path: [*:0]const u8,
+ new_dir_fd: os.fd_t,
+ new_path: [*:0]const u8,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(
+ .RENAMEAT,
+ old_dir_fd,
+ @intFromPtr(old_path),
+ 0,
+ @intFromPtr(new_path),
+ );
+ sqe.len = @bitCast(new_dir_fd);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_unlinkat(
+ sqe: *linux.io_uring_sqe,
+ dir_fd: os.fd_t,
+ path: [*:0]const u8,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.UNLINKAT, dir_fd, @intFromPtr(path), 0, 0);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_mkdirat(
+ sqe: *linux.io_uring_sqe,
+ dir_fd: os.fd_t,
+ path: [*:0]const u8,
+ mode: os.mode_t,
+ ) void {
+ sqe.prep_rw(.MKDIRAT, dir_fd, @intFromPtr(path), mode, 0);
+ }
+
+ pub fn prep_symlinkat(
+ sqe: *linux.io_uring_sqe,
+ target: [*:0]const u8,
+ new_dir_fd: os.fd_t,
+ link_path: [*:0]const u8,
+ ) void {
+ sqe.prep_rw(
+ .SYMLINKAT,
+ new_dir_fd,
+ @intFromPtr(target),
+ 0,
+ @intFromPtr(link_path),
+ );
+ }
+
+ pub fn prep_linkat(
+ sqe: *linux.io_uring_sqe,
+ old_dir_fd: os.fd_t,
+ old_path: [*:0]const u8,
+ new_dir_fd: os.fd_t,
+ new_path: [*:0]const u8,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(
+ .LINKAT,
+ old_dir_fd,
+ @intFromPtr(old_path),
+ 0,
+ @intFromPtr(new_path),
+ );
+ sqe.len = @bitCast(new_dir_fd);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_provide_buffers(
+ sqe: *linux.io_uring_sqe,
+ buffers: [*]u8,
+ buffer_len: usize,
+ num: usize,
+ group_id: usize,
+ buffer_id: usize,
+ ) void {
+ const ptr = @intFromPtr(buffers);
+ sqe.prep_rw(.PROVIDE_BUFFERS, @intCast(num), ptr, buffer_len, buffer_id);
+ sqe.buf_index = @intCast(group_id);
+ }
+
+ pub fn prep_remove_buffers(
+ sqe: *linux.io_uring_sqe,
+ num: usize,
+ group_id: usize,
+ ) void {
+ sqe.prep_rw(.REMOVE_BUFFERS, @intCast(num), 0, 0, 0);
+ sqe.buf_index = @intCast(group_id);
+ }
+
+ pub fn prep_multishot_accept(
+ sqe: *linux.io_uring_sqe,
+ fd: os.fd_t,
+ addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ flags: u32,
+ ) void {
+ prep_accept(sqe, fd, addr, addrlen, flags);
+ sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT;
+ }
+
+ pub fn prep_socket(
+ sqe: *linux.io_uring_sqe,
+ domain: u32,
+ socket_type: u32,
+ protocol: u32,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.SOCKET, @intCast(domain), 0, protocol, socket_type);
+ sqe.rw_flags = flags;
+ }
+
+ pub fn prep_socket_direct(
+ sqe: *linux.io_uring_sqe,
+ domain: u32,
+ socket_type: u32,
+ protocol: u32,
+ flags: u32,
+ file_index: u32,
+ ) void {
+ prep_socket(sqe, domain, socket_type, protocol, flags);
+ __io_uring_set_target_fixed_file(sqe, file_index);
+ }
+
+ pub fn prep_socket_direct_alloc(
+ sqe: *linux.io_uring_sqe,
+ domain: u32,
+ socket_type: u32,
+ protocol: u32,
+ flags: u32,
+ ) void {
+ prep_socket(sqe, domain, socket_type, protocol, flags);
+ __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
+ }
+
+ pub fn prep_waitid(
+ sqe: *linux.io_uring_sqe,
+ id_type: linux.P,
+ id: i32,
+ infop: *linux.siginfo_t,
+ options: u32,
+ flags: u32,
+ ) void {
+ sqe.prep_rw(.WAITID, id, 0, @intFromEnum(id_type), @intFromPtr(infop));
+ sqe.rw_flags = flags;
+ sqe.splice_fd_in = @bitCast(options);
+ }
+};
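As a quick orientation (not part of this commit's diff), here is a minimal sketch of how these prep methods are meant to be driven from the ring type defined in the file below. The file name "example.txt", the user_data value, and the buffer size are hypothetical placeholders, and `std.os.linux.IoUring` is assumed to be the import path after the rename:

const std = @import("std");
const linux = std.os.linux;

pub fn main() !void {
    var ring = try linux.IoUring.init(4, 0); // entries must be a power of two
    defer ring.deinit();

    const file = try std.fs.cwd().openFile("example.txt", .{});
    defer file.close();

    var buf: [64]u8 = undefined;
    const sqe = try ring.get_sqe(); // vacant SQE, or error.SubmissionQueueFull
    sqe.prep_read(file.handle, buf[0..], 0); // fd, buffer, offset
    sqe.user_data = 0xcafe;

    _ = try ring.submit(); // flush the SQ ring and enter the kernel
    const cqe = try ring.copy_cqe(); // cqe.res holds bytes read or a negative errno
    std.debug.print("read {d} bytes\n", .{cqe.res});
}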
lib/std/os/linux/io_uring.zig → lib/std/os/linux/IoUring.zig
@@ -1,3 +1,4 @@
+const IoUring = @This();
const std = @import("../../std.zig");
const builtin = @import("builtin");
const assert = std.debug.assert;
@@ -8,1995 +9,1436 @@ const posix = std.posix;
const linux = os.linux;
const testing = std.testing;
-pub const IO_Uring = struct {
- fd: os.fd_t = -1,
- sq: SubmissionQueue,
- cq: CompletionQueue,
- flags: u32,
- features: u32,
-
- /// A friendly way to set up an io_uring, with default linux.io_uring_params.
- /// `entries` must be a power of two between 1 and 32768, although the kernel will make the final
- /// call on how many entries the submission and completion queues will ultimately have,
- /// see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L8027-L8050.
- /// Matches the interface of io_uring_queue_init() in liburing.
- pub fn init(entries: u16, flags: u32) !IO_Uring {
- var params = mem.zeroInit(linux.io_uring_params, .{
- .flags = flags,
- .sq_thread_idle = 1000,
- });
- return try IO_Uring.init_params(entries, ¶ms);
- }
-
- /// A powerful way to set up an io_uring, if you want to tweak linux.io_uring_params such as submission
- /// queue thread CPU affinity or thread idle timeout (the kernel's default, and ours, is 1 second).
- /// `params` is passed by reference because the kernel needs to modify the parameters.
- /// Matches the interface of io_uring_queue_init_params() in liburing.
- pub fn init_params(entries: u16, p: *linux.io_uring_params) !IO_Uring {
- if (entries == 0) return error.EntriesZero;
- if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
-
- assert(p.sq_entries == 0);
- assert(p.cq_entries == 0 or p.flags & linux.IORING_SETUP_CQSIZE != 0);
- assert(p.features == 0);
- assert(p.wq_fd == 0 or p.flags & linux.IORING_SETUP_ATTACH_WQ != 0);
- assert(p.resv[0] == 0);
- assert(p.resv[1] == 0);
- assert(p.resv[2] == 0);
-
- const res = linux.io_uring_setup(entries, p);
- switch (linux.getErrno(res)) {
- .SUCCESS => {},
- .FAULT => return error.ParamsOutsideAccessibleAddressSpace,
- // The resv array contains non-zero data, p.flags contains an unsupported flag,
- // entries out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
- // or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
- .INVAL => return error.ArgumentsInvalid,
- .MFILE => return error.ProcessFdQuotaExceeded,
- .NFILE => return error.SystemFdQuotaExceeded,
- .NOMEM => return error.SystemResources,
- // IORING_SETUP_SQPOLL was specified but effective user ID lacks sufficient privileges,
- // or a container seccomp policy prohibits io_uring syscalls:
- .PERM => return error.PermissionDenied,
- .NOSYS => return error.SystemOutdated,
- else => |errno| return os.unexpectedErrno(errno),
- }
- const fd = @as(os.fd_t, @intCast(res));
- assert(fd >= 0);
- errdefer os.close(fd);
-
- // Kernel versions 5.4 and up use only one mmap() for the submission and completion queues.
- // This is not an optional feature for us... if the kernel does it, we have to do it.
- // The thinking on this by the kernel developers was that both the submission and the
- // completion queue rings have sizes just over a power of two, but the submission queue ring
- // is significantly smaller with u32 slots. By bundling both in a single mmap, the kernel
- // gets the submission queue ring for free.
- // See https://patchwork.kernel.org/patch/11115257 for the kernel patch.
- // We do not support the double mmap() done before 5.4, because we want to keep the
- // init/deinit mmap paths simple and because io_uring has had many bug fixes even since 5.4.
- if ((p.features & linux.IORING_FEAT_SINGLE_MMAP) == 0) {
- return error.SystemOutdated;
- }
-
- // Check that the kernel has actually set params and that "impossible is nothing".
- assert(p.sq_entries != 0);
- assert(p.cq_entries != 0);
- assert(p.cq_entries >= p.sq_entries);
-
- // From here on, we only need to read from params, so pass `p` by value as immutable.
- // The completion queue shares the mmap with the submission queue, so pass `sq` there too.
- var sq = try SubmissionQueue.init(fd, p.*);
- errdefer sq.deinit();
- var cq = try CompletionQueue.init(fd, p.*, sq);
- errdefer cq.deinit();
-
- // Check that our starting state is as we expect.
- assert(sq.head.* == 0);
- assert(sq.tail.* == 0);
- assert(sq.mask == p.sq_entries - 1);
- // Allow flags.* to be non-zero, since the kernel may set IORING_SQ_NEED_WAKEUP at any time.
- assert(sq.dropped.* == 0);
- assert(sq.array.len == p.sq_entries);
- assert(sq.sqes.len == p.sq_entries);
- assert(sq.sqe_head == 0);
- assert(sq.sqe_tail == 0);
-
- assert(cq.head.* == 0);
- assert(cq.tail.* == 0);
- assert(cq.mask == p.cq_entries - 1);
- assert(cq.overflow.* == 0);
- assert(cq.cqes.len == p.cq_entries);
-
- return IO_Uring{
- .fd = fd,
- .sq = sq,
- .cq = cq,
- .flags = p.flags,
- .features = p.features,
- };
- }
-
- pub fn deinit(self: *IO_Uring) void {
- assert(self.fd >= 0);
- // The mmaps depend on the fd, so the order of these calls is important:
- self.cq.deinit();
- self.sq.deinit();
- os.close(self.fd);
- self.fd = -1;
- }
-
- /// Returns a pointer to a vacant SQE, or an error if the submission queue is full.
- /// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly.
- /// However, instead of a null we return an error to force safe handling.
- /// Any situation where the submission queue is full tends more towards a control flow error,
- /// and the null return in liburing is more a C idiom than anything else, for lack of a better
- /// alternative. In Zig, we have first-class error handling... so let's use it.
- /// Matches the implementation of io_uring_get_sqe() in liburing.
- pub fn get_sqe(self: *IO_Uring) !*linux.io_uring_sqe {
- const head = @atomicLoad(u32, self.sq.head, .Acquire);
- // Remember that these head and tail offsets wrap around every four billion operations.
- // We must therefore use wrapping addition and subtraction to avoid a runtime crash.
- const next = self.sq.sqe_tail +% 1;
- if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull;
- const sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask];
- self.sq.sqe_tail = next;
- return sqe;
- }
-
- /// Submits the SQEs acquired via get_sqe() to the kernel. You can call this once after you have
- /// called get_sqe() multiple times to set up multiple I/O requests.
- /// Returns the number of SQEs submitted, if not used alongside IORING_SETUP_SQPOLL.
- /// If the io_uring instance uses IORING_SETUP_SQPOLL, the value returned on success is not
- /// guaranteed to match the number of SQEs actually submitted during this call. A value higher
- /// or lower, including 0, may be returned.
- /// Matches the implementation of io_uring_submit() in liburing.
- pub fn submit(self: *IO_Uring) !u32 {
- return self.submit_and_wait(0);
- }
-
- /// Like submit(), but allows waiting for events as well.
- /// Returns the number of SQEs submitted.
- /// Matches the implementation of io_uring_submit_and_wait() in liburing.
- pub fn submit_and_wait(self: *IO_Uring, wait_nr: u32) !u32 {
- const submitted = self.flush_sq();
- var flags: u32 = 0;
- if (self.sq_ring_needs_enter(&flags) or wait_nr > 0) {
- if (wait_nr > 0 or (self.flags & linux.IORING_SETUP_IOPOLL) != 0) {
- flags |= linux.IORING_ENTER_GETEVENTS;
- }
- return try self.enter(submitted, wait_nr, flags);
- }
- return submitted;
- }
-
- /// Tell the kernel we have submitted SQEs and/or want to wait for CQEs.
- /// Returns the number of SQEs submitted.
- pub fn enter(self: *IO_Uring, to_submit: u32, min_complete: u32, flags: u32) !u32 {
- assert(self.fd >= 0);
- const res = linux.io_uring_enter(self.fd, to_submit, min_complete, flags, null);
- switch (linux.getErrno(res)) {
- .SUCCESS => {},
- // The kernel was unable to allocate memory or ran out of resources for the request.
- // The application should wait for some completions and try again:
- .AGAIN => return error.SystemResources,
- // The SQE `fd` is invalid, or IOSQE_FIXED_FILE was set but no files were registered:
- .BADF => return error.FileDescriptorInvalid,
- // The file descriptor is valid, but the ring is not in the right state.
- // See io_uring_register(2) for how to enable the ring.
- .BADFD => return error.FileDescriptorInBadState,
- // The application attempted to overcommit the number of requests it can have pending.
- // The application should wait for some completions and try again:
- .BUSY => return error.CompletionQueueOvercommitted,
- // The SQE is invalid, or valid but the ring was setup with IORING_SETUP_IOPOLL:
- .INVAL => return error.SubmissionQueueEntryInvalid,
- // The buffer is outside the process' accessible address space, or IORING_OP_READ_FIXED
- // or IORING_OP_WRITE_FIXED was specified but no buffers were registered, or the range
- // described by `addr` and `len` is not within the buffer registered at `buf_index`:
- .FAULT => return error.BufferInvalid,
- .NXIO => return error.RingShuttingDown,
- // The kernel believes our `self.fd` does not refer to an io_uring instance,
- // or the opcode is valid but not supported by this kernel (more likely):
- .OPNOTSUPP => return error.OpcodeNotSupported,
- // The operation was interrupted by a delivery of a signal before it could complete.
- // This can happen while waiting for events with IORING_ENTER_GETEVENTS:
- .INTR => return error.SignalInterrupt,
- else => |errno| return os.unexpectedErrno(errno),
- }
- return @as(u32, @intCast(res));
- }
-
- /// Sync internal state with kernel ring state on the SQ side.
- /// Returns the number of all pending events in the SQ ring, for the shared ring.
- /// This return value includes previously flushed SQEs, as per liburing.
- /// The rationale is to suggest that an io_uring_enter() call is needed rather than not.
- /// Matches the implementation of __io_uring_flush_sq() in liburing.
- pub fn flush_sq(self: *IO_Uring) u32 {
- if (self.sq.sqe_head != self.sq.sqe_tail) {
- // Fill in SQEs that we have queued up, adding them to the kernel ring.
- const to_submit = self.sq.sqe_tail -% self.sq.sqe_head;
- var tail = self.sq.tail.*;
- var i: usize = 0;
- while (i < to_submit) : (i += 1) {
- self.sq.array[tail & self.sq.mask] = self.sq.sqe_head & self.sq.mask;
- tail +%= 1;
- self.sq.sqe_head +%= 1;
- }
- // Ensure that the kernel can actually see the SQE updates when it sees the tail update.
- @atomicStore(u32, self.sq.tail, tail, .Release);
- }
- return self.sq_ready();
- }
-
- /// Returns true if we are not using an SQ thread (thus nobody submits but us),
- /// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
- /// For the latter case, we set the SQ thread wakeup flag.
- /// Matches the implementation of sq_ring_needs_enter() in liburing.
- pub fn sq_ring_needs_enter(self: *IO_Uring, flags: *u32) bool {
- assert(flags.* == 0);
- if ((self.flags & linux.IORING_SETUP_SQPOLL) == 0) return true;
- if ((@atomicLoad(u32, self.sq.flags, .Unordered) & linux.IORING_SQ_NEED_WAKEUP) != 0) {
- flags.* |= linux.IORING_ENTER_SQ_WAKEUP;
- return true;
- }
- return false;
- }
-
- /// Returns the number of flushed and unflushed SQEs pending in the submission queue.
- /// In other words, this is the number of SQEs in the submission queue, i.e. its length.
- /// These are SQEs that the kernel is yet to consume.
- /// Matches the implementation of io_uring_sq_ready in liburing.
- pub fn sq_ready(self: *IO_Uring) u32 {
- // Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
- // see https://github.com/axboe/liburing/issues/92.
- return self.sq.sqe_tail -% @atomicLoad(u32, self.sq.head, .Acquire);
- }
-
- /// Returns the number of CQEs in the completion queue, i.e. its length.
- /// These are CQEs that the application is yet to consume.
- /// Matches the implementation of io_uring_cq_ready in liburing.
- pub fn cq_ready(self: *IO_Uring) u32 {
- return @atomicLoad(u32, self.cq.tail, .Acquire) -% self.cq.head.*;
- }
-
- /// Copies as many CQEs as are ready, and that can fit into the destination `cqes` slice.
- /// If none are available, enters into the kernel to wait for at most `wait_nr` CQEs.
- /// Returns the number of CQEs copied, advancing the CQ ring.
- /// Provides all the wait/peek methods found in liburing, but with batching and a single method.
- /// The rationale for copying CQEs rather than copying pointers is that pointers are 8 bytes
- /// whereas CQEs are not much more at only 16 bytes, and this provides a safer faster interface.
- /// Safer, because you no longer need to call cqe_seen(), avoiding idempotency bugs.
- /// Faster, because we can now amortize the atomic store release to `cq.head` across the batch.
- /// See https://github.com/axboe/liburing/issues/103#issuecomment-686665007.
- /// Matches the implementation of io_uring_peek_batch_cqe() in liburing, but supports waiting.
- pub fn copy_cqes(self: *IO_Uring, cqes: []linux.io_uring_cqe, wait_nr: u32) !u32 {
- const count = self.copy_cqes_ready(cqes);
- if (count > 0) return count;
- if (self.cq_ring_needs_flush() or wait_nr > 0) {
- _ = try self.enter(0, wait_nr, linux.IORING_ENTER_GETEVENTS);
- return self.copy_cqes_ready(cqes);
- }
- return 0;
- }
-
- fn copy_cqes_ready(self: *IO_Uring, cqes: []linux.io_uring_cqe) u32 {
- const ready = self.cq_ready();
- const count = @min(cqes.len, ready);
- const head = self.cq.head.* & self.cq.mask;
- const tail = (self.cq.head.* +% count) & self.cq.mask;
-
- if (head <= tail) {
- // head behind tail -> no wrapping
- @memcpy(cqes[0..count], self.cq.cqes[head..tail]);
- } else {
- // head in front of tail -> buffer wraps
- const two_copies_required: bool = self.cq.cqes.len - head < count;
- const amount_to_copy_in_first = if (two_copies_required) self.cq.cqes.len - head else count;
- @memcpy(cqes[0..amount_to_copy_in_first], self.cq.cqes[head .. head + amount_to_copy_in_first]);
- if (two_copies_required) {
- @memcpy(cqes[amount_to_copy_in_first..count], self.cq.cqes[0..tail]);
- }
- }
-
- self.cq_advance(count);
- return count;
- }
-
- /// Returns a copy of an I/O completion, waiting for it if necessary, and advancing the CQ ring.
- /// A convenience method for `copy_cqes()` for when you don't need to batch or peek.
- pub fn copy_cqe(ring: *IO_Uring) !linux.io_uring_cqe {
- var cqes: [1]linux.io_uring_cqe = undefined;
- while (true) {
- const count = try ring.copy_cqes(&cqes, 1);
- if (count > 0) return cqes[0];
- }
- }
-
- /// Matches the implementation of cq_ring_needs_flush() in liburing.
- pub fn cq_ring_needs_flush(self: *IO_Uring) bool {
- return (@atomicLoad(u32, self.sq.flags, .Unordered) & linux.IORING_SQ_CQ_OVERFLOW) != 0;
- }
-
- /// For advanced use cases only that implement custom completion queue methods.
- /// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
- /// Must be called exactly once after a zero-copy CQE has been processed by your application.
- /// Not idempotent, calling more than once will result in other CQEs being lost.
- /// Matches the implementation of cqe_seen() in liburing.
- pub fn cqe_seen(self: *IO_Uring, cqe: *linux.io_uring_cqe) void {
- _ = cqe;
- self.cq_advance(1);
- }
-
- /// For advanced use cases only that implement custom completion queue methods.
- /// Matches the implementation of cq_advance() in liburing.
- pub fn cq_advance(self: *IO_Uring, count: u32) void {
- if (count > 0) {
- // Ensure the kernel only sees the new head value after the CQEs have been read.
- @atomicStore(u32, self.cq.head, self.cq.head.* +% count, .Release);
- }
- }
-
- /// Queues (but does not submit) an SQE to perform an `fsync(2)`.
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- /// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`.
- /// N.B. While SQEs are initiated in the order in which they appear in the submission queue,
- /// operations execute in parallel and completions are unordered. Therefore, an application that
- /// submits a write followed by an fsync in the submission queue cannot expect the fsync to
- /// apply to the write, since the fsync may complete before the write is issued to the disk.
- /// You should preferably use `link_with_next_sqe()` on a write's SQE to link it with an fsync,
- /// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync.
- pub fn fsync(self: *IO_Uring, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_fsync(sqe, fd, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a no-op.
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- /// A no-op is more useful than it may appear at first glance.
- /// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
- /// know when the ring is idle before acting on a kill signal.
- pub fn nop(self: *IO_Uring, user_data: u64) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_nop(sqe);
- sqe.user_data = user_data;
- return sqe;
+fd: os.fd_t = -1,
+sq: SubmissionQueue,
+cq: CompletionQueue,
+flags: u32,
+features: u32,
+
+/// A friendly way to set up an io_uring, with default linux.io_uring_params.
+/// `entries` must be a power of two between 1 and 32768, although the kernel will make the final
+/// call on how many entries the submission and completion queues will ultimately have,
+/// see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L8027-L8050.
+/// Matches the interface of io_uring_queue_init() in liburing.
+pub fn init(entries: u16, flags: u32) !IoUring {
+ var params = mem.zeroInit(linux.io_uring_params, .{
+ .flags = flags,
+ .sq_thread_idle = 1000,
+ });
+ return try IoUring.init_params(entries, ¶ms);
+}
+
+/// A powerful way to set up an io_uring, if you want to tweak linux.io_uring_params such as submission
+/// queue thread CPU affinity or thread idle timeout (the kernel's default, and ours, is 1 second).
+/// `params` is passed by reference because the kernel needs to modify the parameters.
+/// Matches the interface of io_uring_queue_init_params() in liburing.
+pub fn init_params(entries: u16, p: *linux.io_uring_params) !IoUring {
+ if (entries == 0) return error.EntriesZero;
+ if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
+
+ assert(p.sq_entries == 0);
+ assert(p.cq_entries == 0 or p.flags & linux.IORING_SETUP_CQSIZE != 0);
+ assert(p.features == 0);
+ assert(p.wq_fd == 0 or p.flags & linux.IORING_SETUP_ATTACH_WQ != 0);
+ assert(p.resv[0] == 0);
+ assert(p.resv[1] == 0);
+ assert(p.resv[2] == 0);
+
+ const res = linux.io_uring_setup(entries, p);
+ switch (linux.getErrno(res)) {
+ .SUCCESS => {},
+ .FAULT => return error.ParamsOutsideAccessibleAddressSpace,
+ // The resv array contains non-zero data, p.flags contains an unsupported flag,
+ // entries out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
+ // or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
+ .INVAL => return error.ArgumentsInvalid,
+ .MFILE => return error.ProcessFdQuotaExceeded,
+ .NFILE => return error.SystemFdQuotaExceeded,
+ .NOMEM => return error.SystemResources,
+ // IORING_SETUP_SQPOLL was specified but effective user ID lacks sufficient privileges,
+ // or a container seccomp policy prohibits io_uring syscalls:
+ .PERM => return error.PermissionDenied,
+ .NOSYS => return error.SystemOutdated,
+ else => |errno| return os.unexpectedErrno(errno),
}
-
- /// Used to select how the read should be handled.
- pub const ReadBuffer = union(enum) {
- /// io_uring will read directly into this buffer
- buffer: []u8,
-
- /// io_uring will read directly into these buffers using readv.
- iovecs: []const os.iovec,
-
- /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
- /// The buffer group referenced by `group_id` must contain at least one buffer for the read to work.
- /// `len` controls the number of bytes to read into the selected buffer.
- buffer_selection: struct {
- group_id: u16,
- len: usize,
- },
+ const fd = @as(os.fd_t, @intCast(res));
+ assert(fd >= 0);
+ errdefer os.close(fd);
+
+ // Kernel versions 5.4 and up use only one mmap() for the submission and completion queues.
+ // This is not an optional feature for us... if the kernel does it, we have to do it.
+ // The thinking on this by the kernel developers was that both the submission and the
+ // completion queue rings have sizes just over a power of two, but the submission queue ring
+ // is significantly smaller with u32 slots. By bundling both in a single mmap, the kernel
+ // gets the submission queue ring for free.
+ // See https://patchwork.kernel.org/patch/11115257 for the kernel patch.
+ // We do not support the double mmap() done before 5.4, because we want to keep the
+ // init/deinit mmap paths simple and because io_uring has had many bug fixes even since 5.4.
+ if ((p.features & linux.IORING_FEAT_SINGLE_MMAP) == 0) {
+ return error.SystemOutdated;
+ }
+
+ // Check that the kernel has actually set params and that "impossible is nothing".
+ assert(p.sq_entries != 0);
+ assert(p.cq_entries != 0);
+ assert(p.cq_entries >= p.sq_entries);
+
+ // From here on, we only need to read from params, so pass `p` by value as immutable.
+ // The completion queue shares the mmap with the submission queue, so pass `sq` there too.
+ var sq = try SubmissionQueue.init(fd, p.*);
+ errdefer sq.deinit();
+ var cq = try CompletionQueue.init(fd, p.*, sq);
+ errdefer cq.deinit();
+
+ // Check that our starting state is as we expect.
+ assert(sq.head.* == 0);
+ assert(sq.tail.* == 0);
+ assert(sq.mask == p.sq_entries - 1);
+ // Allow flags.* to be non-zero, since the kernel may set IORING_SQ_NEED_WAKEUP at any time.
+ assert(sq.dropped.* == 0);
+ assert(sq.array.len == p.sq_entries);
+ assert(sq.sqes.len == p.sq_entries);
+ assert(sq.sqe_head == 0);
+ assert(sq.sqe_tail == 0);
+
+ assert(cq.head.* == 0);
+ assert(cq.tail.* == 0);
+ assert(cq.mask == p.cq_entries - 1);
+ assert(cq.overflow.* == 0);
+ assert(cq.cqes.len == p.cq_entries);
+
+ return IoUring{
+ .fd = fd,
+ .sq = sq,
+ .cq = cq,
+ .flags = p.flags,
+ .features = p.features,
};
+}
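As a hedged aside (not part of the commit itself, and reusing the `std`/`linux` imports from the sketch after the previous file), init_params is the escape hatch when the defaults above are not enough; the SQPOLL flag and the 500 ms idle value below are illustrative only:

var params = std.mem.zeroInit(linux.io_uring_params, .{
    .flags = linux.IORING_SETUP_SQPOLL, // requires sufficient privileges, see the PERM case above
    .sq_thread_idle = 500, // milliseconds the SQ poll thread may idle before sleeping
});
var ring = try IoUring.init_params(8, &params);
defer ring.deinit();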
- /// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)` depending on the buffer type.
- /// * Reading into a `ReadBuffer.buffer` uses `read(2)`
- /// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
- /// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE. See https://man7.org/linux/man-pages/man2/preadv2.2.html
- ///
- /// Returns a pointer to the SQE.
- pub fn read(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: ReadBuffer,
- offset: u64,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- switch (buffer) {
- .buffer => |slice| io_uring_prep_read(sqe, fd, slice, offset),
- .iovecs => |vecs| io_uring_prep_readv(sqe, fd, vecs, offset),
- .buffer_selection => |selection| {
- io_uring_prep_rw(.READ, sqe, fd, 0, selection.len, offset);
- sqe.flags |= linux.IOSQE_BUFFER_SELECT;
- sqe.buf_index = selection.group_id;
- },
+pub fn deinit(self: *IoUring) void {
+ assert(self.fd >= 0);
+ // The mmaps depend on the fd, so the order of these calls is important:
+ self.cq.deinit();
+ self.sq.deinit();
+ os.close(self.fd);
+ self.fd = -1;
+}
+
+/// Returns a pointer to a vacant SQE, or an error if the submission queue is full.
+/// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly.
+/// However, instead of a null we return an error to force safe handling.
+/// Any situation where the submission queue is full tends more towards a control flow error,
+/// and the null return in liburing is more a C idiom than anything else, for lack of a better
+/// alternative. In Zig, we have first-class error handling... so let's use it.
+/// Matches the implementation of io_uring_get_sqe() in liburing.
+pub fn get_sqe(self: *IoUring) !*linux.io_uring_sqe {
+ const head = @atomicLoad(u32, self.sq.head, .Acquire);
+ // Remember that these head and tail offsets wrap around every four billion operations.
+ // We must therefore use wrapping addition and subtraction to avoid a runtime crash.
+ const next = self.sq.sqe_tail +% 1;
+ if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull;
+ const sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask];
+ self.sq.sqe_tail = next;
+ return sqe;
+}
+
+/// Submits the SQEs acquired via get_sqe() to the kernel. You can call this once after you have
+/// called get_sqe() multiple times to set up multiple I/O requests.
+/// Returns the number of SQEs submitted, if not used alongside IORING_SETUP_SQPOLL.
+/// If the io_uring instance uses IORING_SETUP_SQPOLL, the value returned on success is not
+/// guaranteed to match the number of SQEs actually submitted during this call. A value higher
+/// or lower, including 0, may be returned.
+/// Matches the implementation of io_uring_submit() in liburing.
+pub fn submit(self: *IoUring) !u32 {
+ return self.submit_and_wait(0);
+}
+
+/// Like submit(), but allows waiting for events as well.
+/// Returns the number of SQEs submitted.
+/// Matches the implementation of io_uring_submit_and_wait() in liburing.
+pub fn submit_and_wait(self: *IoUring, wait_nr: u32) !u32 {
+ const submitted = self.flush_sq();
+ var flags: u32 = 0;
+ if (self.sq_ring_needs_enter(&flags) or wait_nr > 0) {
+ if (wait_nr > 0 or (self.flags & linux.IORING_SETUP_IOPOLL) != 0) {
+ flags |= linux.IORING_ENTER_GETEVENTS;
}
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `write(2)`.
- /// Returns a pointer to the SQE.
- pub fn write(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: []const u8,
- offset: u64,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_write(sqe, fd, buffer, offset);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `splice(2)`
- /// Either `fd_in` or `fd_out` must be a pipe.
- /// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to std.math.maxInt(u64).
- /// If `fd_in` does not refer to a pipe and `off_in` is maxInt(u64), then `len` bytes are read
- /// from `fd_in` starting from the file offset, which is incremented by the number of bytes read.
- /// If `fd_in` does not refer to a pipe and `off_in` is not maxInt(u64), then the starting offset of `fd_in` will be `off_in`.
- /// This splice operation can be used to implement sendfile by splicing to an intermediate pipe first,
- /// then splice to the final destination. In fact, the implementation of sendfile in kernel uses splice internally.
- ///
- /// NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with EINVAL if one of the
- /// fds doesn't explicitly support the splice operation, e.g. reading from a terminal is unsupported from kernel 5.7 to 5.11.
- /// See https://github.com/axboe/liburing/issues/291
- ///
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- pub fn splice(self: *IO_Uring, user_data: u64, fd_in: os.fd_t, off_in: u64, fd_out: os.fd_t, off_out: u64, len: usize) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_splice(sqe, fd_in, off_in, fd_out, off_out, len);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a IORING_OP_READ_FIXED.
- /// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
- /// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
- ///
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- pub fn read_fixed(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: *os.iovec,
- offset: u64,
- buffer_index: u16,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_read_fixed(sqe, fd, buffer, offset, buffer_index);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `pwritev()`.
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- /// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE.
- /// See https://linux.die.net/man/2/pwritev.
- pub fn writev(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- iovecs: []const os.iovec_const,
- offset: u64,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_writev(sqe, fd, iovecs, offset);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a IORING_OP_WRITE_FIXED.
- /// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
- /// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
- ///
- /// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
- pub fn write_fixed(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: *os.iovec,
- offset: u64,
- buffer_index: u16,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_write_fixed(sqe, fd, buffer, offset, buffer_index);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
- /// Returns a pointer to the SQE.
- /// Available since 5.5
- pub fn accept(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues a multishot accept on a socket.
- ///
- /// The multishot variant allows an application to issue a single accept request,
- /// which will repeatedly trigger a CQE when a connection request comes in.
- /// While the IORING_CQE_F_MORE flag is set in the CQE flags, accept will generate
- /// further CQEs.
- ///
- /// Available since 5.19
- pub fn accept_multishot(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues an accept using direct (registered) file descriptors.
- ///
- /// To use an accept direct variant, the application must first have registered
- /// a file table (with register_files). An unused table index will be
- /// dynamically chosen and returned in the CQE res field.
- ///
- /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
- /// flags member, and setting the SQE fd field to the direct descriptor value
- /// rather than the regular file descriptor.
- ///
- /// Available since 5.19
- pub fn accept_direct(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_accept_direct(sqe, fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues a multishot accept using direct (registered) file descriptors.
- /// Available since 5.19
- pub fn accept_multishot_direct(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_multishot_accept_direct(sqe, fd, addr, addrlen, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
- /// Returns a pointer to the SQE.
- pub fn connect(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- addr: *const os.sockaddr,
- addrlen: os.socklen_t,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_connect(sqe, fd, addr, addrlen);
- sqe.user_data = user_data;
- return sqe;
+ return try self.enter(submitted, wait_nr, flags);
}
+ return submitted;
+}
- /// Queues (but does not submit) an SQE to perform an `epoll_ctl(2)`.
- /// Returns a pointer to the SQE.
- pub fn epoll_ctl(
- self: *IO_Uring,
- user_data: u64,
- epfd: os.fd_t,
- fd: os.fd_t,
- op: u32,
- ev: ?*linux.epoll_event,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_epoll_ctl(sqe, epfd, fd, op, ev);
- sqe.user_data = user_data;
- return sqe;
+/// Tell the kernel we have submitted SQEs and/or want to wait for CQEs.
+/// Returns the number of SQEs submitted.
+pub fn enter(self: *IoUring, to_submit: u32, min_complete: u32, flags: u32) !u32 {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_enter(self.fd, to_submit, min_complete, flags, null);
+ switch (linux.getErrno(res)) {
+ .SUCCESS => {},
+ // The kernel was unable to allocate memory or ran out of resources for the request.
+ // The application should wait for some completions and try again:
+ .AGAIN => return error.SystemResources,
+ // The SQE `fd` is invalid, or IOSQE_FIXED_FILE was set but no files were registered:
+ .BADF => return error.FileDescriptorInvalid,
+ // The file descriptor is valid, but the ring is not in the right state.
+ // See io_uring_register(2) for how to enable the ring.
+ .BADFD => return error.FileDescriptorInBadState,
+ // The application attempted to overcommit the number of requests it can have pending.
+ // The application should wait for some completions and try again:
+ .BUSY => return error.CompletionQueueOvercommitted,
+ // The SQE is invalid, or valid but the ring was setup with IORING_SETUP_IOPOLL:
+ .INVAL => return error.SubmissionQueueEntryInvalid,
+ // The buffer is outside the process' accessible address space, or IORING_OP_READ_FIXED
+ // or IORING_OP_WRITE_FIXED was specified but no buffers were registered, or the range
+ // described by `addr` and `len` is not within the buffer registered at `buf_index`:
+ .FAULT => return error.BufferInvalid,
+ .NXIO => return error.RingShuttingDown,
+ // The kernel believes our `self.fd` does not refer to an io_uring instance,
+ // or the opcode is valid but not supported by this kernel (more likely):
+ .OPNOTSUPP => return error.OpcodeNotSupported,
+ // The operation was interrupted by a delivery of a signal before it could complete.
+ // This can happen while waiting for events with IORING_ENTER_GETEVENTS:
+ .INTR => return error.SignalInterrupt,
+ else => |errno| return os.unexpectedErrno(errno),
}
+ return @as(u32, @intCast(res));
+}
- /// Used to select how the recv call should be handled.
- pub const RecvBuffer = union(enum) {
- /// io_uring will recv directly into this buffer
- buffer: []u8,
-
- /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
- /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work.
- /// `len` controls the number of bytes to read into the selected buffer.
- buffer_selection: struct {
- group_id: u16,
- len: usize,
- },
- };
-
- /// Queues (but does not submit) an SQE to perform a `recv(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.6
- pub fn recv(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: RecvBuffer,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- switch (buffer) {
- .buffer => |slice| io_uring_prep_recv(sqe, fd, slice, flags),
- .buffer_selection => |selection| {
- io_uring_prep_rw(.RECV, sqe, fd, 0, selection.len, 0);
- sqe.rw_flags = flags;
- sqe.flags |= linux.IOSQE_BUFFER_SELECT;
- sqe.buf_index = selection.group_id;
- },
+/// Sync internal state with kernel ring state on the SQ side.
+/// Returns the number of all pending events in the SQ ring, for the shared ring.
+/// This return value includes previously flushed SQEs, as per liburing.
+/// The rationale is to suggest that an io_uring_enter() call is needed rather than not.
+/// Matches the implementation of __io_uring_flush_sq() in liburing.
+pub fn flush_sq(self: *IoUring) u32 {
+ if (self.sq.sqe_head != self.sq.sqe_tail) {
+ // Fill in SQEs that we have queued up, adding them to the kernel ring.
+ const to_submit = self.sq.sqe_tail -% self.sq.sqe_head;
+ var tail = self.sq.tail.*;
+ var i: usize = 0;
+ while (i < to_submit) : (i += 1) {
+ self.sq.array[tail & self.sq.mask] = self.sq.sqe_head & self.sq.mask;
+ tail +%= 1;
+ self.sq.sqe_head +%= 1;
}
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `send(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.6
- pub fn send(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: []const u8,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_send(sqe, fd, buffer, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
- ///
- /// This operation will most likely produce two CQEs. The flags field of the
- /// first cqe will likely contain IORING_CQE_F_MORE, which means that there will
- /// be a second cqe with the user_data field set to the same value. The user
- /// must not modify the data buffer until the notification is posted. The first
- /// cqe follows the usual rules and so its res field will contain the number of
- /// bytes sent or a negative error code. The notification's res field will be
- /// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two
- /// step model is needed because the kernel may hold on to buffers for a long
- /// time, e.g. waiting for a TCP ACK. Notifications are responsible for controlling
- /// the lifetime of the buffers. Even errored requests may generate a
- /// notification.
- ///
- /// Available since 6.0
- pub fn send_zc(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: []const u8,
- send_flags: u32,
- zc_flags: u16,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_send_zc(sqe, fd, buffer, send_flags, zc_flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 6.0
- pub fn send_zc_fixed(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- buffer: []const u8,
- send_flags: u32,
- zc_flags: u16,
- buf_index: u16,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_send_zc_fixed(sqe, fd, buffer, send_flags, zc_flags, buf_index);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.3
- pub fn recvmsg(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- msg: *os.msghdr,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_recvmsg(sqe, fd, msg, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.3
- pub fn sendmsg(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- msg: *const os.msghdr_const,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_sendmsg(sqe, fd, msg, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 6.1
- pub fn sendmsg_zc(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- msg: *const os.msghdr_const,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_sendmsg_zc(sqe, fd, msg, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an `openat(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.6.
- pub fn openat(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- path: [*:0]const u8,
- flags: linux.O,
- mode: os.mode_t,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_openat(sqe, fd, path, flags, mode);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues an openat using direct (registered) file descriptors.
- ///
- /// To use an openat direct variant, the application must first have registered
- /// a file table (with register_files). An unused table index will be
- /// dynamically chosen and returned in the CQE res field.
- ///
- /// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
- /// flags member, and setting the SQE fd field to the direct descriptor value
- /// rather than the regular file descriptor.
- ///
- /// Available since 5.15
- pub fn openat_direct(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- path: [*:0]const u8,
- flags: linux.O,
- mode: os.mode_t,
- file_index: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_openat_direct(sqe, fd, path, flags, mode, file_index);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `close(2)`.
- /// Returns a pointer to the SQE.
- /// Available since 5.6.
- pub fn close(self: *IO_Uring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_close(sqe, fd);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues close of registered file descriptor.
- /// Available since 5.15
- pub fn close_direct(self: *IO_Uring, user_data: u64, file_index: u32) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_close_direct(sqe, file_index);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to register a timeout operation.
- /// Returns a pointer to the SQE.
- ///
- /// The timeout will complete when either the timeout expires, or after the specified number of
- /// events complete (if `count` is greater than `0`).
- ///
- /// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
- ///
- /// The completion event result will be `-ETIME` if the timeout completed through expiration,
- /// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
- /// timeout was removed before it expired.
- ///
- /// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
- pub fn timeout(
- self: *IO_Uring,
- user_data: u64,
- ts: *const os.linux.kernel_timespec,
- count: u32,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_timeout(sqe, ts, count, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to remove an existing timeout operation.
- /// Returns a pointer to the SQE.
- ///
- /// The timeout is identified by its `user_data`.
- ///
- /// The completion event result will be `0` if the timeout was found and cancelled successfully,
- /// `-EBUSY` if the timeout was found but expiration was already in progress, or
- /// `-ENOENT` if the timeout was not found.
- pub fn timeout_remove(
- self: *IO_Uring,
- user_data: u64,
- timeout_user_data: u64,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_timeout_remove(sqe, timeout_user_data, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to add a link timeout operation.
- /// Returns a pointer to the SQE.
- ///
- /// You need to set linux.IOSQE_IO_LINK to flags of the target operation
- /// and then call this method right after the target operation.
- /// See https://lwn.net/Articles/803932/ for detail.
- ///
- /// If the dependent request finishes before the linked timeout, the timeout
- /// is canceled. If the timeout finishes before the dependent request, the
- /// dependent request will be canceled.
- ///
- /// The completion event result of the link_timeout will be
- /// `-ETIME` if the timeout finishes before the dependent request
- /// (in this case, the completion event result of the dependent request will
- /// be `-ECANCELED`), or
- /// `-EALREADY` if the dependent request finishes before the linked timeout.
- pub fn link_timeout(
- self: *IO_Uring,
- user_data: u64,
- ts: *const os.linux.kernel_timespec,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_link_timeout(sqe, ts, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `poll(2)`.
- /// Returns a pointer to the SQE.
- pub fn poll_add(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- poll_mask: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_poll_add(sqe, fd, poll_mask);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to remove an existing poll operation.
- /// Returns a pointer to the SQE.
- pub fn poll_remove(
- self: *IO_Uring,
- user_data: u64,
- target_user_data: u64,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_poll_remove(sqe, target_user_data);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to update the user data of an existing poll
- /// operation. Returns a pointer to the SQE.
- pub fn poll_update(
- self: *IO_Uring,
- user_data: u64,
- old_user_data: u64,
- new_user_data: u64,
- poll_mask: u32,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_poll_update(sqe, old_user_data, new_user_data, poll_mask, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
- /// Returns a pointer to the SQE.
- pub fn fallocate(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- mode: i32,
- offset: u64,
- len: u64,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_fallocate(sqe, fd, mode, offset, len);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform an `statx(2)`.
- /// Returns a pointer to the SQE.
- pub fn statx(
- self: *IO_Uring,
- user_data: u64,
- fd: os.fd_t,
- path: [:0]const u8,
- flags: u32,
- mask: u32,
- buf: *linux.Statx,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_statx(sqe, fd, path, flags, mask, buf);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to remove an existing operation.
- /// Returns a pointer to the SQE.
- ///
- /// The operation is identified by its `user_data`.
- ///
- /// The completion event result will be `0` if the operation was found and cancelled successfully,
- /// `-EALREADY` if the operation was found but was already in progress, or
- /// `-ENOENT` if the operation was not found.
- pub fn cancel(
- self: *IO_Uring,
- user_data: u64,
- cancel_user_data: u64,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_cancel(sqe, cancel_user_data, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
- /// Returns a pointer to the SQE.
- ///
- /// The operation is identified by its `user_data`.
- pub fn shutdown(
- self: *IO_Uring,
- user_data: u64,
- sockfd: os.socket_t,
- how: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_shutdown(sqe, sockfd, how);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
- /// Returns a pointer to the SQE.
- pub fn renameat(
- self: *IO_Uring,
- user_data: u64,
- old_dir_fd: os.fd_t,
- old_path: [*:0]const u8,
- new_dir_fd: os.fd_t,
- new_path: [*:0]const u8,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_renameat(sqe, old_dir_fd, old_path, new_dir_fd, new_path, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `unlinkat(2)`.
- /// Returns a pointer to the SQE.
- pub fn unlinkat(
- self: *IO_Uring,
- user_data: u64,
- dir_fd: os.fd_t,
- path: [*:0]const u8,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_unlinkat(sqe, dir_fd, path, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
- /// Returns a pointer to the SQE.
- pub fn mkdirat(
- self: *IO_Uring,
- user_data: u64,
- dir_fd: os.fd_t,
- path: [*:0]const u8,
- mode: os.mode_t,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_mkdirat(sqe, dir_fd, path, mode);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
- /// Returns a pointer to the SQE.
- pub fn symlinkat(
- self: *IO_Uring,
- user_data: u64,
- target: [*:0]const u8,
- new_dir_fd: os.fd_t,
- link_path: [*:0]const u8,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_symlinkat(sqe, target, new_dir_fd, link_path);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `linkat(2)`.
- /// Returns a pointer to the SQE.
- pub fn linkat(
- self: *IO_Uring,
- user_data: u64,
- old_dir_fd: os.fd_t,
- old_path: [*:0]const u8,
- new_dir_fd: os.fd_t,
- new_path: [*:0]const u8,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_linkat(sqe, old_dir_fd, old_path, new_dir_fd, new_path, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data.
- /// Returns a pointer to the SQE.
- ///
- /// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection.
- ///
- /// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size).
- pub fn provide_buffers(
- self: *IO_Uring,
- user_data: u64,
- buffers: [*]u8,
- buffer_size: usize,
- buffers_count: usize,
- group_id: usize,
- buffer_id: usize,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_provide_buffers(sqe, buffers, buffer_size, buffers_count, group_id, buffer_id);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to remove a group of provided buffers.
- /// Returns a pointer to the SQE.
- pub fn remove_buffers(
- self: *IO_Uring,
- user_data: u64,
- buffers_count: usize,
- group_id: usize,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_remove_buffers(sqe, buffers_count, group_id);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Queues (but does not submit) an SQE to perform a `waitid(2)`.
- /// Returns a pointer to the SQE.
- pub fn waitid(
- self: *IO_Uring,
- user_data: u64,
- id_type: linux.P,
- id: i32,
- infop: *linux.siginfo_t,
- options: u32,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_waitid(sqe, id_type, id, infop, options, flags);
- sqe.user_data = user_data;
- return sqe;
- }
-
- /// Registers an array of file descriptors.
- /// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must
- /// retrieve a reference to the file, and once I/O has completed the file reference must be
- /// dropped. The atomic nature of this file reference can be a slowdown for high IOPS workloads.
- /// This slowdown can be avoided by pre-registering file descriptors.
- /// To refer to a registered file descriptor, IOSQE_FIXED_FILE must be set in the SQE's flags,
- /// and the SQE's fd must be set to the index of the file descriptor in the registered array.
- /// Registering file descriptors will wait for the ring to idle.
- /// Files are automatically unregistered by the kernel when the ring is torn down.
- /// An application need unregister only if it wants to register a new array of file descriptors.
- pub fn register_files(self: *IO_Uring, fds: []const os.fd_t) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(
- self.fd,
- .REGISTER_FILES,
- @as(*const anyopaque, @ptrCast(fds.ptr)),
- @as(u32, @intCast(fds.len)),
- );
- try handle_registration_result(res);
- }
-
- /// Updates registered file descriptors.
- ///
- /// Updates are applied starting at the provided offset in the original file descriptors slice.
- /// There are three kind of updates:
- /// * turning a sparse entry (where the fd is -1) into a real one
- /// * removing an existing entry (set the fd to -1)
- /// * replacing an existing entry with a new fd
- /// Adding new file descriptors must be done with `register_files`.
- pub fn register_files_update(self: *IO_Uring, offset: u32, fds: []const os.fd_t) !void {
- assert(self.fd >= 0);
-
- const FilesUpdate = extern struct {
- offset: u32,
- resv: u32,
- fds: u64 align(8),
- };
- var update = FilesUpdate{
- .offset = offset,
- .resv = @as(u32, 0),
- .fds = @as(u64, @intFromPtr(fds.ptr)),
- };
-
- const res = linux.io_uring_register(
- self.fd,
- .REGISTER_FILES_UPDATE,
- @as(*const anyopaque, @ptrCast(&update)),
- @as(u32, @intCast(fds.len)),
- );
- try handle_registration_result(res);
- }
-
- /// Registers the file descriptor for an eventfd that will be notified of completion events on
- /// an io_uring instance.
- /// Only a single a eventfd can be registered at any given point in time.
- pub fn register_eventfd(self: *IO_Uring, fd: os.fd_t) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(
- self.fd,
- .REGISTER_EVENTFD,
- @as(*const anyopaque, @ptrCast(&fd)),
- 1,
- );
- try handle_registration_result(res);
- }
-
- /// Registers the file descriptor for an eventfd that will be notified of completion events on
- /// an io_uring instance. Notifications are only posted for events that complete in an async manner.
- /// This means that events that complete inline while being submitted do not trigger a notification event.
- /// Only a single eventfd can be registered at any given point in time.
- pub fn register_eventfd_async(self: *IO_Uring, fd: os.fd_t) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(
- self.fd,
- .REGISTER_EVENTFD_ASYNC,
- @as(*const anyopaque, @ptrCast(&fd)),
- 1,
- );
- try handle_registration_result(res);
- }
-
- /// Unregister the registered eventfd file descriptor.
- pub fn unregister_eventfd(self: *IO_Uring) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(
- self.fd,
- .UNREGISTER_EVENTFD,
- null,
- 0,
- );
- try handle_registration_result(res);
- }
-
- /// Registers an array of buffers for use with `read_fixed` and `write_fixed`.
- pub fn register_buffers(self: *IO_Uring, buffers: []const os.iovec) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(
- self.fd,
- .REGISTER_BUFFERS,
- buffers.ptr,
- @as(u32, @intCast(buffers.len)),
- );
- try handle_registration_result(res);
- }
-
- /// Unregister the registered buffers.
- pub fn unregister_buffers(self: *IO_Uring) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(self.fd, .UNREGISTER_BUFFERS, null, 0);
- switch (linux.getErrno(res)) {
- .SUCCESS => {},
- .NXIO => return error.BuffersNotRegistered,
- else => |errno| return os.unexpectedErrno(errno),
+ // Ensure that the kernel can actually see the SQE updates when it sees the tail update.
+ @atomicStore(u32, self.sq.tail, tail, .Release);
+ }
+ return self.sq_ready();
+}
+
+/// Returns true if we are not using an SQ thread (thus nobody submits but us),
+/// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
+/// For the latter case, we set the SQ thread wakeup flag.
+/// Matches the implementation of sq_ring_needs_enter() in liburing.
+pub fn sq_ring_needs_enter(self: *IoUring, flags: *u32) bool {
+ assert(flags.* == 0);
+ if ((self.flags & linux.IORING_SETUP_SQPOLL) == 0) return true;
+ if ((@atomicLoad(u32, self.sq.flags, .Unordered) & linux.IORING_SQ_NEED_WAKEUP) != 0) {
+ flags.* |= linux.IORING_ENTER_SQ_WAKEUP;
+ return true;
+ }
+ return false;
+}
+
+/// Returns the number of flushed and unflushed SQEs pending in the submission queue.
+/// In other words, this is the number of SQEs in the submission queue, i.e. its length.
+/// These are SQEs that the kernel is yet to consume.
+/// Matches the implementation of io_uring_sq_ready in liburing.
+pub fn sq_ready(self: *IoUring) u32 {
+ // Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
+ // see https://github.com/axboe/liburing/issues/92.
+ return self.sq.sqe_tail -% @atomicLoad(u32, self.sq.head, .Acquire);
+}
+
+/// Returns the number of CQEs in the completion queue, i.e. its length.
+/// These are CQEs that the application is yet to consume.
+/// Matches the implementation of io_uring_cq_ready in liburing.
+pub fn cq_ready(self: *IoUring) u32 {
+ return @atomicLoad(u32, self.cq.tail, .Acquire) -% self.cq.head.*;
+}
+
+/// Copies as many CQEs as are ready, and that can fit into the destination `cqes` slice.
+/// If none are available, enters into the kernel to wait for at most `wait_nr` CQEs.
+/// Returns the number of CQEs copied, advancing the CQ ring.
+/// Provides all the wait/peek methods found in liburing, but with batching and a single method.
+/// The rationale for copying CQEs rather than copying pointers is that pointers are 8 bytes
+/// whereas CQEs are not much more at only 16 bytes, and this provides a safer, faster interface.
+/// Safer, because you no longer need to call cqe_seen(), avoiding idempotency bugs.
+/// Faster, because we can now amortize the atomic store release to `cq.head` across the batch.
+/// See https://github.com/axboe/liburing/issues/103#issuecomment-686665007.
+/// Matches the implementation of io_uring_peek_batch_cqe() in liburing, but supports waiting.
+pub fn copy_cqes(self: *IoUring, cqes: []linux.io_uring_cqe, wait_nr: u32) !u32 {
+ const count = self.copy_cqes_ready(cqes);
+ if (count > 0) return count;
+ if (self.cq_ring_needs_flush() or wait_nr > 0) {
+ _ = try self.enter(0, wait_nr, linux.IORING_ENTER_GETEVENTS);
+ return self.copy_cqes_ready(cqes);
+ }
+ return 0;
+}
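// Illustrative usage sketch (not part of the changed file): a minimal completion-reaping loop
// built on copy_cqes(). It assumes the ring type is exposed as std.os.linux.IoUring and that a
// batch of SQEs has already been queued and submitted.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn drain_completions(ring: *linux.IoUring) !void {
    var cqes: [32]linux.io_uring_cqe = undefined;
    // Block for at least one completion, then copy out as many as are ready and fit.
    const count = try ring.copy_cqes(&cqes, 1);
    for (cqes[0..count]) |cqe| {
        // cqe.user_data identifies the originating SQE; cqe.res is the syscall-style result.
        if (cqe.res < 0) return error.CompletionFailed;
    }
}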
+
+fn copy_cqes_ready(self: *IoUring, cqes: []linux.io_uring_cqe) u32 {
+ const ready = self.cq_ready();
+ const count = @min(cqes.len, ready);
+ const head = self.cq.head.* & self.cq.mask;
+ const tail = (self.cq.head.* +% count) & self.cq.mask;
+
+ if (head <= tail) {
+ // head behind tail -> no wrapping
+ @memcpy(cqes[0..count], self.cq.cqes[head..tail]);
+ } else {
+ // head in front of tail -> buffer wraps
+ const two_copies_required: bool = self.cq.cqes.len - head < count;
+ const amount_to_copy_in_first = if (two_copies_required) self.cq.cqes.len - head else count;
+ @memcpy(cqes[0..amount_to_copy_in_first], self.cq.cqes[head .. head + amount_to_copy_in_first]);
+ if (two_copies_required) {
+ @memcpy(cqes[amount_to_copy_in_first..count], self.cq.cqes[0..tail]);
}
}
- fn handle_registration_result(res: usize) !void {
- switch (linux.getErrno(res)) {
- .SUCCESS => {},
- // One or more fds in the array are invalid, or the kernel does not support sparse sets:
- .BADF => return error.FileDescriptorInvalid,
- .BUSY => return error.FilesAlreadyRegistered,
- .INVAL => return error.FilesEmpty,
- // Adding `nr_args` file references would exceed the maximum allowed number of files the
- // user is allowed to have according to the per-user RLIMIT_NOFILE resource limit and
- // the CAP_SYS_RESOURCE capability is not set, or `nr_args` exceeds the maximum allowed
- // for a fixed file set (older kernels have a limit of 1024 files vs 64K files):
- .MFILE => return error.UserFdQuotaExceeded,
- // Insufficient kernel resources, or the caller had a non-zero RLIMIT_MEMLOCK soft
- // resource limit but tried to lock more memory than the limit permitted (not enforced
- // when the process is privileged with CAP_IPC_LOCK):
- .NOMEM => return error.SystemResources,
- // Attempt to register files on a ring already registering files or being torn down:
- .NXIO => return error.RingShuttingDownOrAlreadyRegisteringFiles,
- else => |errno| return os.unexpectedErrno(errno),
- }
- }
+ self.cq_advance(count);
+ return count;
+}
- /// Unregisters all registered file descriptors previously associated with the ring.
- pub fn unregister_files(self: *IO_Uring) !void {
- assert(self.fd >= 0);
- const res = linux.io_uring_register(self.fd, .UNREGISTER_FILES, null, 0);
- switch (linux.getErrno(res)) {
- .SUCCESS => {},
- .NXIO => return error.FilesNotRegistered,
- else => |errno| return os.unexpectedErrno(errno),
- }
+/// Returns a copy of an I/O completion, waiting for it if necessary, and advancing the CQ ring.
+/// A convenience method for `copy_cqes()` for when you don't need to batch or peek.
+pub fn copy_cqe(ring: *IoUring) !linux.io_uring_cqe {
+ var cqes: [1]linux.io_uring_cqe = undefined;
+ while (true) {
+ const count = try ring.copy_cqes(&cqes, 1);
+ if (count > 0) return cqes[0];
}
+}
- /// Prepares a socket creation request.
- /// New socket fd will be returned in completion result.
- /// Available since 5.19
- pub fn socket(
- self: *IO_Uring,
- user_data: u64,
- domain: u32,
- socket_type: u32,
- protocol: u32,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_socket(sqe, domain, socket_type, protocol, flags);
- sqe.user_data = user_data;
- return sqe;
- }
+/// Matches the implementation of cq_ring_needs_flush() in liburing.
+pub fn cq_ring_needs_flush(self: *IoUring) bool {
+ return (@atomicLoad(u32, self.sq.flags, .Unordered) & linux.IORING_SQ_CQ_OVERFLOW) != 0;
+}
- /// Prepares a socket creation request for registered file at index `file_index`.
- /// Available since 5.19
- pub fn socket_direct(
- self: *IO_Uring,
- user_data: u64,
- domain: u32,
- socket_type: u32,
- protocol: u32,
- flags: u32,
- file_index: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_socket_direct(sqe, domain, socket_type, protocol, flags, file_index);
- sqe.user_data = user_data;
- return sqe;
- }
+/// Only for advanced use cases that implement custom completion queue methods.
+/// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
+/// Must be called exactly once after a zero-copy CQE has been processed by your application.
+/// Not idempotent, calling more than once will result in other CQEs being lost.
+/// Matches the implementation of cqe_seen() in liburing.
+pub fn cqe_seen(self: *IoUring, cqe: *linux.io_uring_cqe) void {
+ _ = cqe;
+ self.cq_advance(1);
+}
- /// Prepares a socket creation request for registered file, index chosen by kernel (file index alloc).
- /// File index will be returned in CQE res field.
- /// Available since 5.19
- pub fn socket_direct_alloc(
- self: *IO_Uring,
- user_data: u64,
- domain: u32,
- socket_type: u32,
- protocol: u32,
- flags: u32,
- ) !*linux.io_uring_sqe {
- const sqe = try self.get_sqe();
- io_uring_prep_socket_direct_alloc(sqe, domain, socket_type, protocol, flags);
- sqe.user_data = user_data;
- return sqe;
+/// Only for advanced use cases that implement custom completion queue methods.
+/// Matches the implementation of cq_advance() in liburing.
+pub fn cq_advance(self: *IoUring, count: u32) void {
+ if (count > 0) {
+ // Ensure the kernel only sees the new head value after the CQEs have been read.
+ @atomicStore(u32, self.cq.head, self.cq.head.* +% count, .Release);
}
-};
-
-pub const SubmissionQueue = struct {
- head: *u32,
- tail: *u32,
- mask: u32,
- flags: *u32,
- dropped: *u32,
- array: []u32,
- sqes: []linux.io_uring_sqe,
- mmap: []align(mem.page_size) u8,
- mmap_sqes: []align(mem.page_size) u8,
+}
- // We use `sqe_head` and `sqe_tail` in the same way as liburing:
- // We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
- // We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
- // This allows us to amortize the cost of the @atomicStore to `tail` across multiple SQEs.
- sqe_head: u32 = 0,
- sqe_tail: u32 = 0,
+/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`.
+/// N.B. While SQEs are initiated in the order in which they appear in the submission queue,
+/// operations execute in parallel and completions are unordered. Therefore, an application that
+/// submits a write followed by an fsync in the submission queue cannot expect the fsync to
+/// apply to the write, since the fsync may complete before the write is issued to the disk.
+/// You should preferably use `link_with_next_sqe()` on a write's SQE to link it with an fsync,
+/// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync.
+pub fn fsync(self: *IoUring, user_data: u64, fd: os.fd_t, flags: u32) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_fsync(fd, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
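// Illustrative usage sketch (not part of the changed file): ordering a write before an fsync by
// linking the two SQEs, as the comment above suggests. Assumes the ring is exposed as
// std.os.linux.IoUring and has a submit() method.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn write_then_fsync(ring: *linux.IoUring, fd: os.fd_t, data: []const u8) !void {
    const write_sqe = try ring.write(0x01, fd, data, 0);
    // Link the write to the next SQE so the fsync only starts once the write has completed.
    write_sqe.flags |= linux.IOSQE_IO_LINK;
    // For fdatasync() semantics, set IORING_FSYNC_DATASYNC in rw_flags on the returned SQE.
    _ = try ring.fsync(0x02, fd, 0);
    // The caller must keep `data` alive until the completions have been reaped.
    _ = try ring.submit();
}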
- pub fn init(fd: os.fd_t, p: linux.io_uring_params) !SubmissionQueue {
- assert(fd >= 0);
- assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
- const size = @max(
- p.sq_off.array + p.sq_entries * @sizeOf(u32),
- p.cq_off.cqes + p.cq_entries * @sizeOf(linux.io_uring_cqe),
- );
- const mmap = try os.mmap(
- null,
- size,
- os.PROT.READ | os.PROT.WRITE,
- .{ .TYPE = .SHARED, .POPULATE = true },
- fd,
- linux.IORING_OFF_SQ_RING,
- );
- errdefer os.munmap(mmap);
- assert(mmap.len == size);
+/// Queues (but does not submit) an SQE to perform a no-op.
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+/// A no-op is more useful than may appear at first glance.
+/// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
+/// know when the ring is idle before acting on a kill signal.
+pub fn nop(self: *IoUring, user_data: u64) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_nop();
+ sqe.user_data = user_data;
+ return sqe;
+}
- // The motivation for the `sqes` and `array` indirection is to make it possible for the
- // application to preallocate static linux.io_uring_sqe entries and then replay them when needed.
- const size_sqes = p.sq_entries * @sizeOf(linux.io_uring_sqe);
- const mmap_sqes = try os.mmap(
- null,
- size_sqes,
- os.PROT.READ | os.PROT.WRITE,
- .{ .TYPE = .SHARED, .POPULATE = true },
- fd,
- linux.IORING_OFF_SQES,
- );
- errdefer os.munmap(mmap_sqes);
- assert(mmap_sqes.len == size_sqes);
+/// Used to select how the read should be handled.
+pub const ReadBuffer = union(enum) {
+ /// io_uring will read directly into this buffer
+ buffer: []u8,
- const array: [*]u32 = @ptrCast(@alignCast(&mmap[p.sq_off.array]));
- const sqes: [*]linux.io_uring_sqe = @ptrCast(@alignCast(&mmap_sqes[0]));
- // We expect the kernel copies p.sq_entries to the u32 pointed to by p.sq_off.ring_entries,
- // see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L7843-L7844.
- assert(p.sq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_entries]))).*);
- return SubmissionQueue{
- .head = @ptrCast(@alignCast(&mmap[p.sq_off.head])),
- .tail = @ptrCast(@alignCast(&mmap[p.sq_off.tail])),
- .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_mask]))).*,
- .flags = @ptrCast(@alignCast(&mmap[p.sq_off.flags])),
- .dropped = @ptrCast(@alignCast(&mmap[p.sq_off.dropped])),
- .array = array[0..p.sq_entries],
- .sqes = sqes[0..p.sq_entries],
- .mmap = mmap,
- .mmap_sqes = mmap_sqes,
- };
- }
+ /// io_uring will read directly into these buffers using readv.
+ iovecs: []const os.iovec,
- pub fn deinit(self: *SubmissionQueue) void {
- os.munmap(self.mmap_sqes);
- os.munmap(self.mmap);
- }
+ /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
+ /// The buffer group referenced by `group_id` must contain at least one buffer for the read to work.
+ /// `len` controls the number of bytes to read into the selected buffer.
+ buffer_selection: struct {
+ group_id: u16,
+ len: usize,
+ },
};
-pub const CompletionQueue = struct {
- head: *u32,
- tail: *u32,
- mask: u32,
- overflow: *u32,
- cqes: []linux.io_uring_cqe,
-
- pub fn init(fd: os.fd_t, p: linux.io_uring_params, sq: SubmissionQueue) !CompletionQueue {
- assert(fd >= 0);
- assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
- const mmap = sq.mmap;
- const cqes: [*]linux.io_uring_cqe = @ptrCast(@alignCast(&mmap[p.cq_off.cqes]));
- assert(p.cq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_entries]))).*);
- return CompletionQueue{
- .head = @ptrCast(@alignCast(&mmap[p.cq_off.head])),
- .tail = @ptrCast(@alignCast(&mmap[p.cq_off.tail])),
- .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_mask]))).*,
- .overflow = @ptrCast(@alignCast(&mmap[p.cq_off.overflow])),
- .cqes = cqes[0..p.cq_entries],
- };
- }
-
- pub fn deinit(self: *CompletionQueue) void {
- _ = self;
- // A no-op since we now share the mmap with the submission queue.
- // Here for symmetry with the submission queue, and for any future feature support.
+/// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)` depending on the buffer type.
+/// * Reading into a `ReadBuffer.buffer` uses `read(2)`
+/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
+/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE. See https://man7.org/linux/man-pages/man2/preadv2.2.html
+///
+/// Returns a pointer to the SQE.
+pub fn read(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ buffer: ReadBuffer,
+ offset: u64,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ switch (buffer) {
+ .buffer => |slice| sqe.prep_read(fd, slice, offset),
+ .iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
+ .buffer_selection => |selection| {
+ sqe.prep_rw(.READ, fd, 0, selection.len, offset);
+ sqe.flags |= linux.IOSQE_BUFFER_SELECT;
+ sqe.buf_index = selection.group_id;
+ },
}
-};
-
-pub fn io_uring_prep_nop(sqe: *linux.io_uring_sqe) void {
- sqe.* = .{
- .opcode = .NOP,
- .flags = 0,
- .ioprio = 0,
- .fd = 0,
- .off = 0,
- .addr = 0,
- .len = 0,
- .rw_flags = 0,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
+ sqe.user_data = user_data;
+ return sqe;
}
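// Illustrative usage sketch (not part of the changed file): queueing the two plain ReadBuffer
// variants shown above. Assumes std.os.linux.IoUring; the user_data values are arbitrary and the
// caller owns the buffers until the reads complete.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn queue_reads(ring: *linux.IoUring, fd: os.fd_t, buf: []u8, vecs: []const os.iovec) !void {
    // read(2) into a single buffer.
    _ = try ring.read(0x01, fd, .{ .buffer = buf }, 0);
    // preadv(2) into a vector of buffers; for preadv2(2) semantics set rw_flags on the
    // returned SQE before submitting.
    _ = try ring.read(0x02, fd, .{ .iovecs = vecs }, 0);
}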
-pub fn io_uring_prep_fsync(sqe: *linux.io_uring_sqe, fd: os.fd_t, flags: u32) void {
- sqe.* = .{
- .opcode = .FSYNC,
- .flags = 0,
- .ioprio = 0,
- .fd = fd,
- .off = 0,
- .addr = 0,
- .len = 0,
- .rw_flags = flags,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
-}
-
-pub fn io_uring_prep_rw(
- op: linux.IORING_OP,
- sqe: *linux.io_uring_sqe,
+/// Queues (but does not submit) an SQE to perform a `write(2)`.
+/// Returns a pointer to the SQE.
+pub fn write(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
- addr: u64,
- len: usize,
+ buffer: []const u8,
offset: u64,
-) void {
- sqe.* = .{
- .opcode = op,
- .flags = 0,
- .ioprio = 0,
- .fd = fd,
- .off = offset,
- .addr = addr,
- .len = @as(u32, @intCast(len)),
- .rw_flags = 0,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
-}
-
-pub fn io_uring_prep_read(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []u8, offset: u64) void {
- io_uring_prep_rw(.READ, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
-}
-
-pub fn io_uring_prep_write(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, offset: u64) void {
- io_uring_prep_rw(.WRITE, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, offset);
-}
-
-pub fn io_uring_prep_splice(sqe: *linux.io_uring_sqe, fd_in: os.fd_t, off_in: u64, fd_out: os.fd_t, off_out: u64, len: usize) void {
- io_uring_prep_rw(.SPLICE, sqe, fd_out, undefined, len, off_out);
- sqe.addr = off_in;
- sqe.splice_fd_in = fd_in;
-}
-
-pub fn io_uring_prep_readv(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_write(fd, buffer, offset);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `splice(2)`
+/// Either `fd_in` or `fd_out` must be a pipe.
+/// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to std.math.maxInt(u64).
+/// If `fd_in` does not refer to a pipe and `off_in` is maxInt(u64), then `len` bytes are read
+/// from `fd_in` starting from the file offset, which is incremented by the number of bytes read.
+/// If `fd_in` does not refer to a pipe and `off_in` is not maxInt(u64), then the starting offset of `fd_in` will be `off_in`.
+/// This splice operation can be used to implement sendfile by splicing to an intermediate pipe first,
+/// then splicing to the final destination. In fact, the kernel's implementation of sendfile uses splice internally.
+///
+/// NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with EINVAL if one of the
+/// fds doesn't explicitly support the splice operation, e.g. reading from a terminal is unsupported from kernel 5.7 to 5.11.
+/// See https://github.com/axboe/liburing/issues/291
+///
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+pub fn splice(self: *IoUring, user_data: u64, fd_in: os.fd_t, off_in: u64, fd_out: os.fd_t, off_out: u64, len: usize) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_splice(fd_in, off_in, fd_out, off_out, len);
+ sqe.user_data = user_data;
+ return sqe;
+}
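// Illustrative usage sketch (not part of the changed file): a sendfile-style copy through an
// intermediate pipe, as described above. The two splices are linked so the second only runs after
// the first completes. Assumes std.os.linux.IoUring, os.pipe(), and a submit() method.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn queue_sendfile(ring: *linux.IoUring, file_fd: os.fd_t, sock_fd: os.fd_t, len: usize) !void {
    const pipe_fds = try os.pipe();
    // file (offset 0) -> pipe write end; the pipe side must use maxInt(u64) as its offset.
    const first = try ring.splice(0x01, file_fd, 0, pipe_fds[1], std.math.maxInt(u64), len);
    first.flags |= linux.IOSQE_IO_LINK;
    // pipe read end -> socket; neither side is seekable, so both offsets are maxInt(u64).
    _ = try ring.splice(0x02, pipe_fds[0], std.math.maxInt(u64), sock_fd, std.math.maxInt(u64), len);
    _ = try ring.submit();
}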
+
+/// Queues (but does not submit) an SQE to perform an IORING_OP_READ_FIXED.
+/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
+/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
+///
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+pub fn read_fixed(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
- iovecs: []const os.iovec,
+ buffer: *os.iovec,
offset: u64,
-) void {
- io_uring_prep_rw(.READV, sqe, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
-}
-
-pub fn io_uring_prep_writev(
- sqe: *linux.io_uring_sqe,
+ buffer_index: u16,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_read_fixed(fd, buffer, offset, buffer_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
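// Illustrative usage sketch (not part of the changed file): registering one buffer and reading
// into it with READ_FIXED. Assumes std.os.linux.IoUring with a submit() method; the registered
// storage must stay valid for as long as it remains registered.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn fixed_read(ring: *linux.IoUring, fd: os.fd_t, storage: []u8) !void {
    var iov = os.iovec{ .iov_base = storage.ptr, .iov_len = storage.len };
    try ring.register_buffers(&[_]os.iovec{iov});
    // Buffer index 0 matches the buffer's position in the slice passed to register_buffers().
    _ = try ring.read_fixed(0x01, fd, &iov, 0, 0);
    _ = try ring.submit();
}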
+
+/// Queues (but does not submit) an SQE to perform a `pwritev()`.
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+/// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE.
+/// See https://linux.die.net/man/2/pwritev.
+pub fn writev(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
iovecs: []const os.iovec_const,
offset: u64,
-) void {
- io_uring_prep_rw(.WRITEV, sqe, fd, @intFromPtr(iovecs.ptr), iovecs.len, offset);
-}
-
-pub fn io_uring_prep_read_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: *os.iovec, offset: u64, buffer_index: u16) void {
- io_uring_prep_rw(.READ_FIXED, sqe, fd, @intFromPtr(buffer.iov_base), buffer.iov_len, offset);
- sqe.buf_index = buffer_index;
-}
-
-pub fn io_uring_prep_write_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: *os.iovec, offset: u64, buffer_index: u16) void {
- io_uring_prep_rw(.WRITE_FIXED, sqe, fd, @intFromPtr(buffer.iov_base), buffer.iov_len, offset);
- sqe.buf_index = buffer_index;
-}
-
-/// Poll masks previously used to comprise of 16 bits in the flags union of
-/// a SQE, but were then extended to comprise of 32 bits in order to make
-/// room for additional option flags. To ensure that the correct bits of
-/// poll masks are consistently and properly read across multiple kernel
-/// versions, poll masks are enforced to be little-endian.
-/// https://www.spinics.net/lists/io-uring/msg02848.html
-pub inline fn __io_uring_prep_poll_mask(poll_mask: u32) u32 {
- return std.mem.nativeToLittle(u32, poll_mask);
-}
-
-pub fn io_uring_prep_accept(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_writev(fd, iovecs, offset);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an IORING_OP_WRITE_FIXED.
+/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
+/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
+///
+/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
+pub fn write_fixed(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ buffer: *os.iovec,
+ offset: u64,
+ buffer_index: u16,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_write_fixed(fd, buffer, offset, buffer_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
+/// Returns a pointer to the SQE.
+/// Available since 5.5
+pub fn accept(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
addr: ?*os.sockaddr,
addrlen: ?*os.socklen_t,
flags: u32,
-) void {
- // `addr` holds a pointer to `sockaddr`, and `addr2` holds a pointer to socklen_t`.
- // `addr2` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
- io_uring_prep_rw(.ACCEPT, sqe, fd, @intFromPtr(addr), 0, @intFromPtr(addrlen));
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_accept_direct(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_accept(fd, addr, addrlen, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues a multishot accept on a socket.
+///
+/// The multishot variant allows an application to issue a single accept request
+/// that repeatedly posts a CQE whenever a connection request comes in.
+/// As long as the IORING_CQE_F_MORE flag is set in the CQE flags, the accept
+/// will keep generating further CQEs.
+///
+/// Available since 5.19
+pub fn accept_multishot(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
addr: ?*os.sockaddr,
addrlen: ?*os.socklen_t,
flags: u32,
- file_index: u32,
-) void {
- io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
- __io_uring_set_target_fixed_file(sqe, file_index);
-}
-
-pub fn io_uring_prep_multishot_accept_direct(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_multishot_accept(fd, addr, addrlen, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
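// Illustrative usage sketch (not part of the changed file): consuming completions from a
// multishot accept. Assumes std.os.linux.IoUring with a submit() method; error handling and
// connection handoff are elided.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn accept_loop(ring: *linux.IoUring, listen_fd: os.fd_t) !void {
    _ = try ring.accept_multishot(0x01, listen_fd, null, null, 0);
    _ = try ring.submit();
    while (true) {
        const cqe = try ring.copy_cqe();
        if (cqe.res < 0) return error.AcceptFailed;
        const conn_fd: os.fd_t = cqe.res;
        _ = conn_fd; // hand the new connection off to the application here
        // Once IORING_CQE_F_MORE is cleared the multishot accept has stopped and must be re-armed.
        if (cqe.flags & linux.IORING_CQE_F_MORE == 0) break;
    }
}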
+
+/// Queues an accept using direct (registered) file descriptors.
+///
+/// To use an accept direct variant, the application must first have registered
+/// a file table (with register_files). An unused table index will be
+/// dynamically chosen and returned in the CQE res field.
+///
+/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
+/// flags member, and setting the SQE fd field to the direct descriptor value
+/// rather than the regular file descriptor.
+///
+/// Available since 5.19
+pub fn accept_direct(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
addr: ?*os.sockaddr,
addrlen: ?*os.socklen_t,
flags: u32,
-) void {
- io_uring_prep_multishot_accept(sqe, fd, addr, addrlen, flags);
- __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
-}
-
-fn __io_uring_set_target_fixed_file(sqe: *linux.io_uring_sqe, file_index: u32) void {
- const sqe_file_index: u32 = if (file_index == linux.IORING_FILE_INDEX_ALLOC)
- linux.IORING_FILE_INDEX_ALLOC
- else
- // 0 means no fixed files, indexes should be encoded as "index + 1"
- file_index + 1;
- // This filed is overloaded in liburing:
- // splice_fd_in: i32
- // sqe_file_index: u32
- sqe.splice_fd_in = @bitCast(sqe_file_index);
-}
-
-pub fn io_uring_prep_connect(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_accept_direct(fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC);
+ sqe.user_data = user_data;
+ return sqe;
+}
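// Illustrative usage sketch (not part of the changed file): a direct accept into a registered
// (sparse) file table, then using the returned table index with IOSQE_FIXED_FILE, as described
// above. Assumes std.os.linux.IoUring with a submit() method.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn accept_into_table(ring: *linux.IoUring, listen_fd: os.fd_t) !void {
    // Register a sparse table of four slots (-1 means "empty slot").
    const sparse = [_]os.fd_t{ -1, -1, -1, -1 };
    try ring.register_files(&sparse);

    _ = try ring.accept_direct(0x01, listen_fd, null, null, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    if (cqe.res < 0) return error.AcceptFailed;

    // cqe.res is the table index chosen by the kernel, not a regular fd. Use it as the SQE's
    // fd together with IOSQE_FIXED_FILE for follow-up operations on the connection.
    var buf: [256]u8 = undefined;
    const recv_sqe = try ring.recv(0x02, cqe.res, .{ .buffer = &buf }, 0);
    recv_sqe.flags |= linux.IOSQE_FIXED_FILE;
    _ = try ring.submit();
    // Wait for the recv in this sketch so that `buf` outlives the kernel's use of it.
    _ = try ring.copy_cqe();
}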
+
+/// Queues an multishot accept using direct (registered) file descriptors.
+/// Available since 5.19
+pub fn accept_multishot_direct(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ addr: ?*os.sockaddr,
+ addrlen: ?*os.socklen_t,
+ flags: u32,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_multishot_accept_direct(fd, addr, addrlen, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
+/// Returns a pointer to the SQE.
+pub fn connect(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
addr: *const os.sockaddr,
addrlen: os.socklen_t,
-) void {
- // `addrlen` maps to `sqe.off` (u64) instead of `sqe.len` (which is only a u32).
- io_uring_prep_rw(.CONNECT, sqe, fd, @intFromPtr(addr), 0, addrlen);
-}
-
-pub fn io_uring_prep_epoll_ctl(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_connect(fd, addr, addrlen);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an `epoll_ctl(2)`.
+/// Returns a pointer to the SQE.
+pub fn epoll_ctl(
+ self: *IoUring,
+ user_data: u64,
epfd: os.fd_t,
fd: os.fd_t,
op: u32,
ev: ?*linux.epoll_event,
-) void {
- io_uring_prep_rw(.EPOLL_CTL, sqe, epfd, @intFromPtr(ev), op, @as(u64, @intCast(fd)));
-}
-
-pub fn io_uring_prep_recv(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []u8, flags: u32) void {
- io_uring_prep_rw(.RECV, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_send(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32) void {
- io_uring_prep_rw(.SEND, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_send_zc(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16) void {
- io_uring_prep_rw(.SEND_ZC, sqe, fd, @intFromPtr(buffer.ptr), buffer.len, 0);
- sqe.rw_flags = flags;
- sqe.ioprio = zc_flags;
-}
-
-pub fn io_uring_prep_send_zc_fixed(sqe: *linux.io_uring_sqe, fd: os.fd_t, buffer: []const u8, flags: u32, zc_flags: u16, buf_index: u16) void {
- io_uring_prep_send_zc(sqe, fd, buffer, flags, zc_flags);
- sqe.ioprio |= linux.IORING_RECVSEND_FIXED_BUF;
- sqe.buf_index = buf_index;
-}
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_epoll_ctl(epfd, fd, op, ev);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Used to select how the recv call should be handled.
+pub const RecvBuffer = union(enum) {
+ /// io_uring will recv directly into this buffer
+ buffer: []u8,
+
+ /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
+ /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work.
+ /// `len` controls the number of bytes to read into the selected buffer.
+ buffer_selection: struct {
+ group_id: u16,
+ len: usize,
+ },
+};
-pub fn io_uring_prep_sendmsg_zc(
- sqe: *linux.io_uring_sqe,
+/// Queues (but does not submit) an SQE to perform a `recv(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.6
+pub fn recv(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
- msg: *const os.msghdr_const,
+ buffer: RecvBuffer,
flags: u32,
-) void {
- io_uring_prep_sendmsg(sqe, fd, msg, flags);
- sqe.opcode = .SENDMSG_ZC;
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ switch (buffer) {
+ .buffer => |slice| sqe.prep_recv(fd, slice, flags),
+ .buffer_selection => |selection| {
+ sqe.prep_rw(.RECV, fd, 0, selection.len, 0);
+ sqe.rw_flags = flags;
+ sqe.flags |= linux.IOSQE_BUFFER_SELECT;
+ sqe.buf_index = selection.group_id;
+ },
+ }
+ sqe.user_data = user_data;
+ return sqe;
}
-pub fn io_uring_prep_recvmsg(
- sqe: *linux.io_uring_sqe,
+/// Queues (but does not submit) an SQE to perform a `send(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.6
+pub fn send(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ buffer: []const u8,
+ flags: u32,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_send(fd, buffer, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
+///
+/// This operation will most likely produce two CQEs. The flags field of the
+/// first cqe will likely contain IORING_CQE_F_MORE, which means that there will
+/// be a second cqe with the user_data field set to the same value. The user
+/// must not modify the data buffer until the notification is posted. The first
+/// cqe follows the usual rules and so its res field will contain the number of
+/// bytes sent or a negative error code. The notification's res field will be
+/// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The two
+/// step model is needed because the kernel may hold on to buffers for a long
+/// time, e.g. waiting for a TCP ACK. Notifications are responsible for controlling
+/// the lifetime of the buffers. Even errored requests may generate a
+/// notification.
+///
+/// Available since 6.0
+pub fn send_zc(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ buffer: []const u8,
+ send_flags: u32,
+ zc_flags: u16,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_send_zc(fd, buffer, send_flags, zc_flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
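// Illustrative usage sketch (not part of the changed file): handling the two completions a
// zerocopy send can produce, per the comment above. Assumes std.os.linux.IoUring with a
// submit() method.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn send_zerocopy(ring: *linux.IoUring, sock_fd: os.fd_t, data: []const u8) !void {
    _ = try ring.send_zc(0x01, sock_fd, data, 0, 0);
    _ = try ring.submit();

    const first = try ring.copy_cqe();
    if (first.res < 0) return error.SendFailed; // res is the number of bytes sent on success
    if (first.flags & linux.IORING_CQE_F_MORE != 0) {
        // `data` must not be modified or freed until this notification arrives.
        const notif = try ring.copy_cqe();
        std.debug.assert(notif.flags & linux.IORING_CQE_F_NOTIF != 0);
    }
}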
+
+/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 6.0
+pub fn send_zc_fixed(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ buffer: []const u8,
+ send_flags: u32,
+ zc_flags: u16,
+ buf_index: u16,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_send_zc_fixed(fd, buffer, send_flags, zc_flags, buf_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.3
+pub fn recvmsg(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
msg: *os.msghdr,
flags: u32,
-) void {
- linux.io_uring_prep_rw(.RECVMSG, sqe, fd, @intFromPtr(msg), 1, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_sendmsg(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_recvmsg(fd, msg, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.3
+pub fn sendmsg(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
msg: *const os.msghdr_const,
flags: u32,
-) void {
- linux.io_uring_prep_rw(.SENDMSG, sqe, fd, @intFromPtr(msg), 1, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_openat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_sendmsg(fd, msg, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 6.1
+pub fn sendmsg_zc(
+ self: *IoUring,
+ user_data: u64,
+ fd: os.fd_t,
+ msg: *const os.msghdr_const,
+ flags: u32,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_sendmsg_zc(fd, msg, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an `openat(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.6.
+pub fn openat(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: os.mode_t,
-) void {
- io_uring_prep_rw(.OPENAT, sqe, fd, @intFromPtr(path), mode, 0);
- sqe.rw_flags = @bitCast(flags);
-}
-
-pub fn io_uring_prep_openat_direct(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_openat(fd, path, flags, mode);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues an openat using direct (registered) file descriptors.
+///
+/// To use the direct open variant, the application must first have registered
+/// a file table (with register_files). An unused table index will be
+/// dynamically chosen and returned in the CQE res field.
+///
+/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
+/// flags member, and setting the SQE fd field to the direct descriptor value
+/// rather than the regular file descriptor.
+///
+/// Available since 5.15
+pub fn openat_direct(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
path: [*:0]const u8,
flags: linux.O,
mode: os.mode_t,
file_index: u32,
-) void {
- io_uring_prep_openat(sqe, fd, path, flags, mode);
- __io_uring_set_target_fixed_file(sqe, file_index);
-}
-
-pub fn io_uring_prep_close(sqe: *linux.io_uring_sqe, fd: os.fd_t) void {
- sqe.* = .{
- .opcode = .CLOSE,
- .flags = 0,
- .ioprio = 0,
- .fd = fd,
- .off = 0,
- .addr = 0,
- .len = 0,
- .rw_flags = 0,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
-}
-
-pub fn io_uring_prep_close_direct(sqe: *linux.io_uring_sqe, file_index: u32) void {
- io_uring_prep_close(sqe, 0);
- __io_uring_set_target_fixed_file(sqe, file_index);
-}
-
-pub fn io_uring_prep_timeout(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_openat_direct(fd, path, flags, mode, file_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `close(2)`.
+/// Returns a pointer to the SQE.
+/// Available since 5.6.
+pub fn close(self: *IoUring, user_data: u64, fd: os.fd_t) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_close(fd);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues close of registered file descriptor.
+/// Available since 5.15
+pub fn close_direct(self: *IoUring, user_data: u64, file_index: u32) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_close_direct(file_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to register a timeout operation.
+/// Returns a pointer to the SQE.
+///
+/// The timeout will complete when either the timeout expires, or after the specified number of
+/// events complete (if `count` is greater than `0`).
+///
+/// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
+///
+/// The completion event result will be `-ETIME` if the timeout completed through expiration,
+/// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
+/// timeout was removed before it expired.
+///
+/// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
+pub fn timeout(
+ self: *IoUring,
+ user_data: u64,
ts: *const os.linux.kernel_timespec,
count: u32,
flags: u32,
-) void {
- io_uring_prep_rw(.TIMEOUT, sqe, -1, @intFromPtr(ts), 1, count);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_timeout_remove(sqe: *linux.io_uring_sqe, timeout_user_data: u64, flags: u32) void {
- sqe.* = .{
- .opcode = .TIMEOUT_REMOVE,
- .flags = 0,
- .ioprio = 0,
- .fd = -1,
- .off = 0,
- .addr = timeout_user_data,
- .len = 0,
- .rw_flags = flags,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
-}
-
-pub fn io_uring_prep_link_timeout(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_timeout(ts, count, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
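// Illustrative usage sketch (not part of the changed file): a 500ms relative timeout used as a
// wakeup. Assumes std.os.linux.IoUring with a submit() method and that kernel_timespec still
// uses the tv_sec/tv_nsec field names.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn sleep_on_ring(ring: *linux.IoUring) !void {
    const ts = linux.kernel_timespec{ .tv_sec = 0, .tv_nsec = 500 * std.time.ns_per_ms };
    _ = try ring.timeout(0x01, &ts, 0, 0);
    _ = try ring.submit();
    const cqe = try ring.copy_cqe();
    _ = cqe; // res is -ETIME when the timeout expires, which is the expected outcome here
}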
+
+/// Queues (but does not submit) an SQE to remove an existing timeout operation.
+/// Returns a pointer to the SQE.
+///
+/// The timeout is identified by its `user_data`.
+///
+/// The completion event result will be `0` if the timeout was found and cancelled successfully,
+/// `-EBUSY` if the timeout was found but expiration was already in progress, or
+/// `-ENOENT` if the timeout was not found.
+pub fn timeout_remove(
+ self: *IoUring,
+ user_data: u64,
+ timeout_user_data: u64,
+ flags: u32,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_timeout_remove(timeout_user_data, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to add a link timeout operation.
+/// Returns a pointer to the SQE.
+///
+/// You need to set linux.IOSQE_IO_LINK in the flags of the target operation
+/// and then call this method right after the target operation.
+/// See https://lwn.net/Articles/803932/ for detail.
+///
+/// If the dependent request finishes before the linked timeout, the timeout
+/// is canceled. If the timeout finishes before the dependent request, the
+/// dependent request will be canceled.
+///
+/// The completion event result of the link_timeout will be
+/// `-ETIME` if the timeout finishes before the dependent request
+/// (in this case, the completion event result of the dependent request will
+/// be `-ECANCELED`), or
+/// `-EALREADY` if the dependent request finishes before the linked timeout.
+pub fn link_timeout(
+ self: *IoUring,
+ user_data: u64,
ts: *const os.linux.kernel_timespec,
flags: u32,
-) void {
- linux.io_uring_prep_rw(.LINK_TIMEOUT, sqe, -1, @intFromPtr(ts), 1, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_poll_add(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_link_timeout(ts, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
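// Illustrative usage sketch (not part of the changed file): giving a recv a one-second deadline
// by linking it to a link_timeout, as described above. Assumes std.os.linux.IoUring with a
// submit() method; the caller owns `buf` until the operations complete.
const std = @import("std");
const os = std.os;
const linux = os.linux;

fn recv_with_deadline(ring: *linux.IoUring, sock_fd: os.fd_t, buf: []u8) !void {
    const ts = linux.kernel_timespec{ .tv_sec = 1, .tv_nsec = 0 };
    const recv_sqe = try ring.recv(0x01, sock_fd, .{ .buffer = buf }, 0);
    // The target operation carries IOSQE_IO_LINK; the link timeout immediately follows it.
    recv_sqe.flags |= linux.IOSQE_IO_LINK;
    _ = try ring.link_timeout(0x02, &ts, 0);
    _ = try ring.submit();
    // If the timeout fires first the recv completes with -ECANCELED and the timeout with -ETIME.
}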
+
+/// Queues (but does not submit) an SQE to perform a `poll(2)`.
+/// Returns a pointer to the SQE.
+pub fn poll_add(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
poll_mask: u32,
-) void {
- io_uring_prep_rw(.POLL_ADD, sqe, fd, @intFromPtr(@as(?*anyopaque, null)), 0, 0);
- sqe.rw_flags = __io_uring_prep_poll_mask(poll_mask);
-}
-
-pub fn io_uring_prep_poll_remove(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_poll_add(fd, poll_mask);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to remove an existing poll operation.
+/// Returns a pointer to the SQE.
+pub fn poll_remove(
+ self: *IoUring,
+ user_data: u64,
target_user_data: u64,
-) void {
- io_uring_prep_rw(.POLL_REMOVE, sqe, -1, target_user_data, 0, 0);
-}
-
-pub fn io_uring_prep_poll_update(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_poll_remove(target_user_data);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to update the user data of an existing poll
+/// operation. Returns a pointer to the SQE.
+pub fn poll_update(
+ self: *IoUring,
+ user_data: u64,
old_user_data: u64,
new_user_data: u64,
poll_mask: u32,
flags: u32,
-) void {
- io_uring_prep_rw(.POLL_REMOVE, sqe, -1, old_user_data, flags, new_user_data);
- sqe.rw_flags = __io_uring_prep_poll_mask(poll_mask);
-}
-
-pub fn io_uring_prep_fallocate(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_poll_update(old_user_data, new_user_data, poll_mask, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
+/// Returns a pointer to the SQE.
+pub fn fallocate(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
mode: i32,
offset: u64,
len: u64,
-) void {
- sqe.* = .{
- .opcode = .FALLOCATE,
- .flags = 0,
- .ioprio = 0,
- .fd = fd,
- .off = offset,
- .addr = len,
- .len = @as(u32, @intCast(mode)),
- .rw_flags = 0,
- .user_data = 0,
- .buf_index = 0,
- .personality = 0,
- .splice_fd_in = 0,
- .addr3 = 0,
- .resv = 0,
- };
-}
-
-pub fn io_uring_prep_statx(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_fallocate(fd, mode, offset, len);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `statx(2)`.
+/// Returns a pointer to the SQE.
+pub fn statx(
+ self: *IoUring,
+ user_data: u64,
fd: os.fd_t,
- path: [*:0]const u8,
+ path: [:0]const u8,
flags: u32,
mask: u32,
buf: *linux.Statx,
-) void {
- io_uring_prep_rw(.STATX, sqe, fd, @intFromPtr(path), mask, @intFromPtr(buf));
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_cancel(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_statx(fd, path, flags, mask, buf);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to remove an existing operation.
+/// Returns a pointer to the SQE.
+///
+/// The operation is identified by its `user_data`.
+///
+/// The completion event result will be `0` if the operation was found and cancelled successfully,
+/// `-EALREADY` if the operation was found but was already in progress, or
+/// `-ENOENT` if the operation was not found.
+pub fn cancel(
+ self: *IoUring,
+ user_data: u64,
cancel_user_data: u64,
flags: u32,
-) void {
- io_uring_prep_rw(.ASYNC_CANCEL, sqe, -1, cancel_user_data, 0, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_shutdown(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_cancel(cancel_user_data, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
+/// Returns a pointer to the SQE.
+///
+/// The operation is identified by its `user_data`.
+pub fn shutdown(
+ self: *IoUring,
+ user_data: u64,
sockfd: os.socket_t,
how: u32,
-) void {
- io_uring_prep_rw(.SHUTDOWN, sqe, sockfd, 0, how, 0);
-}
-
-pub fn io_uring_prep_renameat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_shutdown(sockfd, how);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
+/// Returns a pointer to the SQE.
+pub fn renameat(
+ self: *IoUring,
+ user_data: u64,
old_dir_fd: os.fd_t,
old_path: [*:0]const u8,
new_dir_fd: os.fd_t,
new_path: [*:0]const u8,
flags: u32,
-) void {
- io_uring_prep_rw(
- .RENAMEAT,
- sqe,
- old_dir_fd,
- @intFromPtr(old_path),
- 0,
- @intFromPtr(new_path),
- );
- sqe.len = @bitCast(new_dir_fd);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_unlinkat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_renameat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform an `unlinkat(2)`.
+/// Returns a pointer to the SQE.
+pub fn unlinkat(
+ self: *IoUring,
+ user_data: u64,
dir_fd: os.fd_t,
path: [*:0]const u8,
flags: u32,
-) void {
- io_uring_prep_rw(.UNLINKAT, sqe, dir_fd, @intFromPtr(path), 0, 0);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_mkdirat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_unlinkat(dir_fd, path, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
+/// Returns a pointer to the SQE.
+pub fn mkdirat(
+ self: *IoUring,
+ user_data: u64,
dir_fd: os.fd_t,
path: [*:0]const u8,
mode: os.mode_t,
-) void {
- io_uring_prep_rw(.MKDIRAT, sqe, dir_fd, @intFromPtr(path), mode, 0);
-}
-
-pub fn io_uring_prep_symlinkat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_mkdirat(dir_fd, path, mode);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
+/// Returns a pointer to the SQE.
+pub fn symlinkat(
+ self: *IoUring,
+ user_data: u64,
target: [*:0]const u8,
new_dir_fd: os.fd_t,
link_path: [*:0]const u8,
-) void {
- io_uring_prep_rw(
- .SYMLINKAT,
- sqe,
- new_dir_fd,
- @intFromPtr(target),
- 0,
- @intFromPtr(link_path),
- );
-}
-
-pub fn io_uring_prep_linkat(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_symlinkat(target, new_dir_fd, link_path);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `linkat(2)`.
+/// Returns a pointer to the SQE.
+pub fn linkat(
+ self: *IoUring,
+ user_data: u64,
old_dir_fd: os.fd_t,
old_path: [*:0]const u8,
new_dir_fd: os.fd_t,
new_path: [*:0]const u8,
flags: u32,
-) void {
- io_uring_prep_rw(
- .LINKAT,
- sqe,
- old_dir_fd,
- @intFromPtr(old_path),
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_linkat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data.
+/// Returns a pointer to the SQE.
+///
+/// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection.
+///
+/// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size).
+pub fn provide_buffers(
+ self: *IoUring,
+ user_data: u64,
+ buffers: [*]u8,
+ buffer_size: usize,
+ buffers_count: usize,
+ group_id: usize,
+ buffer_id: usize,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_provide_buffers(buffers, buffer_size, buffers_count, group_id, buffer_id);
+ sqe.user_data = user_data;
+ return sqe;
+}
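For reference, a sketch of the contiguous layout the comment above describes (not part of this commit; the allocator, sizes, and group id are illustrative):

const std = @import("std");
const linux = std.os.linux;

// Sketch: hand the kernel a group of 8 read buffers of 4096 bytes each.
// The pool is a single contiguous allocation of buffers_count * buffer_size bytes.
fn provideReadBuffers(ring: *linux.IoUring, allocator: std.mem.Allocator) ![]u8 {
    const buffer_size: usize = 4096;
    const buffers_count: usize = 8;
    const group_id: usize = 1; // SQEs pick from this group when IOSQE_BUFFER_SELECT is set

    const pool = try allocator.alloc(u8, buffer_size * buffers_count);
    errdefer allocator.free(pool);

    _ = try ring.provide_buffers(0xabcd, pool.ptr, buffer_size, buffers_count, group_id, 0);
    _ = try ring.submit();
    return pool;
}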
+
+/// Queues (but does not submit) an SQE to remove a group of provided buffers.
+/// Returns a pointer to the SQE.
+pub fn remove_buffers(
+ self: *IoUring,
+ user_data: u64,
+ buffers_count: usize,
+ group_id: usize,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_remove_buffers(buffers_count, group_id);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Queues (but does not submit) an SQE to perform a `waitid(2)`.
+/// Returns a pointer to the SQE.
+pub fn waitid(
+ self: *IoUring,
+ user_data: u64,
+ id_type: linux.P,
+ id: i32,
+ infop: *linux.siginfo_t,
+ options: u32,
+ flags: u32,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_waitid(id_type, id, infop, options, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Registers an array of file descriptors.
+/// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must
+/// retrieve a reference to the file, and once I/O has completed the file reference must be
+/// dropped. The atomic nature of this file reference can be a slowdown for high IOPS workloads.
+/// This slowdown can be avoided by pre-registering file descriptors.
+/// To refer to a registered file descriptor, IOSQE_FIXED_FILE must be set in the SQE's flags,
+/// and the SQE's fd must be set to the index of the file descriptor in the registered array.
+/// Registering file descriptors will wait for the ring to idle.
+/// Files are automatically unregistered by the kernel when the ring is torn down.
+/// An application need only unregister if it wants to register a new array of file descriptors.
+pub fn register_files(self: *IoUring, fds: []const os.fd_t) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(
+ self.fd,
+ .REGISTER_FILES,
+ @as(*const anyopaque, @ptrCast(fds.ptr)),
+ @as(u32, @intCast(fds.len)),
+ );
+ try handle_registration_result(res);
+}
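For reference, a sketch of consuming a registered slot (not part of this commit; assumes the ring's `read` helper accepts the same buffer union that `recv` uses in the tests below):

const std = @import("std");
const os = std.os;
const linux = os.linux;

// Sketch: pre-register two descriptors and read from the second one by slot index.
fn readViaFixedFile(ring: *linux.IoUring, fd_a: os.fd_t, fd_b: os.fd_t, buf: []u8) !void {
    try ring.register_files(&[_]os.fd_t{ fd_a, fd_b });

    // With IOSQE_FIXED_FILE set, the SQE's fd field is interpreted as an index into
    // the registered array, so slot 1 refers to fd_b.
    const sqe = try ring.read(0x11, 1, .{ .buffer = buf }, 0);
    sqe.flags |= linux.IOSQE_FIXED_FILE;
    _ = try ring.submit();
}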
+
+/// Updates registered file descriptors.
+///
+/// Updates are applied starting at the provided offset in the original file descriptors slice.
+/// There are three kinds of updates:
+/// * turning a sparse entry (where the fd is -1) into a real one
+/// * removing an existing entry (set the fd to -1)
+/// * replacing an existing entry with a new fd
+/// Adding new file descriptors must be done with `register_files`.
+pub fn register_files_update(self: *IoUring, offset: u32, fds: []const os.fd_t) !void {
+ assert(self.fd >= 0);
+
+ const FilesUpdate = extern struct {
+ offset: u32,
+ resv: u32,
+ fds: u64 align(8),
+ };
+ var update = FilesUpdate{
+ .offset = offset,
+ .resv = @as(u32, 0),
+ .fds = @as(u64, @intFromPtr(fds.ptr)),
+ };
+
+ const res = linux.io_uring_register(
+ self.fd,
+ .REGISTER_FILES_UPDATE,
+ @as(*const anyopaque, @ptrCast(&update)),
+ @as(u32, @intCast(fds.len)),
+ );
+ try handle_registration_result(res);
+}
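For reference, a sketch of the three update kinds listed above, applied from offset 0 of an already-registered table (not part of this commit; descriptor names are illustrative):

const std = @import("std");
const os = std.os;
const linux = os.linux;

// Sketch: update the first three registered slots in one call.
fn updateRegisteredFiles(ring: *linux.IoUring, new_fd: os.fd_t, replacement_fd: os.fd_t) !void {
    const updates = [_]os.fd_t{
        new_fd, // slot 0: turn a sparse entry (-1) into a real descriptor
        -1, // slot 1: remove whatever was registered there
        replacement_fd, // slot 2: replace the existing entry with a new fd
    };
    try ring.register_files_update(0, &updates);
}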
+
+/// Registers the file descriptor for an eventfd that will be notified of completion events on
+/// an io_uring instance.
+/// Only a single eventfd can be registered at any given point in time.
+pub fn register_eventfd(self: *IoUring, fd: os.fd_t) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(
+ self.fd,
+ .REGISTER_EVENTFD,
+ @as(*const anyopaque, @ptrCast(&fd)),
+ 1,
+ );
+ try handle_registration_result(res);
+}
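For reference, a sketch of the notification flow (not part of this commit; assumes `std.os.eventfd` is available on the target):

const std = @import("std");
const os = std.os;
const linux = os.linux;

// Sketch: route completion notifications through an eventfd.
fn watchCompletions(ring: *linux.IoUring) !os.fd_t {
    const efd = try os.eventfd(0, 0);
    errdefer os.close(efd);
    try ring.register_eventfd(efd);
    // A blocking 8-byte read on `efd` (or polling it) now wakes whenever CQEs are posted.
    return efd;
}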
+
+/// Registers the file descriptor for an eventfd that will be notified of completion events on
+/// an io_uring instance. Notifications are only posted for events that complete in an async manner.
+/// This means that events that complete inline while being submitted do not trigger a notification event.
+/// Only a single eventfd can be registered at any given point in time.
+pub fn register_eventfd_async(self: *IoUring, fd: os.fd_t) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(
+ self.fd,
+ .REGISTER_EVENTFD_ASYNC,
+ @as(*const anyopaque, @ptrCast(&fd)),
+ 1,
+ );
+ try handle_registration_result(res);
+}
+
+/// Unregisters the registered eventfd file descriptor.
+pub fn unregister_eventfd(self: *IoUring) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(
+ self.fd,
+ .UNREGISTER_EVENTFD,
+ null,
0,
- @intFromPtr(new_path),
);
- sqe.len = @bitCast(new_dir_fd);
- sqe.rw_flags = flags;
+ try handle_registration_result(res);
}
-pub fn io_uring_prep_provide_buffers(
- sqe: *linux.io_uring_sqe,
- buffers: [*]u8,
- buffer_len: usize,
- num: usize,
- group_id: usize,
- buffer_id: usize,
-) void {
- const ptr = @intFromPtr(buffers);
- io_uring_prep_rw(.PROVIDE_BUFFERS, sqe, @as(i32, @intCast(num)), ptr, buffer_len, buffer_id);
- sqe.buf_index = @intCast(group_id);
+/// Registers an array of buffers for use with `read_fixed` and `write_fixed`.
+pub fn register_buffers(self: *IoUring, buffers: []const os.iovec) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(
+ self.fd,
+ .REGISTER_BUFFERS,
+ buffers.ptr,
+ @as(u32, @intCast(buffers.len)),
+ );
+ try handle_registration_result(res);
}
-pub fn io_uring_prep_remove_buffers(
- sqe: *linux.io_uring_sqe,
- num: usize,
- group_id: usize,
-) void {
- io_uring_prep_rw(.REMOVE_BUFFERS, sqe, @as(i32, @intCast(num)), 0, 0, 0);
- sqe.buf_index = @intCast(group_id);
+/// Unregisters the registered buffers.
+pub fn unregister_buffers(self: *IoUring) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(self.fd, .UNREGISTER_BUFFERS, null, 0);
+ switch (linux.getErrno(res)) {
+ .SUCCESS => {},
+ .NXIO => return error.BuffersNotRegistered,
+ else => |errno| return os.unexpectedErrno(errno),
+ }
}
-pub fn io_uring_prep_multishot_accept(
- sqe: *linux.io_uring_sqe,
- fd: os.fd_t,
- addr: ?*os.sockaddr,
- addrlen: ?*os.socklen_t,
- flags: u32,
-) void {
- io_uring_prep_accept(sqe, fd, addr, addrlen, flags);
- sqe.ioprio |= linux.IORING_ACCEPT_MULTISHOT;
+fn handle_registration_result(res: usize) !void {
+ switch (linux.getErrno(res)) {
+ .SUCCESS => {},
+ // One or more fds in the array are invalid, or the kernel does not support sparse sets:
+ .BADF => return error.FileDescriptorInvalid,
+ .BUSY => return error.FilesAlreadyRegistered,
+ .INVAL => return error.FilesEmpty,
+ // Adding `nr_args` file references would exceed the maximum allowed number of files the
+ // user is allowed to have according to the per-user RLIMIT_NOFILE resource limit and
+ // the CAP_SYS_RESOURCE capability is not set, or `nr_args` exceeds the maximum allowed
+ // for a fixed file set (older kernels have a limit of 1024 files vs 64K files):
+ .MFILE => return error.UserFdQuotaExceeded,
+ // Insufficient kernel resources, or the caller had a non-zero RLIMIT_MEMLOCK soft
+ // resource limit but tried to lock more memory than the limit permitted (not enforced
+ // when the process is privileged with CAP_IPC_LOCK):
+ .NOMEM => return error.SystemResources,
+ // Attempt to register files on a ring already registering files or being torn down:
+ .NXIO => return error.RingShuttingDownOrAlreadyRegisteringFiles,
+ else => |errno| return os.unexpectedErrno(errno),
+ }
+}
+
+/// Unregisters all registered file descriptors previously associated with the ring.
+pub fn unregister_files(self: *IoUring) !void {
+ assert(self.fd >= 0);
+ const res = linux.io_uring_register(self.fd, .UNREGISTER_FILES, null, 0);
+ switch (linux.getErrno(res)) {
+ .SUCCESS => {},
+ .NXIO => return error.FilesNotRegistered,
+ else => |errno| return os.unexpectedErrno(errno),
+ }
}
-pub fn io_uring_prep_socket(
- sqe: *linux.io_uring_sqe,
+/// Prepares a socket creation request.
+/// The new socket fd will be returned in the completion result.
+/// Available since 5.19
+pub fn socket(
+ self: *IoUring,
+ user_data: u64,
domain: u32,
socket_type: u32,
protocol: u32,
flags: u32,
-) void {
- io_uring_prep_rw(.SOCKET, sqe, @intCast(domain), 0, protocol, socket_type);
- sqe.rw_flags = flags;
-}
-
-pub fn io_uring_prep_socket_direct(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_socket(domain, socket_type, protocol, flags);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Prepares a socket creation request for the registered file at index `file_index`.
+/// Available since 5.19
+pub fn socket_direct(
+ self: *IoUring,
+ user_data: u64,
domain: u32,
socket_type: u32,
protocol: u32,
flags: u32,
file_index: u32,
-) void {
- io_uring_prep_socket(sqe, domain, socket_type, protocol, flags);
- __io_uring_set_target_fixed_file(sqe, file_index);
-}
-
-pub fn io_uring_prep_socket_direct_alloc(
- sqe: *linux.io_uring_sqe,
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_socket_direct(domain, socket_type, protocol, flags, file_index);
+ sqe.user_data = user_data;
+ return sqe;
+}
+
+/// Prepares a socket creation request for a registered file, with the index chosen by the kernel (file index alloc).
+/// The allocated file index will be returned in the CQE res field.
+/// Available since 5.19
+pub fn socket_direct_alloc(
+ self: *IoUring,
+ user_data: u64,
domain: u32,
socket_type: u32,
protocol: u32,
flags: u32,
-) void {
- io_uring_prep_socket(sqe, domain, socket_type, protocol, flags);
- __io_uring_set_target_fixed_file(sqe, linux.IORING_FILE_INDEX_ALLOC);
+) !*linux.io_uring_sqe {
+ const sqe = try self.get_sqe();
+ sqe.prep_socket_direct_alloc(domain, socket_type, protocol, flags);
+ sqe.user_data = user_data;
+ return sqe;
}
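For reference, a sketch of reading the kernel-chosen index back out of the CQE (not part of this commit; assumes a sparse file table was registered beforehand and the `copy_cqe` helper):

const std = @import("std");
const os = std.os;
const linux = os.linux;

// Sketch: let the kernel pick the registered-file slot for a new TCP socket.
fn openDirectTcpSocket(ring: *linux.IoUring) !u32 {
    // Requires a registered file table with free (sparse, -1) slots and kernel >= 5.19.
    _ = try ring.socket_direct_alloc(0x55, linux.AF.INET, linux.SOCK.STREAM, 0, 0);
    _ = try ring.submit();

    const cqe = try ring.copy_cqe();
    if (cqe.err() != .SUCCESS) return os.unexpectedErrno(cqe.err());
    // On success, res holds the slot of the new socket in the registered file table.
    return @intCast(cqe.res);
}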
-pub fn io_uring_prep_waitid(
- sqe: *linux.io_uring_sqe,
- id_type: linux.P,
- id: i32,
- infop: *linux.siginfo_t,
- options: u32,
- flags: u32,
-) void {
- io_uring_prep_rw(.WAITID, sqe, id, 0, @intFromEnum(id_type), @intFromPtr(infop));
- sqe.rw_flags = flags;
- sqe.splice_fd_in = @bitCast(options);
-}
+pub const SubmissionQueue = struct {
+ head: *u32,
+ tail: *u32,
+ mask: u32,
+ flags: *u32,
+ dropped: *u32,
+ array: []u32,
+ sqes: []linux.io_uring_sqe,
+ mmap: []align(mem.page_size) u8,
+ mmap_sqes: []align(mem.page_size) u8,
+
+ // We use `sqe_head` and `sqe_tail` in the same way as liburing:
+ // We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
+ // We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
+ // This allows us to amortize the cost of the @atomicStore to `tail` across multiple SQEs.
+ sqe_head: u32 = 0,
+ sqe_tail: u32 = 0,
+
+ pub fn init(fd: os.fd_t, p: linux.io_uring_params) !SubmissionQueue {
+ assert(fd >= 0);
+ assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
+ const size = @max(
+ p.sq_off.array + p.sq_entries * @sizeOf(u32),
+ p.cq_off.cqes + p.cq_entries * @sizeOf(linux.io_uring_cqe),
+ );
+ const mmap = try os.mmap(
+ null,
+ size,
+ os.PROT.READ | os.PROT.WRITE,
+ .{ .TYPE = .SHARED, .POPULATE = true },
+ fd,
+ linux.IORING_OFF_SQ_RING,
+ );
+ errdefer os.munmap(mmap);
+ assert(mmap.len == size);
+
+ // The motivation for the `sqes` and `array` indirection is to make it possible for the
+ // application to preallocate static linux.io_uring_sqe entries and then replay them when needed.
+ const size_sqes = p.sq_entries * @sizeOf(linux.io_uring_sqe);
+ const mmap_sqes = try os.mmap(
+ null,
+ size_sqes,
+ os.PROT.READ | os.PROT.WRITE,
+ .{ .TYPE = .SHARED, .POPULATE = true },
+ fd,
+ linux.IORING_OFF_SQES,
+ );
+ errdefer os.munmap(mmap_sqes);
+ assert(mmap_sqes.len == size_sqes);
+
+ const array: [*]u32 = @ptrCast(@alignCast(&mmap[p.sq_off.array]));
+ const sqes: [*]linux.io_uring_sqe = @ptrCast(@alignCast(&mmap_sqes[0]));
+ // We expect the kernel to copy p.sq_entries to the u32 pointed to by p.sq_off.ring_entries,
+ // see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L7843-L7844.
+ assert(p.sq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_entries]))).*);
+ return SubmissionQueue{
+ .head = @ptrCast(@alignCast(&mmap[p.sq_off.head])),
+ .tail = @ptrCast(@alignCast(&mmap[p.sq_off.tail])),
+ .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_mask]))).*,
+ .flags = @ptrCast(@alignCast(&mmap[p.sq_off.flags])),
+ .dropped = @ptrCast(@alignCast(&mmap[p.sq_off.dropped])),
+ .array = array[0..p.sq_entries],
+ .sqes = sqes[0..p.sq_entries],
+ .mmap = mmap,
+ .mmap_sqes = mmap_sqes,
+ };
+ }
+
+ pub fn deinit(self: *SubmissionQueue) void {
+ os.munmap(self.mmap_sqes);
+ os.munmap(self.mmap);
+ }
+};
+
+pub const CompletionQueue = struct {
+ head: *u32,
+ tail: *u32,
+ mask: u32,
+ overflow: *u32,
+ cqes: []linux.io_uring_cqe,
+
+ pub fn init(fd: os.fd_t, p: linux.io_uring_params, sq: SubmissionQueue) !CompletionQueue {
+ assert(fd >= 0);
+ assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
+ const mmap = sq.mmap;
+ const cqes: [*]linux.io_uring_cqe = @ptrCast(@alignCast(&mmap[p.cq_off.cqes]));
+ assert(p.cq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_entries]))).*);
+ return CompletionQueue{
+ .head = @ptrCast(@alignCast(&mmap[p.cq_off.head])),
+ .tail = @ptrCast(@alignCast(&mmap[p.cq_off.tail])),
+ .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_mask]))).*,
+ .overflow = @ptrCast(@alignCast(&mmap[p.cq_off.overflow])),
+ .cqes = cqes[0..p.cq_entries],
+ };
+ }
+
+ pub fn deinit(self: *CompletionQueue) void {
+ _ = self;
+ // A no-op since we now share the mmap with the submission queue.
+ // Here for symmetry with the submission queue, and for any future feature support.
+ }
+};
test "structs/offsets/entries" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
@@ -2009,14 +1451,14 @@ test "structs/offsets/entries" {
try testing.expectEqual(0x8000000, linux.IORING_OFF_CQ_RING);
try testing.expectEqual(0x10000000, linux.IORING_OFF_SQES);
- try testing.expectError(error.EntriesZero, IO_Uring.init(0, 0));
- try testing.expectError(error.EntriesNotPowerOfTwo, IO_Uring.init(3, 0));
+ try testing.expectError(error.EntriesZero, IoUring.init(0, 0));
+ try testing.expectError(error.EntriesNotPowerOfTwo, IoUring.init(3, 0));
}
test "nop" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2083,7 +1525,7 @@ test "nop" {
test "readv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2125,7 +1567,7 @@ test "readv" {
test "writev/fsync/readv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(4, 0) catch |err| switch (err) {
+ var ring = IoUring.init(4, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2195,7 +1637,7 @@ test "writev/fsync/readv" {
test "write/read" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2242,7 +1684,7 @@ test "write/read" {
test "splice/read" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(4, 0) catch |err| switch (err) {
+ var ring = IoUring.init(4, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2313,7 +1755,7 @@ test "splice/read" {
test "write_fixed/read_fixed" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2378,7 +1820,7 @@ test "write_fixed/read_fixed" {
test "openat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2432,7 +1874,7 @@ test "openat" {
test "close" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2463,7 +1905,7 @@ test "close" {
test "accept/connect/send/recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2476,8 +1918,8 @@ test "accept/connect/send/recv" {
const buffer_send = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 };
var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
- const send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0);
- send.flags |= linux.IOSQE_IO_LINK;
+ const sqe_send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0);
+ sqe_send.flags |= linux.IOSQE_IO_LINK;
_ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
try testing.expectEqual(@as(u32, 2), try ring.submit());
@@ -2504,7 +1946,7 @@ test "accept/connect/send/recv" {
test "sendmsg/recvmsg" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2591,7 +2033,7 @@ test "sendmsg/recvmsg" {
test "timeout (after a relative time)" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2622,7 +2064,7 @@ test "timeout (after a relative time)" {
test "timeout (after a number of completions)" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2655,7 +2097,7 @@ test "timeout (after a number of completions)" {
test "timeout_remove" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2715,7 +2157,7 @@ test "timeout_remove" {
test "accept/connect/recv/link_timeout" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2764,7 +2206,7 @@ test "accept/connect/recv/link_timeout" {
test "fallocate" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2810,7 +2252,7 @@ test "fallocate" {
test "statx" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2867,7 +2309,7 @@ test "statx" {
test "accept/connect/recv/cancel" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -2917,7 +2359,7 @@ test "accept/connect/recv/cancel" {
test "register_files_update" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3004,7 +2446,7 @@ test "register_files_update" {
test "shutdown" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3068,7 +2510,7 @@ test "shutdown" {
test "renameat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3129,15 +2571,15 @@ test "renameat" {
defer new_file.close();
var new_file_data: [16]u8 = undefined;
- const read = try new_file.readAll(&new_file_data);
- try testing.expectEqualStrings("hello", new_file_data[0..read]);
+ const bytes_read = try new_file.readAll(&new_file_data);
+ try testing.expectEqualStrings("hello", new_file_data[0..bytes_read]);
}
}
test "unlinkat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3189,7 +2631,7 @@ test "unlinkat" {
test "mkdirat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3233,7 +2675,7 @@ test "mkdirat" {
test "symlinkat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3281,7 +2723,7 @@ test "symlinkat" {
test "linkat" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3333,14 +2775,14 @@ test "linkat" {
defer second_file.close();
var second_file_data: [16]u8 = undefined;
- const read = try second_file.readAll(&second_file_data);
- try testing.expectEqualStrings("hello", second_file_data[0..read]);
+ const bytes_read = try second_file.readAll(&second_file_data);
+ try testing.expectEqualStrings("hello", second_file_data[0..bytes_read]);
}
test "provide_buffers: read" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3472,7 +2914,7 @@ test "provide_buffers: read" {
test "remove_buffers" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3561,7 +3003,7 @@ test "remove_buffers" {
test "provide_buffers: accept/connect/send/recv" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3736,7 +3178,7 @@ const SocketTestHarness = struct {
}
};
-fn createSocketTestHarness(ring: *IO_Uring) !SocketTestHarness {
+fn createSocketTestHarness(ring: *IoUring) !SocketTestHarness {
// Create a TCP server socket
var address = try net.Address.parseIp4("127.0.0.1", 0);
const listener_socket = try createListenerSocket(&address);
@@ -3805,7 +3247,7 @@ fn createListenerSocket(address: *net.Address) !os.socket_t {
test "accept multishot" {
if (builtin.os.tag != .linux) return error.SkipZigTest;
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3844,7 +3286,7 @@ test "accept multishot" {
test "accept/connect/send_zc/recv" {
try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 });
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3858,8 +3300,8 @@ test "accept/connect/send_zc/recv" {
var buffer_recv = [_]u8{0} ** 10;
// zero-copy send
- const send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0);
- send.flags |= linux.IOSQE_IO_LINK;
+ const sqe_send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0);
+ sqe_send.flags |= linux.IOSQE_IO_LINK;
_ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
try testing.expectEqual(@as(u32, 2), try ring.submit());
@@ -3897,7 +3339,7 @@ test "accept/connect/send_zc/recv" {
test "accept_direct" {
try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -3977,7 +3419,7 @@ test "accept_direct" {
test "accept_multishot_direct" {
try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -4035,7 +3477,7 @@ test "accept_multishot_direct" {
test "socket" {
try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
- var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ var ring = IoUring.init(1, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -4058,7 +3500,7 @@ test "socket" {
test "socket_direct/socket_direct_alloc/close_direct" {
try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -4136,7 +3578,7 @@ test "socket_direct/socket_direct_alloc/close_direct" {
test "openat_direct/close_direct" {
try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
- var ring = IO_Uring.init(2, 0) catch |err| switch (err) {
+ var ring = IoUring.init(2, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
@@ -4187,7 +3629,7 @@ test "openat_direct/close_direct" {
test "waitid" {
try skipKernelLessThan(.{ .major = 6, .minor = 7, .patch = 0 });
- var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ var ring = IoUring.init(16, 0) catch |err| switch (err) {
error.SystemOutdated => return error.SkipZigTest,
error.PermissionDenied => return error.SkipZigTest,
else => return err,
lib/std/os/linux/test.zig
@@ -120,3 +120,7 @@ test "fadvise" {
const ret = linux.fadvise(file.handle, 0, 0, linux.POSIX_FADV.SEQUENTIAL);
try expectEqual(@as(usize, 0), ret);
}
+
+test {
+ _ = linux.IoUring;
+}
lib/std/os/linux.zig
@@ -383,8 +383,6 @@ pub const O = switch (native_arch) {
else => @compileError("missing std.os.linux.O constants for this architecture"),
};
-pub usingnamespace @import("linux/io_uring.zig");
-
/// Set by startup code, used by `getauxval`.
pub var elf_aux_maybe: ?[*]std.elf.Auxv = null;
@@ -4188,22 +4186,9 @@ pub const IORING_SETUP_SINGLE_ISSUER = 1 << 12;
pub const IORING_SETUP_DEFER_TASKRUN = 1 << 13;
/// IO submission data structure (Submission Queue Entry)
-pub const io_uring_sqe = extern struct {
- opcode: IORING_OP,
- flags: u8,
- ioprio: u16,
- fd: i32,
- off: u64,
- addr: u64,
- len: u32,
- rw_flags: u32,
- user_data: u64,
- buf_index: u16,
- personality: u16,
- splice_fd_in: i32,
- addr3: u64,
- resv: u64,
-};
+pub const io_uring_sqe = @import("linux/io_uring_sqe.zig").io_uring_sqe;
+
+pub const IoUring = @import("linux/IoUring.zig");
/// If sqe->file_index is set to this for opcodes that instantiate a new
/// direct descriptor (like openat/openat2/accept), then io_uring will allocate
CMakeLists.txt
@@ -291,7 +291,8 @@ set(ZIG_STAGE2_SOURCES
"${CMAKE_SOURCE_DIR}/lib/std/os/linux/errno/generic.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/linux/x86_64.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/linux.zig"
- "${CMAKE_SOURCE_DIR}/lib/std/os/linux/io_uring.zig"
+ "${CMAKE_SOURCE_DIR}/lib/std/os/linux/IoUring.zig"
+ "${CMAKE_SOURCE_DIR}/lib/std/os/linux/io_uring_sqe.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/linux/x86_64.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/windows.zig"
"${CMAKE_SOURCE_DIR}/lib/std/os/windows/ntstatus.zig"