const IoUring = @This();
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const mem = std.mem;
const net = std.Io.net;
const posix = std.posix;
const linux = std.os.linux;
const testing = std.testing;
const is_linux = builtin.os.tag == .linux;
const page_size_min = std.heap.page_size_min;

fd: linux.fd_t = -1,
sq: SubmissionQueue,
cq: CompletionQueue,
flags: u32,
features: u32,

/// A friendly way to set up an io_uring, with default linux.io_uring_params.
/// `entries` must be a power of two between 1 and 32768, although the kernel will make the final
/// call on how many entries the submission and completion queues will ultimately have,
/// see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L8027-L8050.
/// Matches the interface of io_uring_queue_init() in liburing.
pub fn init(entries: u16, flags: u32) !IoUring {
    var params = mem.zeroInit(linux.io_uring_params, .{
        .flags = flags,
        .sq_thread_idle = 1000,
    });
    return try IoUring.init_params(entries, &params);
}
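
// A minimal usage sketch (illustrative, not part of the API): initialize a ring,
// queue a no-op, submit it, and reap its completion. Assumes a kernel new enough
// for io_uring; on older kernels `init` fails with `error.SystemOutdated`.
//
//     var ring = try IoUring.init(16, 0);
//     defer ring.deinit();
//     _ = try ring.nop(0xaa); // user_data 0xaa tags the completion
//     _ = try ring.submit();
//     const cqe = try ring.copy_cqe();
//     assert(cqe.user_data == 0xaa);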

/// A powerful way to set up an io_uring, if you want to tweak linux.io_uring_params such as submission
/// queue thread cpu affinity or thread idle timeout (the kernel and our default is 1 second).
/// `params` is passed by reference because the kernel needs to modify the parameters.
/// Matches the interface of io_uring_queue_init_params() in liburing.
pub fn init_params(entries: u16, p: *linux.io_uring_params) !IoUring {
    if (entries == 0) return error.EntriesZero;
    if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;

    assert(p.sq_entries == 0);
    assert(p.cq_entries == 0 or p.flags & linux.IORING_SETUP_CQSIZE != 0);
    assert(p.features == 0);
    assert(p.wq_fd == 0 or p.flags & linux.IORING_SETUP_ATTACH_WQ != 0);
    assert(p.resv[0] == 0);
    assert(p.resv[1] == 0);
    assert(p.resv[2] == 0);

    const res = linux.io_uring_setup(entries, p);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .FAULT => return error.ParamsOutsideAccessibleAddressSpace,
        // The resv array contains non-zero data, p.flags contains an unsupported flag,
        // entries out of bounds, IORING_SETUP_SQ_AFF was specified without IORING_SETUP_SQPOLL,
        // or IORING_SETUP_CQSIZE was specified but linux.io_uring_params.cq_entries was invalid:
        .INVAL => return error.ArgumentsInvalid,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NOMEM => return error.SystemResources,
        // IORING_SETUP_SQPOLL was specified but effective user ID lacks sufficient privileges,
        // or a container seccomp policy prohibits io_uring syscalls:
        .PERM => return error.PermissionDenied,
        .NOSYS => return error.SystemOutdated,
        else => |errno| return posix.unexpectedErrno(errno),
    }
    const fd = @as(linux.fd_t, @intCast(res));
    assert(fd >= 0);
    errdefer posix.close(fd);

    // Kernel versions 5.4 and up use only one mmap() for the submission and completion queues.
    // This is not an optional feature for us... if the kernel does it, we have to do it.
    // The thinking on this by the kernel developers was that both the submission and the
    // completion queue rings have sizes just over a power of two, but the submission queue ring
    // is significantly smaller with u32 slots. By bundling both in a single mmap, the kernel
    // gets the submission queue ring for free.
    // See https://patchwork.kernel.org/patch/11115257 for the kernel patch.
    // We do not support the double mmap() done before 5.4, because we want to keep the
    // init/deinit mmap paths simple and because io_uring has had many bug fixes even since 5.4.
    if ((p.features & linux.IORING_FEAT_SINGLE_MMAP) == 0) {
        return error.SystemOutdated;
    }

    // Check that the kernel has actually set params and that "impossible is nothing".
    assert(p.sq_entries != 0);
    assert(p.cq_entries != 0);
    assert(p.cq_entries >= p.sq_entries);

    // From here on, we only need to read from params, so pass `p` by value as immutable.
    // The completion queue shares the mmap with the submission queue, so pass `sq` there too.
    var sq = try SubmissionQueue.init(fd, p.*);
    errdefer sq.deinit();
    var cq = try CompletionQueue.init(fd, p.*, sq);
    errdefer cq.deinit();

    // Check that our starting state is as we expect.
    assert(sq.head.* == 0);
    assert(sq.tail.* == 0);
    assert(sq.mask == p.sq_entries - 1);
    // Allow flags.* to be non-zero, since the kernel may set IORING_SQ_NEED_WAKEUP at any time.
    assert(sq.dropped.* == 0);
    assert(sq.array.len == p.sq_entries);
    assert(sq.sqes.len == p.sq_entries);
    assert(sq.sqe_head == 0);
    assert(sq.sqe_tail == 0);

    assert(cq.head.* == 0);
    assert(cq.tail.* == 0);
    assert(cq.mask == p.cq_entries - 1);
    assert(cq.overflow.* == 0);
    assert(cq.cqes.len == p.cq_entries);

    return IoUring{
        .fd = fd,
        .sq = sq,
        .cq = cq,
        .flags = p.flags,
        .features = p.features,
    };
}

pub fn deinit(self: *IoUring) void {
    assert(self.fd >= 0);
    // The mmaps depend on the fd, so the order of these calls is important:
    self.cq.deinit();
    self.sq.deinit();
    posix.close(self.fd);
    self.fd = -1;
}

/// Returns a pointer to a vacant SQE, or an error if the submission queue is full.
/// We follow the implementation (and atomics) of liburing's `io_uring_get_sqe()` exactly.
/// However, instead of a null we return an error to force safe handling.
/// Any situation where the submission queue is full tends more towards a control flow error,
/// and the null return in liburing is more a C idiom than anything else, for lack of a better
/// alternative. In Zig, we have first-class error handling... so let's use it.
/// Matches the implementation of io_uring_get_sqe() in liburing.
pub fn get_sqe(self: *IoUring) !*linux.io_uring_sqe {
    const head = @atomicLoad(u32, self.sq.head, .acquire);
    // Remember that these head and tail offsets wrap around every four billion operations.
    // We must therefore use wrapping addition and subtraction to avoid a runtime crash.
    const next = self.sq.sqe_tail +% 1;
    if (next -% head > self.sq.sqes.len) return error.SubmissionQueueFull;
    const sqe = &self.sq.sqes[self.sq.sqe_tail & self.sq.mask];
    self.sq.sqe_tail = next;
    return sqe;
}

/// Submits the SQEs acquired via get_sqe() to the kernel. You can call this once after you have
/// called get_sqe() multiple times to set up multiple I/O requests.
/// Returns the number of SQEs submitted, if not used alongside IORING_SETUP_SQPOLL.
/// If the io_uring instance uses IORING_SETUP_SQPOLL, the value returned on success is not
/// guaranteed to match the number of SQEs actually submitted during this call. A value higher
/// or lower, including 0, may be returned.
/// Matches the implementation of io_uring_submit() in liburing.
pub fn submit(self: *IoUring) !u32 {
    return self.submit_and_wait(0);
}

/// Like submit(), but allows waiting for events as well.
/// Returns the number of SQEs submitted.
/// Matches the implementation of io_uring_submit_and_wait() in liburing.
pub fn submit_and_wait(self: *IoUring, wait_nr: u32) !u32 {
    const submitted = self.flush_sq();
    var flags: u32 = 0;
    if (self.sq_ring_needs_enter(&flags) or wait_nr > 0) {
        if (wait_nr > 0 or (self.flags & linux.IORING_SETUP_IOPOLL) != 0) {
            flags |= linux.IORING_ENTER_GETEVENTS;
        }
        return try self.enter(submitted, wait_nr, flags);
    }
    return submitted;
}
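
// An illustrative batching sketch (the `ring` variable and user_data values are
// hypothetical): queue several SQEs, then submit them with a single syscall and
// wait for at least one completion.
//
//     _ = try ring.nop(1);
//     _ = try ring.nop(2);
//     const submitted = try ring.submit_and_wait(1); // one enter() for both SQEs
//     assert(submitted == 2);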

/// Tell the kernel we have submitted SQEs and/or want to wait for CQEs.
/// Returns the number of SQEs submitted.
pub fn enter(self: *IoUring, to_submit: u32, min_complete: u32, flags: u32) !u32 {
    assert(self.fd >= 0);
    const res = linux.io_uring_enter(self.fd, to_submit, min_complete, flags, null);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        // The kernel was unable to allocate memory or ran out of resources for the request.
        // The application should wait for some completions and try again:
        .AGAIN => return error.SystemResources,
        // The SQE `fd` is invalid, or IOSQE_FIXED_FILE was set but no files were registered:
        .BADF => return error.FileDescriptorInvalid,
        // The file descriptor is valid, but the ring is not in the right state.
        // See io_uring_register(2) for how to enable the ring.
        .BADFD => return error.FileDescriptorInBadState,
        // The application attempted to overcommit the number of requests it can have pending.
        // The application should wait for some completions and try again:
        .BUSY => return error.CompletionQueueOvercommitted,
        // The SQE is invalid, or valid but the ring was set up with IORING_SETUP_IOPOLL:
        .INVAL => return error.SubmissionQueueEntryInvalid,
        // The buffer is outside the process' accessible address space, or IORING_OP_READ_FIXED
        // or IORING_OP_WRITE_FIXED was specified but no buffers were registered, or the range
        // described by `addr` and `len` is not within the buffer registered at `buf_index`:
        .FAULT => return error.BufferInvalid,
        .NXIO => return error.RingShuttingDown,
        // The kernel believes our `self.fd` does not refer to an io_uring instance,
        // or the opcode is valid but not supported by this kernel (more likely):
        .OPNOTSUPP => return error.OpcodeNotSupported,
        // The operation was interrupted by a delivery of a signal before it could complete.
        // This can happen while waiting for events with IORING_ENTER_GETEVENTS:
        .INTR => return error.SignalInterrupt,
        else => |errno| return posix.unexpectedErrno(errno),
    }
    return @as(u32, @intCast(res));
}

/// Sync internal state with kernel ring state on the SQ side.
/// Returns the number of all pending events in the SQ ring, for the shared ring.
/// This return value includes previously flushed SQEs, as per liburing.
/// The rationale is to suggest that an io_uring_enter() call is needed rather than not.
/// Matches the implementation of __io_uring_flush_sq() in liburing.
pub fn flush_sq(self: *IoUring) u32 {
    if (self.sq.sqe_head != self.sq.sqe_tail) {
        // Fill in SQEs that we have queued up, adding them to the kernel ring.
        const to_submit = self.sq.sqe_tail -% self.sq.sqe_head;
        var tail = self.sq.tail.*;
        var i: usize = 0;
        while (i < to_submit) : (i += 1) {
            self.sq.array[tail & self.sq.mask] = self.sq.sqe_head & self.sq.mask;
            tail +%= 1;
            self.sq.sqe_head +%= 1;
        }
        // Ensure that the kernel can actually see the SQE updates when it sees the tail update.
        @atomicStore(u32, self.sq.tail, tail, .release);
    }
    return self.sq_ready();
}

/// Returns true if we are not using an SQ thread (thus nobody submits but us),
/// or if IORING_SQ_NEED_WAKEUP is set and the SQ thread must be explicitly awakened.
/// For the latter case, we set the SQ thread wakeup flag.
/// Matches the implementation of sq_ring_needs_enter() in liburing.
pub fn sq_ring_needs_enter(self: *IoUring, flags: *u32) bool {
    assert(flags.* == 0);
    if ((self.flags & linux.IORING_SETUP_SQPOLL) == 0) return true;
    if ((@atomicLoad(u32, self.sq.flags, .unordered) & linux.IORING_SQ_NEED_WAKEUP) != 0) {
        flags.* |= linux.IORING_ENTER_SQ_WAKEUP;
        return true;
    }
    return false;
}

/// Returns the number of flushed and unflushed SQEs pending in the submission queue.
/// In other words, this is the number of SQEs in the submission queue, i.e. its length.
/// These are SQEs that the kernel is yet to consume.
/// Matches the implementation of io_uring_sq_ready in liburing.
pub fn sq_ready(self: *IoUring) u32 {
    // Always use the shared ring state (i.e. head and not sqe_head) to avoid going out of sync,
    // see https://github.com/axboe/liburing/issues/92.
    return self.sq.sqe_tail -% @atomicLoad(u32, self.sq.head, .acquire);
}

/// Returns the number of CQEs in the completion queue, i.e. its length.
/// These are CQEs that the application is yet to consume.
/// Matches the implementation of io_uring_cq_ready in liburing.
pub fn cq_ready(self: *IoUring) u32 {
    return @atomicLoad(u32, self.cq.tail, .acquire) -% self.cq.head.*;
}

/// Copies as many CQEs as are ready, and that can fit into the destination `cqes` slice.
/// If none are available, enters into the kernel to wait for at most `wait_nr` CQEs.
/// Returns the number of CQEs copied, advancing the CQ ring.
/// Provides all the wait/peek methods found in liburing, but with batching and a single method.
/// The rationale for copying CQEs rather than copying pointers is that pointers are 8 bytes
/// whereas CQEs are not much more at only 16 bytes, and this provides a safer, faster interface.
/// Safer, because you no longer need to call cqe_seen(), avoiding idempotency bugs.
/// Faster, because we can now amortize the atomic store release to `cq.head` across the batch.
/// See https://github.com/axboe/liburing/issues/103#issuecomment-686665007.
/// Matches the implementation of io_uring_peek_batch_cqe() in liburing, but supports waiting.
pub fn copy_cqes(self: *IoUring, cqes: []linux.io_uring_cqe, wait_nr: u32) !u32 {
    const count = self.copy_cqes_ready(cqes);
    if (count > 0) return count;
    if (self.cq_ring_needs_flush() or wait_nr > 0) {
        _ = try self.enter(0, wait_nr, linux.IORING_ENTER_GETEVENTS);
        return self.copy_cqes_ready(cqes);
    }
    return 0;
}

fn copy_cqes_ready(self: *IoUring, cqes: []linux.io_uring_cqe) u32 {
    const ready = self.cq_ready();
    const count = @min(cqes.len, ready);
    const head = self.cq.head.* & self.cq.mask;

    // Copy the contiguous run up to the point where the ring buffer wraps:
    const n = @min(self.cq.cqes.len - head, count);
    @memcpy(cqes[0..n], self.cq.cqes[head..][0..n]);

    if (count > n) {
        // Wrap around to the start of self.cq.cqes for the remainder:
        const w = count - n;
        @memcpy(cqes[n..][0..w], self.cq.cqes[0..w]);
    }

    self.cq_advance(count);
    return count;
}

/// Returns a copy of an I/O completion, waiting for it if necessary, and advancing the CQ ring.
/// A convenience method for `copy_cqes()` for when you don't need to batch or peek.
pub fn copy_cqe(ring: *IoUring) !linux.io_uring_cqe {
    var cqes: [1]linux.io_uring_cqe = undefined;
    while (true) {
        const count = try ring.copy_cqes(&cqes, 1);
        if (count > 0) return cqes[0];
    }
}
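
// A sketch of a typical completion loop built on copy_cqes() (the batch size,
// the `running` flag, and the dispatch are illustrative only):
//
//     var cqes: [16]linux.io_uring_cqe = undefined;
//     while (running) {
//         _ = try ring.submit_and_wait(1);
//         const n = try ring.copy_cqes(&cqes, 0);
//         for (cqes[0..n]) |cqe| {
//             // Dispatch on cqe.user_data; cqe.res holds the result or -errno.
//         }
//     }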

/// Matches the implementation of cq_ring_needs_flush() in liburing.
pub fn cq_ring_needs_flush(self: *IoUring) bool {
    return (@atomicLoad(u32, self.sq.flags, .unordered) & linux.IORING_SQ_CQ_OVERFLOW) != 0;
}

/// Only for advanced use cases that implement custom completion queue methods.
/// If you use copy_cqes() or copy_cqe() you must not call cqe_seen() or cq_advance().
/// Must be called exactly once after a zero-copy CQE has been processed by your application.
/// Not idempotent: calling it more than once will cause other CQEs to be lost.
/// Matches the implementation of cqe_seen() in liburing.
pub fn cqe_seen(self: *IoUring, cqe: *linux.io_uring_cqe) void {
    _ = cqe;
    self.cq_advance(1);
}

/// Only for advanced use cases that implement custom completion queue methods.
/// Matches the implementation of cq_advance() in liburing.
pub fn cq_advance(self: *IoUring, count: u32) void {
    if (count > 0) {
        // Ensure the kernel only sees the new head value after the CQEs have been read.
        @atomicStore(u32, self.cq.head, self.cq.head.* +% count, .release);
    }
}

/// Queues (but does not submit) an SQE to perform an `fsync(2)`.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, for `fdatasync()` you can set `IORING_FSYNC_DATASYNC` in the SQE's `rw_flags`.
/// N.B. While SQEs are initiated in the order in which they appear in the submission queue,
/// operations execute in parallel and completions are unordered. Therefore, an application that
/// submits a write followed by an fsync in the submission queue cannot expect the fsync to
/// apply to the write, since the fsync may complete before the write is issued to the disk.
/// You should preferably use `link_with_next_sqe()` on a write's SQE to link it with an fsync,
/// or else insert a full write barrier using `drain_previous_sqes()` when queueing an fsync.
pub fn fsync(self: *IoUring, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_fsync(fd, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a no-op.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// A no-op is more useful than may appear at first glance.
/// For example, you could call `drain_previous_sqes()` on the returned SQE, to use the no-op to
/// know when the ring is idle before acting on a kill signal.
pub fn nop(self: *IoUring, user_data: u64) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_nop();
    sqe.user_data = user_data;
    return sqe;
}

/// Used to select how the read should be handled.
pub const ReadBuffer = union(enum) {
    /// io_uring will read directly into this buffer
    buffer: []u8,

    /// io_uring will read directly into these buffers using readv.
    iovecs: []const posix.iovec,

    /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
    /// The buffer group referenced by `group_id` must contain at least one buffer for the read to work.
    /// `len` controls the number of bytes to read into the selected buffer.
    buffer_selection: struct {
        group_id: u16,
        len: usize,
    },
};

/// Queues (but does not submit) an SQE to perform a `read(2)` or `preadv(2)` depending on the buffer type.
/// * Reading into a `ReadBuffer.buffer` uses `read(2)`
/// * Reading into a `ReadBuffer.iovecs` uses `preadv(2)`
/// If you want to do a `preadv2(2)` then set `rw_flags` on the returned SQE. See https://man7.org/linux/man-pages/man2/preadv2.2.html
///
/// Returns a pointer to the SQE.
pub fn read(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: ReadBuffer,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    switch (buffer) {
        .buffer => |slice| sqe.prep_read(fd, slice, offset),
        .iovecs => |vecs| sqe.prep_readv(fd, vecs, offset),
        .buffer_selection => |selection| {
            sqe.prep_rw(.READ, fd, 0, selection.len, offset);
            sqe.flags |= linux.IOSQE_BUFFER_SELECT;
            sqe.buf_index = selection.group_id;
        },
    }
    sqe.user_data = user_data;
    return sqe;
}
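
// An illustrative fragment (the `ring`, `fd`, and buffer are hypothetical): queue
// a read into a plain buffer at offset 0, tagged with user_data 0x1 for dispatch.
//
//     var buf: [4096]u8 = undefined;
//     _ = try ring.read(0x1, fd, .{ .buffer = &buf }, 0);
//     _ = try ring.submit();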

/// Queues (but does not submit) an SQE to perform a `write(2)`.
/// Returns a pointer to the SQE.
pub fn write(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_write(fd, buffer, offset);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `splice(2)`.
/// Either `fd_in` or `fd_out` must be a pipe.
/// If `fd_in` refers to a pipe, `off_in` is ignored and must be set to std.math.maxInt(u64).
/// If `fd_in` does not refer to a pipe and `off_in` is maxInt(u64), then `len` bytes are read
/// from `fd_in` starting from the file offset, which is incremented by the number of bytes read.
/// If `fd_in` does not refer to a pipe and `off_in` is not maxInt(u64), then the starting offset of `fd_in` will be `off_in`.
/// This splice operation can be used to implement sendfile by splicing to an intermediate pipe first,
/// then splicing to the final destination. In fact, the implementation of sendfile in the kernel uses splice internally.
///
/// NOTE that even if fd_in or fd_out refers to a pipe, the splice operation can still fail with EINVAL if one of the
/// fds doesn't explicitly support the splice operation, e.g. reading from a terminal is unsupported from kernel 5.7 to 5.11.
/// See https://github.com/axboe/liburing/issues/291
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn splice(self: *IoUring, user_data: u64, fd_in: linux.fd_t, off_in: u64, fd_out: linux.fd_t, off_out: u64, len: usize) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_splice(fd_in, off_in, fd_out, off_out, len);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an IORING_OP_READ_FIXED.
/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn read_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: *posix.iovec,
    offset: u64,
    buffer_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_read_fixed(fd, buffer, offset, buffer_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `pwritev()`.
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
/// For example, if you want to do a `pwritev2()` then set `rw_flags` on the returned SQE.
/// See https://linux.die.net/man/2/pwritev.
pub fn writev(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    iovecs: []const posix.iovec_const,
    offset: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_writev(fd, iovecs, offset);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an IORING_OP_WRITE_FIXED.
/// The `buffer` provided must be registered with the kernel by calling `register_buffers` first.
/// The `buffer_index` must be the same as its index in the array provided to `register_buffers`.
///
/// Returns a pointer to the SQE so that you can further modify the SQE for advanced use cases.
pub fn write_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: *posix.iovec,
    offset: u64,
    buffer_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_write_fixed(fd, buffer, offset, buffer_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `accept4(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 5.5
pub fn accept(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_accept(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a multishot accept on a socket.
///
/// The multishot variant allows an application to issue a single accept request,
/// which repeatedly triggers a CQE when a connection request comes in.
/// As long as the IORING_CQE_F_MORE flag is set in the CQE flags, the accept
/// will generate further CQEs.
///
/// Available since 5.19
pub fn accept_multishot(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_multishot_accept(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues an accept using direct (registered) file descriptors.
///
/// To use an accept direct variant, the application must first have registered
/// a file table (with register_files). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, they can be used by setting IOSQE_FIXED_FILE in the SQE
/// flags member, and setting the SQE fd field to the direct descriptor value
/// rather than the regular file descriptor.
///
/// Available since 5.19
pub fn accept_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_accept_direct(fd, addr, addrlen, flags, linux.IORING_FILE_INDEX_ALLOC);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a multishot accept using direct (registered) file descriptors.
/// Available since 5.19
pub fn accept_multishot_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: ?*posix.sockaddr,
    addrlen: ?*posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_multishot_accept_direct(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `connect(2)` on a socket.
/// Returns a pointer to the SQE.
pub fn connect(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: *const posix.sockaddr,
    addrlen: posix.socklen_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_connect(fd, addr, addrlen);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `epoll_ctl(2)`.
/// Returns a pointer to the SQE.
pub fn epoll_ctl(
    self: *IoUring,
    user_data: u64,
    epfd: linux.fd_t,
    fd: linux.fd_t,
    op: u32,
    ev: ?*linux.epoll_event,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_epoll_ctl(epfd, fd, op, ev);
    sqe.user_data = user_data;
    return sqe;
}

/// Used to select how the recv call should be handled.
pub const RecvBuffer = union(enum) {
    /// io_uring will recv directly into this buffer
    buffer: []u8,

    /// io_uring will select a buffer that has previously been provided with `provide_buffers`.
    /// The buffer group referenced by `group_id` must contain at least one buffer for the recv call to work.
    /// `len` controls the number of bytes to read into the selected buffer.
    buffer_selection: struct {
        group_id: u16,
        len: usize,
    },
};

/// Queues (but does not submit) an SQE to perform a `recv(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn recv(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: RecvBuffer,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    switch (buffer) {
        .buffer => |slice| sqe.prep_recv(fd, slice, flags),
        .buffer_selection => |selection| {
            sqe.prep_rw(.RECV, fd, 0, selection.len, 0);
            sqe.rw_flags = flags;
            sqe.flags |= linux.IOSQE_BUFFER_SELECT;
            sqe.buf_index = selection.group_id;
        },
    }
    sqe.user_data = user_data;
    return sqe;
}
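
// An illustrative buffer-selection fragment (the group ID and length are
// hypothetical): after provide_buffers() has populated group 1, a recv can let
// the kernel pick the buffer; the chosen buffer ID is reported via the CQE flags.
//
//     _ = try ring.recv(0x2, fd, .{ .buffer_selection = .{ .group_id = 1, .len = 4096 } }, 0);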

/// Queues (but does not submit) an SQE to perform a `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6
pub fn send(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send(fd, buffer, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
///
/// This operation will most likely produce two CQEs. The flags field of the
/// first CQE will likely contain IORING_CQE_F_MORE, which means that there will
/// be a second CQE with the user_data field set to the same value. The user
/// must not modify the data buffer until the notification is posted. The first
/// CQE follows the usual rules and so its res field will contain the number of
/// bytes sent or a negative error code. The notification's res field will be
/// set to zero and the flags field will contain IORING_CQE_F_NOTIF. The
/// two-step model is needed because the kernel may hold on to buffers for a
/// long time, e.g. waiting for a TCP ACK. Notifications are responsible for
/// controlling the lifetime of the buffers. Even errored requests may generate
/// a notification.
///
/// Available since 6.0
pub fn send_zc(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send_zc(fd, buffer, send_flags, zc_flags);
    sqe.user_data = user_data;
    return sqe;
}
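
// A sketch of handling the two-step zero-copy completion (the flag checks follow
// the doc comment above; the dispatch itself is illustrative):
//
//     const cqe = try ring.copy_cqe();
//     if (cqe.flags & linux.IORING_CQE_F_MORE != 0) {
//         // First CQE: cqe.res is the byte count; keep the buffer alive and
//         // wait for the IORING_CQE_F_NOTIF completion with the same user_data.
//     }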

/// Queues (but does not submit) an SQE to perform an async zerocopy `send(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.0
pub fn send_zc_fixed(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    buffer: []const u8,
    send_flags: u32,
    zc_flags: u16,
    buf_index: u16,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_send_zc_fixed(fd, buffer, send_flags, zc_flags, buf_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `recvmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn recvmsg(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *linux.msghdr,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_recvmsg(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.3
pub fn sendmsg(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *const linux.msghdr_const,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_sendmsg(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an async zerocopy `sendmsg(2)`.
/// Returns a pointer to the SQE.
/// Available since 6.1
pub fn sendmsg_zc(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    msg: *const linux.msghdr_const,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_sendmsg_zc(fd, msg, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `openat(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn openat(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [*:0]const u8,
    flags: linux.O,
    mode: posix.mode_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_openat(fd, path, flags, mode);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues an openat using direct (registered) file descriptors.
///
/// To use the direct variant, the application must first have registered
/// a file table (with register_files). An unused table index will be
/// dynamically chosen and returned in the CQE res field.
///
/// After creation, the descriptor can be used by setting IOSQE_FIXED_FILE in
/// the SQE flags member, and setting the SQE fd field to the direct descriptor
/// value rather than the regular file descriptor.
///
/// Available since 5.15
pub fn openat_direct(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [*:0]const u8,
    flags: linux.O,
    mode: posix.mode_t,
    file_index: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_openat_direct(fd, path, flags, mode, file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `close(2)`.
/// Returns a pointer to the SQE.
/// Available since 5.6.
pub fn close(self: *IoUring, user_data: u64, fd: linux.fd_t) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_close(fd);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues a close of a registered (direct) file descriptor.
/// Available since 5.15
pub fn close_direct(self: *IoUring, user_data: u64, file_index: u32) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_close_direct(file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to register a timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout will complete when either the timeout expires, or after the specified number of
/// events complete (if `count` is greater than `0`).
///
/// `flags` may be `0` for a relative timeout, or `IORING_TIMEOUT_ABS` for an absolute timeout.
///
/// The completion event result will be `-ETIME` if the timeout completed through expiration,
/// `0` if the timeout completed after the specified number of events, or `-ECANCELED` if the
/// timeout was removed before it expired.
///
/// io_uring timeouts use the `CLOCK.MONOTONIC` clock source.
pub fn timeout(
    self: *IoUring,
    user_data: u64,
    ts: *const linux.kernel_timespec,
    count: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_timeout(ts, count, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to remove an existing timeout operation.
/// Returns a pointer to the SQE.
///
/// The timeout is identified by its `user_data`.
///
/// The completion event result will be `0` if the timeout was found and canceled successfully,
/// `-EBUSY` if the timeout was found but expiration was already in progress, or
/// `-ENOENT` if the timeout was not found.
pub fn timeout_remove(
    self: *IoUring,
    user_data: u64,
    timeout_user_data: u64,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_timeout_remove(timeout_user_data, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to add a link timeout operation.
/// Returns a pointer to the SQE.
///
/// You need to set linux.IOSQE_IO_LINK in the flags of the target operation
/// and then call this method right after the target operation.
/// See https://lwn.net/Articles/803932/ for detail.
///
/// If the dependent request finishes before the linked timeout, the timeout
/// is canceled. If the timeout finishes before the dependent request, the
/// dependent request will be canceled.
///
/// The completion event result of the link_timeout will be
/// `-ETIME` if the timeout finishes before the dependent request
/// (in this case, the completion event result of the dependent request will
/// be `-ECANCELED`), or
/// `-EALREADY` if the dependent request finishes before the linked timeout.
pub fn link_timeout(
    self: *IoUring,
    user_data: u64,
    ts: *const linux.kernel_timespec,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_link_timeout(ts, flags);
    sqe.user_data = user_data;
    return sqe;
}
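
// An illustrative linked-timeout fragment (the fd, buffer, and timespec values
// are hypothetical): link a read to a 1-second timeout so that whichever
// finishes first cancels the other.
//
//     const read_sqe = try ring.read(0x3, fd, .{ .buffer = &buf }, 0);
//     read_sqe.flags |= linux.IOSQE_IO_LINK;
//     const ts = linux.kernel_timespec{ .sec = 1, .nsec = 0 };
//     _ = try ring.link_timeout(0x4, &ts, 0);
//     _ = try ring.submit();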

/// Queues (but does not submit) an SQE to perform a `poll(2)`.
/// Returns a pointer to the SQE.
pub fn poll_add(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    poll_mask: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_add(fd, poll_mask);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to remove an existing poll operation.
/// Returns a pointer to the SQE.
pub fn poll_remove(
    self: *IoUring,
    user_data: u64,
    target_user_data: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_remove(target_user_data);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to update the user data of an existing poll
/// operation. Returns a pointer to the SQE.
pub fn poll_update(
    self: *IoUring,
    user_data: u64,
    old_user_data: u64,
    new_user_data: u64,
    poll_mask: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_poll_update(old_user_data, new_user_data, poll_mask, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `fallocate(2)`.
/// Returns a pointer to the SQE.
pub fn fallocate(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    mode: i32,
    offset: u64,
    len: u64,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_fallocate(fd, mode, offset, len);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `statx(2)`.
/// Returns a pointer to the SQE.
pub fn statx(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    path: [:0]const u8,
    flags: u32,
    mask: u32,
    buf: *linux.Statx,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_statx(fd, path, flags, mask, buf);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to remove an existing operation.
/// Returns a pointer to the SQE.
///
/// The operation is identified by its `user_data`.
///
/// The completion event result will be `0` if the operation was found and canceled successfully,
/// `-EALREADY` if the operation was found but was already in progress, or
/// `-ENOENT` if the operation was not found.
pub fn cancel(
    self: *IoUring,
    user_data: u64,
    cancel_user_data: u64,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_cancel(cancel_user_data, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `shutdown(2)`.
/// Returns a pointer to the SQE.
pub fn shutdown(
    self: *IoUring,
    user_data: u64,
    sockfd: posix.socket_t,
    how: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_shutdown(sockfd, how);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `renameat2(2)`.
/// Returns a pointer to the SQE.
pub fn renameat(
    self: *IoUring,
    user_data: u64,
    old_dir_fd: linux.fd_t,
    old_path: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    new_path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_renameat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform an `unlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn unlinkat(
    self: *IoUring,
    user_data: u64,
    dir_fd: linux.fd_t,
    path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_unlinkat(dir_fd, path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `mkdirat(2)`.
/// Returns a pointer to the SQE.
pub fn mkdirat(
    self: *IoUring,
    user_data: u64,
    dir_fd: linux.fd_t,
    path: [*:0]const u8,
    mode: posix.mode_t,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_mkdirat(dir_fd, path, mode);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `symlinkat(2)`.
/// Returns a pointer to the SQE.
pub fn symlinkat(
    self: *IoUring,
    user_data: u64,
    target: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    link_path: [*:0]const u8,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_symlinkat(target, new_dir_fd, link_path);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `linkat(2)`.
/// Returns a pointer to the SQE.
pub fn linkat(
    self: *IoUring,
    user_data: u64,
    old_dir_fd: linux.fd_t,
    old_path: [*:0]const u8,
    new_dir_fd: linux.fd_t,
    new_path: [*:0]const u8,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_linkat(old_dir_fd, old_path, new_dir_fd, new_path, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to provide a group of buffers used for commands that read/receive data.
/// Returns a pointer to the SQE.
///
/// Provided buffers can be used in `read`, `recv` or `recvmsg` commands via .buffer_selection.
///
/// The kernel expects a contiguous block of memory of size (buffers_count * buffer_size).
pub fn provide_buffers(
    self: *IoUring,
    user_data: u64,
    buffers: [*]u8,
    buffer_size: usize,
    buffers_count: usize,
    group_id: usize,
    buffer_id: usize,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_provide_buffers(buffers, buffer_size, buffers_count, group_id, buffer_id);
    sqe.user_data = user_data;
    return sqe;
}
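
// An illustrative provide_buffers fragment (the sizes, group ID, and backing
// array are hypothetical): hand 8 buffers of 4096 bytes each to the kernel as
// group 1, starting at buffer ID 0.
//
//     var pool: [8 * 4096]u8 = undefined;
//     _ = try ring.provide_buffers(0x5, &pool, 4096, 8, 1, 0);
//     _ = try ring.submit();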

/// Queues (but does not submit) an SQE to remove a group of provided buffers.
/// Returns a pointer to the SQE.
pub fn remove_buffers(
    self: *IoUring,
    user_data: u64,
    buffers_count: usize,
    group_id: usize,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_remove_buffers(buffers_count, group_id);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `waitid(2)`.
/// Returns a pointer to the SQE.
pub fn waitid(
    self: *IoUring,
    user_data: u64,
    id_type: linux.P,
    id: i32,
    infop: *linux.siginfo_t,
    options: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_waitid(id_type, id, infop, options, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Registers an array of file descriptors.
/// Every time a file descriptor is put in an SQE and submitted to the kernel, the kernel must
/// retrieve a reference to the file, and once I/O has completed the file reference must be
/// dropped. The atomic nature of this file reference can be a slowdown for high IOPS workloads.
/// This slowdown can be avoided by pre-registering file descriptors.
/// To refer to a registered file descriptor, IOSQE_FIXED_FILE must be set in the SQE's flags,
/// and the SQE's fd must be set to the index of the file descriptor in the registered array.
/// Registering file descriptors will wait for the ring to idle.
/// Files are automatically unregistered by the kernel when the ring is torn down.
/// An application need only unregister if it wants to register a new array of file descriptors.
pub fn register_files(self: *IoUring, fds: []const linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES,
        @as(*const anyopaque, @ptrCast(fds.ptr)),
        @as(u32, @intCast(fds.len)),
    );
    try handle_registration_result(res);
}
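
// An illustrative fragment (the fds and index are hypothetical): after
// registering, refer to a file by its table index with IOSQE_FIXED_FILE
// instead of by its regular file descriptor.
//
//     try ring.register_files(&.{ fd_a, fd_b });
//     const sqe = try ring.read(0x6, 1, .{ .buffer = &buf }, 0); // fd field = index 1 (fd_b)
//     sqe.flags |= linux.IOSQE_FIXED_FILE;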

/// Updates registered file descriptors.
///
/// Updates are applied starting at the provided offset in the original file descriptors slice.
/// There are three kinds of updates:
/// * turning a sparse entry (where the fd is -1) into a real one
/// * removing an existing entry (set the fd to -1)
/// * replacing an existing entry with a new fd
/// Adding new file descriptors must be done with `register_files`.
pub fn register_files_update(self: *IoUring, offset: u32, fds: []const linux.fd_t) !void {
    assert(self.fd >= 0);

    const FilesUpdate = extern struct {
        offset: u32,
        resv: u32,
        fds: u64 align(8),
    };
    var update = FilesUpdate{
        .offset = offset,
        .resv = @as(u32, 0),
        .fds = @as(u64, @intFromPtr(fds.ptr)),
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES_UPDATE,
        @as(*const anyopaque, @ptrCast(&update)),
        @as(u32, @intCast(fds.len)),
    );
    try handle_registration_result(res);
}

/// Registers an empty (all -1) file table of `nr_files` file descriptors.
pub fn register_files_sparse(self: *IoUring, nr_files: u32) !void {
    assert(self.fd >= 0);

    const reg = &linux.io_uring_rsrc_register{
        .nr = nr_files,
        .flags = linux.IORING_RSRC_REGISTER_SPARSE,
        .resv2 = 0,
        .data = 0,
        .tags = 0,
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILES2,
        @ptrCast(reg),
        @as(u32, @sizeOf(linux.io_uring_rsrc_register)),
    );

    return handle_registration_result(res);
}

/// Registers a range for fixed file allocations.
/// Available since 6.0
pub fn register_file_alloc_range(self: *IoUring, offset: u32, len: u32) !void {
    assert(self.fd >= 0);

    const range = &linux.io_uring_file_index_range{
        .off = offset,
        .len = len,
        .resv = 0,
    };

    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_FILE_ALLOC_RANGE,
        @ptrCast(range),
        @as(u32, @sizeOf(linux.io_uring_file_index_range)),
    );

    return handle_registration_result(res);
}

/// Registers the file descriptor for an eventfd that will be notified of completion events on
/// an io_uring instance.
/// Only a single eventfd can be registered at any given point in time.
pub fn register_eventfd(self: *IoUring, fd: linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_EVENTFD,
        @as(*const anyopaque, @ptrCast(&fd)),
        1,
    );
    try handle_registration_result(res);
}

/// Registers the file descriptor for an eventfd that will be notified of completion events on
/// an io_uring instance. Notifications are only posted for events that complete in an async manner.
/// This means that events that complete inline while being submitted do not trigger a notification event.
/// Only a single eventfd can be registered at any given point in time.
pub fn register_eventfd_async(self: *IoUring, fd: linux.fd_t) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_EVENTFD_ASYNC,
        @as(*const anyopaque, @ptrCast(&fd)),
        1,
    );
    try handle_registration_result(res);
}

/// Unregister the registered eventfd file descriptor.
pub fn unregister_eventfd(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .UNREGISTER_EVENTFD,
        null,
        0,
    );
    try handle_registration_result(res);
}

/// Registers a NAPI busy-poll configuration with the ring.
pub fn register_napi(self: *IoUring, napi: *linux.io_uring_napi) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .REGISTER_NAPI, napi, 1);
    try handle_registration_result(res);
}

/// Unregisters the ring's NAPI busy-poll configuration.
pub fn unregister_napi(self: *IoUring, napi: *linux.io_uring_napi) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_NAPI, napi, 1);
    try handle_registration_result(res);
}

/// Registers an array of buffers for use with `read_fixed` and `write_fixed`.
pub fn register_buffers(self: *IoUring, buffers: []const posix.iovec) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(
        self.fd,
        .REGISTER_BUFFERS,
        buffers.ptr,
        @as(u32, @intCast(buffers.len)),
    );
    try handle_registration_result(res);
}
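
// An illustrative fixed-buffer fragment (the buffer, fd, and iovec field names
// are assumptions based on std.posix.iovec): register one iovec, then read into
// it with read_fixed using buffer_index 0.
//
//     var raw: [4096]u8 = undefined;
//     var iov = [1]posix.iovec{.{ .base = &raw, .len = raw.len }};
//     try ring.register_buffers(&iov);
//     _ = try ring.read_fixed(0x7, fd, &iov[0], 0, 0);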

/// Unregister the registered buffers.
pub fn unregister_buffers(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_BUFFERS, null, 0);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .NXIO => return error.BuffersNotRegistered,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}

/// Returns an io_uring_probe which is used to probe the capabilities of the
/// io_uring subsystem of the running kernel. The io_uring_probe contains the
/// list of supported operations.
pub fn get_probe(self: *IoUring) !linux.io_uring_probe {
    var probe = mem.zeroInit(linux.io_uring_probe, .{});
    const res = linux.io_uring_register(self.fd, .REGISTER_PROBE, &probe, probe.ops.len);
    try handle_register_buf_ring_result(res);
    return probe;
}

fn handle_registration_result(res: usize) !void {
    switch (linux.errno(res)) {
        .SUCCESS => {},
        // One or more fds in the array are invalid, or the kernel does not support sparse sets:
        .BADF => return error.FileDescriptorInvalid,
        .BUSY => return error.FilesAlreadyRegistered,
        .INVAL => return error.FilesEmpty,
        // Adding `nr_args` file references would exceed the maximum allowed number of files the
        // user is allowed to have according to the per-user RLIMIT_NOFILE resource limit and
        // the CAP_SYS_RESOURCE capability is not set, or `nr_args` exceeds the maximum allowed
        // for a fixed file set (older kernels have a limit of 1024 files vs 64K files):
        .MFILE => return error.UserFdQuotaExceeded,
        // Insufficient kernel resources, or the caller had a non-zero RLIMIT_MEMLOCK soft
        // resource limit but tried to lock more memory than the limit permitted (not enforced
        // when the process is privileged with CAP_IPC_LOCK):
        .NOMEM => return error.SystemResources,
        // Attempt to register files on a ring already registering files or being torn down:
        .NXIO => return error.RingShuttingDownOrAlreadyRegisteringFiles,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}

/// Unregisters all registered file descriptors previously associated with the ring.
pub fn unregister_files(self: *IoUring) !void {
    assert(self.fd >= 0);
    const res = linux.io_uring_register(self.fd, .UNREGISTER_FILES, null, 0);
    switch (linux.errno(res)) {
        .SUCCESS => {},
        .NXIO => return error.FilesNotRegistered,
        else => |errno| return posix.unexpectedErrno(errno),
    }
}

/// Prepares a socket creation request.
/// The new socket fd will be returned in the completion result.
/// Available since 5.19
pub fn socket(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket(domain, socket_type, protocol, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a socket creation request for a registered file at index `file_index`.
/// Available since 5.19
pub fn socket_direct(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
    file_index: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket_direct(domain, socket_type, protocol, flags, file_index);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a socket creation request for a registered file, with the index chosen
/// by the kernel (file index alloc). The file index will be returned in the CQE res field.
/// Available since 5.19
pub fn socket_direct_alloc(
    self: *IoUring,
    user_data: u64,
    domain: u32,
    socket_type: u32,
    protocol: u32,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_socket_direct_alloc(domain, socket_type, protocol, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `bind(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn bind(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    addr: *const posix.sockaddr,
    addrlen: posix.socklen_t,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_bind(fd, addr, addrlen, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Queues (but does not submit) an SQE to perform a `listen(2)` on a socket.
/// Returns a pointer to the SQE.
/// Available since 6.11
pub fn listen(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    backlog: usize,
    flags: u32,
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_listen(fd, backlog, flags);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a cmd request for a socket.
/// See: https://man7.org/linux/man-pages/man3/io_uring_prep_cmd.3.html
/// Available since 6.7.
pub fn cmd_sock(
    self: *IoUring,
    user_data: u64,
    cmd_op: linux.IO_URING_SOCKET_OP,
    fd: linux.fd_t,
    level: u32, // linux.SOL
    optname: u32, // linux.SO
    optval: u64, // pointer to the option value
    optlen: u32, // size of the option value
) !*linux.io_uring_sqe {
    const sqe = try self.get_sqe();
    sqe.prep_cmd_sock(cmd_op, fd, level, optname, optval, optlen);
    sqe.user_data = user_data;
    return sqe;
}

/// Prepares a set socket option request for the option specified by `optname`,
/// at the protocol level specified by `level`.
/// Available since 6.7.
pub fn setsockopt(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    level: u32, // linux.SOL
    optname: u32, // linux.SO
    opt: []const u8,
) !*linux.io_uring_sqe {
    return try self.cmd_sock(
        user_data,
        .SETSOCKOPT,
        fd,
        level,
        optname,
        @intFromPtr(opt.ptr),
        @intCast(opt.len),
    );
}

/// Prepares a get socket option request to retrieve the value of the option
/// specified by `optname` for the socket specified by `fd`.
/// Available since 6.7.
pub fn getsockopt(
    self: *IoUring,
    user_data: u64,
    fd: linux.fd_t,
    level: u32, // linux.SOL
    optname: u32, // linux.SO
    opt: []u8,
) !*linux.io_uring_sqe {
    return try self.cmd_sock(
        user_data,
        .GETSOCKOPT,
        fd,
        level,
        optname,
        @intFromPtr(opt.ptr),
        @intCast(opt.len),
    );
}
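
// An illustrative fragment (the `ring` and socket fd are hypothetical): enable
// SO_REUSEADDR via the async setsockopt command, mirroring a blocking
// setsockopt(2) call.
//
//     const one = mem.toBytes(@as(u32, 1));
//     _ = try ring.setsockopt(0x8, fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, &one);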

pub const SubmissionQueue = struct {
    head: *u32,
    tail: *u32,
    mask: u32,
    flags: *u32,
    dropped: *u32,
    array: []u32,
    sqes: []linux.io_uring_sqe,
    mmap: []align(page_size_min) u8,
    mmap_sqes: []align(page_size_min) u8,

    // We use `sqe_head` and `sqe_tail` in the same way as liburing:
    // We increment `sqe_tail` (but not `tail`) for each call to `get_sqe()`.
    // We then set `tail` to `sqe_tail` once, only when these events are actually submitted.
    // This allows us to amortize the cost of the @atomicStore to `tail` across multiple SQEs.
    sqe_head: u32 = 0,
    sqe_tail: u32 = 0,
1515
1516 pub fn init(fd: linux.fd_t, p: linux.io_uring_params) !SubmissionQueue {
1517 assert(fd >= 0);
1518 assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
1519 const size = @max(
1520 p.sq_off.array + p.sq_entries * @sizeOf(u32),
1521 p.cq_off.cqes + p.cq_entries * @sizeOf(linux.io_uring_cqe),
1522 );
1523 const mmap = try posix.mmap(
1524 null,
1525 size,
1526 posix.PROT.READ | posix.PROT.WRITE,
1527 .{ .TYPE = .SHARED, .POPULATE = true },
1528 fd,
1529 linux.IORING_OFF_SQ_RING,
1530 );
1531 errdefer posix.munmap(mmap);
1532 assert(mmap.len == size);
1533
1534 // The motivation for the `sqes` and `array` indirection is to make it possible for the
1535 // application to preallocate static linux.io_uring_sqe entries and then replay them when needed.
1536 const size_sqes = p.sq_entries * @sizeOf(linux.io_uring_sqe);
1537 const mmap_sqes = try posix.mmap(
1538 null,
1539 size_sqes,
1540 posix.PROT.READ | posix.PROT.WRITE,
1541 .{ .TYPE = .SHARED, .POPULATE = true },
1542 fd,
1543 linux.IORING_OFF_SQES,
1544 );
1545 errdefer posix.munmap(mmap_sqes);
1546 assert(mmap_sqes.len == size_sqes);
1547
1548 const array: [*]u32 = @ptrCast(@alignCast(&mmap[p.sq_off.array]));
1549 const sqes: [*]linux.io_uring_sqe = @ptrCast(@alignCast(&mmap_sqes[0]));
        // We expect the kernel to copy p.sq_entries to the u32 pointed to by p.sq_off.ring_entries,
1551 // see https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L7843-L7844.
1552 assert(p.sq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_entries]))).*);
1553 return SubmissionQueue{
1554 .head = @ptrCast(@alignCast(&mmap[p.sq_off.head])),
1555 .tail = @ptrCast(@alignCast(&mmap[p.sq_off.tail])),
1556 .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.sq_off.ring_mask]))).*,
1557 .flags = @ptrCast(@alignCast(&mmap[p.sq_off.flags])),
1558 .dropped = @ptrCast(@alignCast(&mmap[p.sq_off.dropped])),
1559 .array = array[0..p.sq_entries],
1560 .sqes = sqes[0..p.sq_entries],
1561 .mmap = mmap,
1562 .mmap_sqes = mmap_sqes,
1563 };
1564 }
1565
1566 pub fn deinit(self: *SubmissionQueue) void {
1567 posix.munmap(self.mmap_sqes);
1568 posix.munmap(self.mmap);
1569 }
1570};
1571
1572pub const CompletionQueue = struct {
1573 head: *u32,
1574 tail: *u32,
1575 mask: u32,
1576 overflow: *u32,
1577 cqes: []linux.io_uring_cqe,
1578
1579 pub fn init(fd: linux.fd_t, p: linux.io_uring_params, sq: SubmissionQueue) !CompletionQueue {
1580 assert(fd >= 0);
1581 assert((p.features & linux.IORING_FEAT_SINGLE_MMAP) != 0);
1582 const mmap = sq.mmap;
1583 const cqes: [*]linux.io_uring_cqe = @ptrCast(@alignCast(&mmap[p.cq_off.cqes]));
1584 assert(p.cq_entries == @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_entries]))).*);
1585 return CompletionQueue{
1586 .head = @ptrCast(@alignCast(&mmap[p.cq_off.head])),
1587 .tail = @ptrCast(@alignCast(&mmap[p.cq_off.tail])),
1588 .mask = @as(*u32, @ptrCast(@alignCast(&mmap[p.cq_off.ring_mask]))).*,
1589 .overflow = @ptrCast(@alignCast(&mmap[p.cq_off.overflow])),
1590 .cqes = cqes[0..p.cq_entries],
1591 };
1592 }
1593
1594 pub fn deinit(self: *CompletionQueue) void {
1595 _ = self;
1596 // A no-op since we now share the mmap with the submission queue.
1597 // Here for symmetry with the submission queue, and for any future feature support.
1598 }
1599};
1600
/// Group of application-provided buffers. Uses the newer type, called
/// ring-mapped buffers, supported since kernel 5.19. Buffers are identified by
/// a buffer group ID, and within that group, a buffer ID. An IoUring can have
/// multiple buffer groups, each with a unique group ID.
///
/// In `init` a contiguous block of memory is allocated for `buffers_count`
/// buffers of `buffer_size` bytes each. The application can then submit a
/// `recv` operation without providing a buffer upfront. Once the operation is
/// ready to receive data, a buffer is picked automatically and the resulting
/// CQE will contain the buffer ID in `cqe.buffer_id()`. Use the `get` method to
/// obtain the buffer for the buffer ID identified by a CQE. Once the
/// application has processed the buffer, it may hand ownership back to the
/// kernel by calling `put`, allowing the cycle to repeat.
///
/// Depending on the rate of arrival of data, it is possible that a given buffer
/// group will run out of buffers before those in CQEs can be put back to the
/// kernel. If this happens, `cqe.err()` will return ENOBUFS.
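///
/// A minimal usage sketch (illustrative only), assuming a `ring`, an
/// `allocator`, and a connected socket `fd`:
///
///     var group = try BufferGroup.init(&ring, allocator, 0, 4096, 16);
///     defer group.deinit(allocator);
///     _ = try group.recv(0x01, fd, 0);
///     _ = try ring.submit();
///     const cqe = try ring.copy_cqe();
///     const data = try group.get(cqe);
///     // ... process data, then hand the buffer back to the kernel ...
///     try group.put(cqe);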
1618///
1619pub const BufferGroup = struct {
1620 /// Parent ring for which this group is registered.
1621 ring: *IoUring,
    /// Pointer to the memory shared with the kernel.
    /// `buffers_count` `io_uring_buf` structures are shared with the kernel.
    /// The first `io_uring_buf` is overlaid by the `io_uring_buf_ring` struct.
1625 br: *align(page_size_min) linux.io_uring_buf_ring,
1626 /// Contiguous block of memory of size (buffers_count * buffer_size).
1627 buffers: []u8,
1628 /// Size of each buffer in buffers.
1629 buffer_size: u32,
    /// Number of buffers in `buffers`, and the number of `io_uring_buf` structures in `br`.
    buffers_count: u16,
    /// Head of the unconsumed part of each buffer, if incremental consumption is enabled.
1633 heads: []u32,
1634 /// ID of this group, must be unique in ring.
1635 group_id: u16,
1636
1637 pub fn init(
1638 ring: *IoUring,
1639 allocator: mem.Allocator,
1640 group_id: u16,
1641 buffer_size: u32,
1642 buffers_count: u16,
1643 ) !BufferGroup {
1644 const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
1645 errdefer allocator.free(buffers);
1646 const heads = try allocator.alloc(u32, buffers_count);
1647 errdefer allocator.free(heads);
1648
1649 const br = try setup_buf_ring(ring.fd, buffers_count, group_id, .{ .inc = true });
1650 buf_ring_init(br);
1651
1652 const mask = buf_ring_mask(buffers_count);
1653 var i: u16 = 0;
1654 while (i < buffers_count) : (i += 1) {
1655 const pos = buffer_size * i;
1656 const buf = buffers[pos .. pos + buffer_size];
1657 heads[i] = 0;
1658 buf_ring_add(br, buf, i, mask, i);
1659 }
1660 buf_ring_advance(br, buffers_count);
1661
1662 return BufferGroup{
1663 .ring = ring,
1664 .group_id = group_id,
1665 .br = br,
1666 .buffers = buffers,
1667 .heads = heads,
1668 .buffer_size = buffer_size,
1669 .buffers_count = buffers_count,
1670 };
1671 }
1672
1673 pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
1674 free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
1675 allocator.free(self.buffers);
1676 allocator.free(self.heads);
1677 }
1678
    /// Prepares a recv operation which will select a buffer from this group.
1680 pub fn recv(self: *BufferGroup, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
1681 var sqe = try self.ring.get_sqe();
1682 sqe.prep_rw(.RECV, fd, 0, 0, 0);
1683 sqe.rw_flags = flags;
1684 sqe.flags |= linux.IOSQE_BUFFER_SELECT;
1685 sqe.buf_index = self.group_id;
1686 sqe.user_data = user_data;
1687 return sqe;
1688 }
1689
    /// Prepares a multishot recv operation which will select a buffer from this group.
1691 pub fn recv_multishot(self: *BufferGroup, user_data: u64, fd: linux.fd_t, flags: u32) !*linux.io_uring_sqe {
1692 var sqe = try self.recv(user_data, fd, flags);
1693 sqe.ioprio |= linux.IORING_RECV_MULTISHOT;
1694 return sqe;
1695 }
1696
1697 // Get buffer by id.
1698 fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
1699 const pos = self.buffer_size * buffer_id;
1700 return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
1701 }
1702
    /// Returns the buffer identified by `cqe`, sliced to the received length.
1704 pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
1705 const buffer_id = try cqe.buffer_id();
1706 const used_len = @as(usize, @intCast(cqe.res));
1707 return self.get_by_id(buffer_id)[0..used_len];
1708 }
1709
    /// Releases the buffer identified by `cqe` back to the kernel.
1711 pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
1712 const buffer_id = try cqe.buffer_id();
1713 if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
            // Incremental consumption is active; the kernel will write to this buffer again.
1715 const used_len = @as(u32, @intCast(cqe.res));
1716 // Track what part of the buffer is used
1717 self.heads[buffer_id] += used_len;
1718 return;
1719 }
1720 self.heads[buffer_id] = 0;
1721
        // Release the buffer back to the kernel.
1723 const mask = buf_ring_mask(self.buffers_count);
1724 buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
1725 buf_ring_advance(self.br, 1);
1726 }
1727};
1728
/// Registers a shared buffer ring to be used with provided buffers.
/// `entries` `io_uring_buf` structures are memory-mapped and shared with the kernel.
/// `fd` is the IoUring.fd for which the provided buffer ring is being registered.
/// `entries` is the number of entries requested in the buffer ring; it must be a power of two.
/// `group_id` is the chosen buffer group ID, unique within the IoUring.
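///
/// Typical lifecycle, as a sketch (see BufferGroup.init for a complete example):
///
///     const br = try setup_buf_ring(ring.fd, 16, 0, .{ .inc = true });
///     defer free_buf_ring(ring.fd, br, 16, 0);
///     buf_ring_init(br);
///     // ... provide buffers with buf_ring_add, then commit with buf_ring_advance ...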
1734pub fn setup_buf_ring(
1735 fd: linux.fd_t,
1736 entries: u16,
1737 group_id: u16,
1738 flags: linux.io_uring_buf_reg.Flags,
1739) !*align(page_size_min) linux.io_uring_buf_ring {
1740 if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
1741 if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
1742
1743 const mmap_size = @as(usize, entries) * @sizeOf(linux.io_uring_buf);
1744 const mmap = try posix.mmap(
1745 null,
1746 mmap_size,
1747 posix.PROT.READ | posix.PROT.WRITE,
1748 .{ .TYPE = .PRIVATE, .ANONYMOUS = true },
1749 -1,
1750 0,
1751 );
1752 errdefer posix.munmap(mmap);
1753 assert(mmap.len == mmap_size);
1754
1755 const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
1756 try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
1757 return br;
1758}
1759
1760fn register_buf_ring(
1761 fd: linux.fd_t,
1762 addr: u64,
1763 entries: u32,
1764 group_id: u16,
1765 flags: linux.io_uring_buf_reg.Flags,
1766) !void {
1767 var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
1768 .ring_addr = addr,
1769 .ring_entries = entries,
1770 .bgid = group_id,
1771 .flags = flags,
1772 });
1773 var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(®)), 1);
1774 if (linux.errno(res) == .INVAL and reg.flags.inc) {
        // Retry without incremental buffer consumption.
        // It is available since kernel 6.12; older kernels return INVAL.
1777 reg.flags.inc = false;
1778 res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(®)), 1);
1779 }
1780 try handle_register_buf_ring_result(res);
1781}
1782
1783fn unregister_buf_ring(fd: linux.fd_t, group_id: u16) !void {
1784 var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
1785 .bgid = group_id,
1786 });
1787 const res = linux.io_uring_register(
1788 fd,
1789 .UNREGISTER_PBUF_RING,
1790 @as(*const anyopaque, @ptrCast(®)),
1791 1,
1792 );
1793 try handle_register_buf_ring_result(res);
1794}
1795
1796fn handle_register_buf_ring_result(res: usize) !void {
1797 switch (linux.errno(res)) {
1798 .SUCCESS => {},
1799 .INVAL => return error.ArgumentsInvalid,
1800 else => |errno| return posix.unexpectedErrno(errno),
1801 }
1802}
1803
/// Unregisters and unmaps a previously registered shared buffer ring, as returned from `setup_buf_ring`.
1805pub fn free_buf_ring(fd: linux.fd_t, br: *align(page_size_min) linux.io_uring_buf_ring, entries: u32, group_id: u16) void {
1806 unregister_buf_ring(fd, group_id) catch {};
1807 var mmap: []align(page_size_min) u8 = undefined;
1808 mmap.ptr = @ptrCast(br);
1809 mmap.len = entries * @sizeOf(linux.io_uring_buf);
1810 posix.munmap(mmap);
1811}
1812
1813/// Initialises `br` so that it is ready to be used.
1814pub fn buf_ring_init(br: *linux.io_uring_buf_ring) void {
1815 br.tail = 0;
1816}
1817
1818/// Calculates the appropriate size mask for a buffer ring.
1819/// `entries` is the ring entries as specified in io_uring_register_buf_ring.
1820pub fn buf_ring_mask(entries: u16) u16 {
1821 return entries - 1;
1822}
1823
/// Adds `buffer` to the `br` buffer ring.
/// `buffer_id` is an identifier which will be returned in the CQE.
/// `buffer_offset` is the offset to insert at from the current tail.
/// If just one buffer is provided before the ring tail is committed with `buf_ring_advance`,
/// then the offset should be 0. If buffers are provided in a loop before being committed,
/// the offset must be incremented by one for each buffer added.
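///
/// For example, a sketch providing `n` buffers (with hypothetical slices `bufs`)
/// before a single commit, as done in BufferGroup.init:
///
///     const mask = buf_ring_mask(n);
///     var i: u16 = 0;
///     while (i < n) : (i += 1) buf_ring_add(br, bufs[i], i, mask, i);
///     buf_ring_advance(br, n);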
1829pub fn buf_ring_add(
1830 br: *linux.io_uring_buf_ring,
1831 buffer: []u8,
1832 buffer_id: u16,
1833 mask: u16,
1834 buffer_offset: u16,
1835) void {
1836 const bufs: [*]linux.io_uring_buf = @ptrCast(br);
1837 const buf: *linux.io_uring_buf = &bufs[(br.tail +% buffer_offset) & mask];
1838
1839 buf.addr = @intFromPtr(buffer.ptr);
1840 buf.len = @intCast(buffer.len);
1841 buf.bid = buffer_id;
1842}
1843
/// Makes `count` new buffers visible to the kernel. Called after
/// `buf_ring_add` has been called `count` times to fill in new buffers.
1846pub fn buf_ring_advance(br: *linux.io_uring_buf_ring, count: u16) void {
1847 const tail: u16 = br.tail +% count;
1848 @atomicStore(u16, &br.tail, tail, .release);
1849}
1850
1851test "structs/offsets/entries" {
1852 if (!is_linux) return error.SkipZigTest;
1853
1854 try testing.expectEqual(@as(usize, 120), @sizeOf(linux.io_uring_params));
1855 try testing.expectEqual(@as(usize, 64), @sizeOf(linux.io_uring_sqe));
1856 try testing.expectEqual(@as(usize, 16), @sizeOf(linux.io_uring_cqe));
1857
1858 try testing.expectEqual(0, linux.IORING_OFF_SQ_RING);
1859 try testing.expectEqual(0x8000000, linux.IORING_OFF_CQ_RING);
1860 try testing.expectEqual(0x10000000, linux.IORING_OFF_SQES);
1861
1862 try testing.expectError(error.EntriesZero, IoUring.init(0, 0));
1863 try testing.expectError(error.EntriesNotPowerOfTwo, IoUring.init(3, 0));
1864}
1865
1866test "nop" {
1867 if (!is_linux) return error.SkipZigTest;
1868
1869 var ring = IoUring.init(1, 0) catch |err| switch (err) {
1870 error.SystemOutdated => return error.SkipZigTest,
1871 error.PermissionDenied => return error.SkipZigTest,
1872 else => return err,
1873 };
1874 defer {
1875 ring.deinit();
1876 testing.expectEqual(@as(linux.fd_t, -1), ring.fd) catch @panic("test failed");
1877 }
1878
1879 const sqe = try ring.nop(0xaaaaaaaa);
1880 try testing.expectEqual(linux.io_uring_sqe{
1881 .opcode = .NOP,
1882 .flags = 0,
1883 .ioprio = 0,
1884 .fd = 0,
1885 .off = 0,
1886 .addr = 0,
1887 .len = 0,
1888 .rw_flags = 0,
1889 .user_data = 0xaaaaaaaa,
1890 .buf_index = 0,
1891 .personality = 0,
1892 .splice_fd_in = 0,
1893 .addr3 = 0,
1894 .resv = 0,
1895 }, sqe.*);
1896
1897 try testing.expectEqual(@as(u32, 0), ring.sq.sqe_head);
1898 try testing.expectEqual(@as(u32, 1), ring.sq.sqe_tail);
1899 try testing.expectEqual(@as(u32, 0), ring.sq.tail.*);
1900 try testing.expectEqual(@as(u32, 0), ring.cq.head.*);
1901 try testing.expectEqual(@as(u32, 1), ring.sq_ready());
1902 try testing.expectEqual(@as(u32, 0), ring.cq_ready());
1903
1904 try testing.expectEqual(@as(u32, 1), try ring.submit());
1905 try testing.expectEqual(@as(u32, 1), ring.sq.sqe_head);
1906 try testing.expectEqual(@as(u32, 1), ring.sq.sqe_tail);
1907 try testing.expectEqual(@as(u32, 1), ring.sq.tail.*);
1908 try testing.expectEqual(@as(u32, 0), ring.cq.head.*);
1909 try testing.expectEqual(@as(u32, 0), ring.sq_ready());
1910
1911 try testing.expectEqual(linux.io_uring_cqe{
1912 .user_data = 0xaaaaaaaa,
1913 .res = 0,
1914 .flags = 0,
1915 }, try ring.copy_cqe());
1916 try testing.expectEqual(@as(u32, 1), ring.cq.head.*);
1917 try testing.expectEqual(@as(u32, 0), ring.cq_ready());
1918
1919 const sqe_barrier = try ring.nop(0xbbbbbbbb);
1920 sqe_barrier.flags |= linux.IOSQE_IO_DRAIN;
1921 try testing.expectEqual(@as(u32, 1), try ring.submit());
1922 try testing.expectEqual(linux.io_uring_cqe{
1923 .user_data = 0xbbbbbbbb,
1924 .res = 0,
1925 .flags = 0,
1926 }, try ring.copy_cqe());
1927 try testing.expectEqual(@as(u32, 2), ring.sq.sqe_head);
1928 try testing.expectEqual(@as(u32, 2), ring.sq.sqe_tail);
1929 try testing.expectEqual(@as(u32, 2), ring.sq.tail.*);
1930 try testing.expectEqual(@as(u32, 2), ring.cq.head.*);
1931}
1932
1933test "readv" {
1934 if (!is_linux) return error.SkipZigTest;
1935
1936 var ring = IoUring.init(1, 0) catch |err| switch (err) {
1937 error.SystemOutdated => return error.SkipZigTest,
1938 error.PermissionDenied => return error.SkipZigTest,
1939 else => return err,
1940 };
1941 defer ring.deinit();
1942
1943 const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
1944 defer posix.close(fd);
1945
1946 // Linux Kernel 5.4 supports IORING_REGISTER_FILES but not sparse fd sets (i.e. an fd of -1).
1947 // Linux Kernel 5.5 adds support for sparse fd sets.
1948 // Compare:
1949 // https://github.com/torvalds/linux/blob/v5.4/fs/io_uring.c#L3119-L3124 vs
1950 // https://github.com/torvalds/linux/blob/v5.8/fs/io_uring.c#L6687-L6691
1951 // We therefore avoid stressing sparse fd sets here:
1952 var registered_fds = [_]linux.fd_t{0} ** 1;
1953 const fd_index = 0;
1954 registered_fds[fd_index] = fd;
1955 try ring.register_files(registered_fds[0..]);
1956
1957 var buffer = [_]u8{42} ** 128;
1958 var iovecs = [_]posix.iovec{posix.iovec{ .base = &buffer, .len = buffer.len }};
1959 const sqe = try ring.read(0xcccccccc, fd_index, .{ .iovecs = iovecs[0..] }, 0);
1960 try testing.expectEqual(linux.IORING_OP.READV, sqe.opcode);
1961 sqe.flags |= linux.IOSQE_FIXED_FILE;
1962
1963 try testing.expectError(error.SubmissionQueueFull, ring.nop(0));
1964 try testing.expectEqual(@as(u32, 1), try ring.submit());
1965 try testing.expectEqual(linux.io_uring_cqe{
1966 .user_data = 0xcccccccc,
1967 .res = buffer.len,
1968 .flags = 0,
1969 }, try ring.copy_cqe());
1970 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
1971
1972 try ring.unregister_files();
1973}
1974
1975test "writev/fsync/readv" {
1976 if (!is_linux) return error.SkipZigTest;
1977
1978 var ring = IoUring.init(4, 0) catch |err| switch (err) {
1979 error.SystemOutdated => return error.SkipZigTest,
1980 error.PermissionDenied => return error.SkipZigTest,
1981 else => return err,
1982 };
1983 defer ring.deinit();
1984
1985 var tmp = std.testing.tmpDir(.{});
1986 defer tmp.cleanup();
1987
1988 const path = "test_io_uring_writev_fsync_readv";
1989 const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
1990 defer file.close();
1991 const fd = file.handle;
1992
1993 const buffer_write = [_]u8{42} ** 128;
1994 const iovecs_write = [_]posix.iovec_const{
1995 posix.iovec_const{ .base = &buffer_write, .len = buffer_write.len },
1996 };
1997 var buffer_read = [_]u8{0} ** 128;
1998 var iovecs_read = [_]posix.iovec{
1999 posix.iovec{ .base = &buffer_read, .len = buffer_read.len },
2000 };
2001
2002 const sqe_writev = try ring.writev(0xdddddddd, fd, iovecs_write[0..], 17);
2003 try testing.expectEqual(linux.IORING_OP.WRITEV, sqe_writev.opcode);
2004 try testing.expectEqual(@as(u64, 17), sqe_writev.off);
2005 sqe_writev.flags |= linux.IOSQE_IO_LINK;
2006
2007 const sqe_fsync = try ring.fsync(0xeeeeeeee, fd, 0);
2008 try testing.expectEqual(linux.IORING_OP.FSYNC, sqe_fsync.opcode);
2009 try testing.expectEqual(fd, sqe_fsync.fd);
2010 sqe_fsync.flags |= linux.IOSQE_IO_LINK;
2011
2012 const sqe_readv = try ring.read(0xffffffff, fd, .{ .iovecs = iovecs_read[0..] }, 17);
2013 try testing.expectEqual(linux.IORING_OP.READV, sqe_readv.opcode);
2014 try testing.expectEqual(@as(u64, 17), sqe_readv.off);
2015
2016 try testing.expectEqual(@as(u32, 3), ring.sq_ready());
2017 try testing.expectEqual(@as(u32, 3), try ring.submit_and_wait(3));
2018 try testing.expectEqual(@as(u32, 0), ring.sq_ready());
2019 try testing.expectEqual(@as(u32, 3), ring.cq_ready());
2020
2021 try testing.expectEqual(linux.io_uring_cqe{
2022 .user_data = 0xdddddddd,
2023 .res = buffer_write.len,
2024 .flags = 0,
2025 }, try ring.copy_cqe());
2026 try testing.expectEqual(@as(u32, 2), ring.cq_ready());
2027
2028 try testing.expectEqual(linux.io_uring_cqe{
2029 .user_data = 0xeeeeeeee,
2030 .res = 0,
2031 .flags = 0,
2032 }, try ring.copy_cqe());
2033 try testing.expectEqual(@as(u32, 1), ring.cq_ready());
2034
2035 try testing.expectEqual(linux.io_uring_cqe{
2036 .user_data = 0xffffffff,
2037 .res = buffer_read.len,
2038 .flags = 0,
2039 }, try ring.copy_cqe());
2040 try testing.expectEqual(@as(u32, 0), ring.cq_ready());
2041
2042 try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2043}
2044
2045test "write/read" {
2046 if (!is_linux) return error.SkipZigTest;
2047
2048 var ring = IoUring.init(2, 0) catch |err| switch (err) {
2049 error.SystemOutdated => return error.SkipZigTest,
2050 error.PermissionDenied => return error.SkipZigTest,
2051 else => return err,
2052 };
2053 defer ring.deinit();
2054
2055 var tmp = std.testing.tmpDir(.{});
2056 defer tmp.cleanup();
2057 const path = "test_io_uring_write_read";
2058 const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
2059 defer file.close();
2060 const fd = file.handle;
2061
2062 const buffer_write = [_]u8{97} ** 20;
2063 var buffer_read = [_]u8{98} ** 20;
2064 const sqe_write = try ring.write(0x11111111, fd, buffer_write[0..], 10);
2065 try testing.expectEqual(linux.IORING_OP.WRITE, sqe_write.opcode);
2066 try testing.expectEqual(@as(u64, 10), sqe_write.off);
2067 sqe_write.flags |= linux.IOSQE_IO_LINK;
2068 const sqe_read = try ring.read(0x22222222, fd, .{ .buffer = buffer_read[0..] }, 10);
2069 try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode);
2070 try testing.expectEqual(@as(u64, 10), sqe_read.off);
2071 try testing.expectEqual(@as(u32, 2), try ring.submit());
2072
2073 const cqe_write = try ring.copy_cqe();
2074 const cqe_read = try ring.copy_cqe();
2075 // Prior to Linux Kernel 5.6 this is the only way to test for read/write support:
2076 // https://lwn.net/Articles/809820/
2077 if (cqe_write.err() == .INVAL) return error.SkipZigTest;
2078 if (cqe_read.err() == .INVAL) return error.SkipZigTest;
2079 try testing.expectEqual(linux.io_uring_cqe{
2080 .user_data = 0x11111111,
2081 .res = buffer_write.len,
2082 .flags = 0,
2083 }, cqe_write);
2084 try testing.expectEqual(linux.io_uring_cqe{
2085 .user_data = 0x22222222,
2086 .res = buffer_read.len,
2087 .flags = 0,
2088 }, cqe_read);
2089 try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2090}
2091
2092test "splice/read" {
2093 if (!is_linux) return error.SkipZigTest;
2094
2095 var ring = IoUring.init(4, 0) catch |err| switch (err) {
2096 error.SystemOutdated => return error.SkipZigTest,
2097 error.PermissionDenied => return error.SkipZigTest,
2098 else => return err,
2099 };
2100 defer ring.deinit();
2101
    var tmp = std.testing.tmpDir(.{});
    defer tmp.cleanup();
2103 const path_src = "test_io_uring_splice_src";
2104 const file_src = try tmp.dir.createFile(path_src, .{ .read = true, .truncate = true });
2105 defer file_src.close();
2106 const fd_src = file_src.handle;
2107
2108 const path_dst = "test_io_uring_splice_dst";
2109 const file_dst = try tmp.dir.createFile(path_dst, .{ .read = true, .truncate = true });
2110 defer file_dst.close();
2111 const fd_dst = file_dst.handle;
2112
2113 const buffer_write = [_]u8{97} ** 20;
2114 var buffer_read = [_]u8{98} ** 20;
2115 _ = try file_src.write(&buffer_write);
2116
2117 const fds = try posix.pipe();
2118 const pipe_offset: u64 = std.math.maxInt(u64);
2119
2120 const sqe_splice_to_pipe = try ring.splice(0x11111111, fd_src, 0, fds[1], pipe_offset, buffer_write.len);
2121 try testing.expectEqual(linux.IORING_OP.SPLICE, sqe_splice_to_pipe.opcode);
2122 try testing.expectEqual(@as(u64, 0), sqe_splice_to_pipe.addr);
2123 try testing.expectEqual(pipe_offset, sqe_splice_to_pipe.off);
2124 sqe_splice_to_pipe.flags |= linux.IOSQE_IO_LINK;
2125
2126 const sqe_splice_from_pipe = try ring.splice(0x22222222, fds[0], pipe_offset, fd_dst, 10, buffer_write.len);
2127 try testing.expectEqual(linux.IORING_OP.SPLICE, sqe_splice_from_pipe.opcode);
2128 try testing.expectEqual(pipe_offset, sqe_splice_from_pipe.addr);
2129 try testing.expectEqual(@as(u64, 10), sqe_splice_from_pipe.off);
2130 sqe_splice_from_pipe.flags |= linux.IOSQE_IO_LINK;
2131
2132 const sqe_read = try ring.read(0x33333333, fd_dst, .{ .buffer = buffer_read[0..] }, 10);
2133 try testing.expectEqual(linux.IORING_OP.READ, sqe_read.opcode);
2134 try testing.expectEqual(@as(u64, 10), sqe_read.off);
2135 try testing.expectEqual(@as(u32, 3), try ring.submit());
2136
2137 const cqe_splice_to_pipe = try ring.copy_cqe();
2138 const cqe_splice_from_pipe = try ring.copy_cqe();
2139 const cqe_read = try ring.copy_cqe();
2140 // Prior to Linux Kernel 5.6 this is the only way to test for splice/read support:
2141 // https://lwn.net/Articles/809820/
2142 if (cqe_splice_to_pipe.err() == .INVAL) return error.SkipZigTest;
2143 if (cqe_splice_from_pipe.err() == .INVAL) return error.SkipZigTest;
2144 if (cqe_read.err() == .INVAL) return error.SkipZigTest;
2145 try testing.expectEqual(linux.io_uring_cqe{
2146 .user_data = 0x11111111,
2147 .res = buffer_write.len,
2148 .flags = 0,
2149 }, cqe_splice_to_pipe);
2150 try testing.expectEqual(linux.io_uring_cqe{
2151 .user_data = 0x22222222,
2152 .res = buffer_write.len,
2153 .flags = 0,
2154 }, cqe_splice_from_pipe);
2155 try testing.expectEqual(linux.io_uring_cqe{
2156 .user_data = 0x33333333,
2157 .res = buffer_read.len,
2158 .flags = 0,
2159 }, cqe_read);
2160 try testing.expectEqualSlices(u8, buffer_write[0..], buffer_read[0..]);
2161}
2162
2163test "write_fixed/read_fixed" {
2164 if (!is_linux) return error.SkipZigTest;
2165
2166 var ring = IoUring.init(2, 0) catch |err| switch (err) {
2167 error.SystemOutdated => return error.SkipZigTest,
2168 error.PermissionDenied => return error.SkipZigTest,
2169 else => return err,
2170 };
2171 defer ring.deinit();
2172
2173 var tmp = std.testing.tmpDir(.{});
2174 defer tmp.cleanup();
2175
2176 const path = "test_io_uring_write_read_fixed";
2177 const file = try tmp.dir.createFile(path, .{ .read = true, .truncate = true });
2178 defer file.close();
2179 const fd = file.handle;
2180
2181 var raw_buffers: [2][11]u8 = undefined;
2182 // First buffer will be written to the file.
2183 @memset(&raw_buffers[0], 'z');
2184 raw_buffers[0][0.."foobar".len].* = "foobar".*;
2185
2186 var buffers = [2]posix.iovec{
2187 .{ .base = &raw_buffers[0], .len = raw_buffers[0].len },
2188 .{ .base = &raw_buffers[1], .len = raw_buffers[1].len },
2189 };
2190 ring.register_buffers(&buffers) catch |err| switch (err) {
2191 error.SystemResources => {
2192 // See https://github.com/ziglang/zig/issues/15362
2193 return error.SkipZigTest;
2194 },
2195 else => |e| return e,
2196 };
2197
2198 const sqe_write = try ring.write_fixed(0x45454545, fd, &buffers[0], 3, 0);
2199 try testing.expectEqual(linux.IORING_OP.WRITE_FIXED, sqe_write.opcode);
2200 try testing.expectEqual(@as(u64, 3), sqe_write.off);
2201 sqe_write.flags |= linux.IOSQE_IO_LINK;
2202
2203 const sqe_read = try ring.read_fixed(0x12121212, fd, &buffers[1], 0, 1);
2204 try testing.expectEqual(linux.IORING_OP.READ_FIXED, sqe_read.opcode);
2205 try testing.expectEqual(@as(u64, 0), sqe_read.off);
2206
2207 try testing.expectEqual(@as(u32, 2), try ring.submit());
2208
2209 const cqe_write = try ring.copy_cqe();
2210 const cqe_read = try ring.copy_cqe();
2211
2212 try testing.expectEqual(linux.io_uring_cqe{
2213 .user_data = 0x45454545,
2214 .res = @as(i32, @intCast(buffers[0].len)),
2215 .flags = 0,
2216 }, cqe_write);
2217 try testing.expectEqual(linux.io_uring_cqe{
2218 .user_data = 0x12121212,
2219 .res = @as(i32, @intCast(buffers[1].len)),
2220 .flags = 0,
2221 }, cqe_read);
2222
2223 try testing.expectEqualSlices(u8, "\x00\x00\x00", buffers[1].base[0..3]);
2224 try testing.expectEqualSlices(u8, "foobar", buffers[1].base[3..9]);
2225 try testing.expectEqualSlices(u8, "zz", buffers[1].base[9..11]);
2226}
2227
2228test "openat" {
2229 if (!is_linux) return error.SkipZigTest;
2230
2231 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2232 error.SystemOutdated => return error.SkipZigTest,
2233 error.PermissionDenied => return error.SkipZigTest,
2234 else => return err,
2235 };
2236 defer ring.deinit();
2237
2238 var tmp = std.testing.tmpDir(.{});
2239 defer tmp.cleanup();
2240
2241 const path = "test_io_uring_openat";
2242
2243 // Workaround for LLVM bug: https://github.com/ziglang/zig/issues/12014
2244 const path_addr = if (builtin.zig_backend == .stage2_llvm) p: {
2245 var workaround = path;
2246 _ = &workaround;
2247 break :p @intFromPtr(workaround);
2248 } else @intFromPtr(path);
2249
2250 const flags: linux.O = .{ .CLOEXEC = true, .ACCMODE = .RDWR, .CREAT = true };
2251 const mode: posix.mode_t = 0o666;
2252 const sqe_openat = try ring.openat(0x33333333, tmp.dir.fd, path, flags, mode);
2253 try testing.expectEqual(linux.io_uring_sqe{
2254 .opcode = .OPENAT,
2255 .flags = 0,
2256 .ioprio = 0,
2257 .fd = tmp.dir.fd,
2258 .off = 0,
2259 .addr = path_addr,
2260 .len = mode,
2261 .rw_flags = @bitCast(flags),
2262 .user_data = 0x33333333,
2263 .buf_index = 0,
2264 .personality = 0,
2265 .splice_fd_in = 0,
2266 .addr3 = 0,
2267 .resv = 0,
2268 }, sqe_openat.*);
2269 try testing.expectEqual(@as(u32, 1), try ring.submit());
2270
2271 const cqe_openat = try ring.copy_cqe();
2272 try testing.expectEqual(@as(u64, 0x33333333), cqe_openat.user_data);
2273 if (cqe_openat.err() == .INVAL) return error.SkipZigTest;
2274 if (cqe_openat.err() == .BADF) return error.SkipZigTest;
2275 if (cqe_openat.res <= 0) std.debug.print("\ncqe_openat.res={}\n", .{cqe_openat.res});
2276 try testing.expect(cqe_openat.res > 0);
2277 try testing.expectEqual(@as(u32, 0), cqe_openat.flags);
2278
2279 posix.close(cqe_openat.res);
2280}
2281
2282test "close" {
2283 if (!is_linux) return error.SkipZigTest;
2284
2285 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2286 error.SystemOutdated => return error.SkipZigTest,
2287 error.PermissionDenied => return error.SkipZigTest,
2288 else => return err,
2289 };
2290 defer ring.deinit();
2291
2292 var tmp = std.testing.tmpDir(.{});
2293 defer tmp.cleanup();
2294
2295 const path = "test_io_uring_close";
2296 const file = try tmp.dir.createFile(path, .{});
2297 errdefer file.close();
2298
2299 const sqe_close = try ring.close(0x44444444, file.handle);
2300 try testing.expectEqual(linux.IORING_OP.CLOSE, sqe_close.opcode);
2301 try testing.expectEqual(file.handle, sqe_close.fd);
2302 try testing.expectEqual(@as(u32, 1), try ring.submit());
2303
2304 const cqe_close = try ring.copy_cqe();
2305 if (cqe_close.err() == .INVAL) return error.SkipZigTest;
2306 try testing.expectEqual(linux.io_uring_cqe{
2307 .user_data = 0x44444444,
2308 .res = 0,
2309 .flags = 0,
2310 }, cqe_close);
2311}
2312
2313test "accept/connect/send/recv" {
2314 if (!is_linux) return error.SkipZigTest;
2315
2316 var ring = IoUring.init(16, 0) catch |err| switch (err) {
2317 error.SystemOutdated => return error.SkipZigTest,
2318 error.PermissionDenied => return error.SkipZigTest,
2319 else => return err,
2320 };
2321 defer ring.deinit();
2322
2323 const socket_test_harness = try createSocketTestHarness(&ring);
2324 defer socket_test_harness.close();
2325
2326 const buffer_send = [_]u8{ 1, 0, 1, 0, 1, 0, 1, 0, 1, 0 };
2327 var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2328
2329 const sqe_send = try ring.send(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0);
2330 sqe_send.flags |= linux.IOSQE_IO_LINK;
2331 _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2332 try testing.expectEqual(@as(u32, 2), try ring.submit());
2333
2334 const cqe_send = try ring.copy_cqe();
2335 if (cqe_send.err() == .INVAL) return error.SkipZigTest;
2336 try testing.expectEqual(linux.io_uring_cqe{
2337 .user_data = 0xeeeeeeee,
2338 .res = buffer_send.len,
2339 .flags = 0,
2340 }, cqe_send);
2341
2342 const cqe_recv = try ring.copy_cqe();
2343 if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
2344 try testing.expectEqual(linux.io_uring_cqe{
2345 .user_data = 0xffffffff,
2346 .res = buffer_recv.len,
2347 // ignore IORING_CQE_F_SOCK_NONEMPTY since it is only set on some systems
2348 .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
2349 }, cqe_recv);
2350
2351 try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
2352}
2353
2354test "sendmsg/recvmsg" {
2355 if (!is_linux) return error.SkipZigTest;
2356
2357 var ring = IoUring.init(2, 0) catch |err| switch (err) {
2358 error.SystemOutdated => return error.SkipZigTest,
2359 error.PermissionDenied => return error.SkipZigTest,
2360 else => return err,
2361 };
2362 defer ring.deinit();
2363
2364 var address_server: linux.sockaddr.in = .{
2365 .port = 0,
2366 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
2367 };
2368
2369 const server = try posix.socket(address_server.family, posix.SOCK.DGRAM, 0);
2370 defer posix.close(server);
2371 try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEPORT, &mem.toBytes(@as(c_int, 1)));
2372 try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
2373 try posix.bind(server, addrAny(&address_server), @sizeOf(linux.sockaddr.in));
2374
2375 // set address_server to the OS-chosen IP/port.
2376 var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
2377 try posix.getsockname(server, addrAny(&address_server), &slen);
2378
2379 const client = try posix.socket(address_server.family, posix.SOCK.DGRAM, 0);
2380 defer posix.close(client);
2381
2382 const buffer_send = [_]u8{42} ** 128;
2383 const iovecs_send = [_]posix.iovec_const{
2384 posix.iovec_const{ .base = &buffer_send, .len = buffer_send.len },
2385 };
2386 const msg_send: linux.msghdr_const = .{
2387 .name = addrAny(&address_server),
2388 .namelen = @sizeOf(linux.sockaddr.in),
2389 .iov = &iovecs_send,
2390 .iovlen = 1,
2391 .control = null,
2392 .controllen = 0,
2393 .flags = 0,
2394 };
2395 const sqe_sendmsg = try ring.sendmsg(0x11111111, client, &msg_send, 0);
2396 sqe_sendmsg.flags |= linux.IOSQE_IO_LINK;
2397 try testing.expectEqual(linux.IORING_OP.SENDMSG, sqe_sendmsg.opcode);
2398 try testing.expectEqual(client, sqe_sendmsg.fd);
2399
2400 var buffer_recv = [_]u8{0} ** 128;
2401 var iovecs_recv = [_]posix.iovec{
2402 posix.iovec{ .base = &buffer_recv, .len = buffer_recv.len },
2403 };
2404 var address_recv: linux.sockaddr.in = .{
2405 .port = 0,
2406 .addr = 0,
2407 };
2408 var msg_recv: linux.msghdr = .{
2409 .name = addrAny(&address_recv),
2410 .namelen = @sizeOf(linux.sockaddr.in),
2411 .iov = &iovecs_recv,
2412 .iovlen = 1,
2413 .control = null,
2414 .controllen = 0,
2415 .flags = 0,
2416 };
2417 const sqe_recvmsg = try ring.recvmsg(0x22222222, server, &msg_recv, 0);
2418 try testing.expectEqual(linux.IORING_OP.RECVMSG, sqe_recvmsg.opcode);
2419 try testing.expectEqual(server, sqe_recvmsg.fd);
2420
2421 try testing.expectEqual(@as(u32, 2), ring.sq_ready());
2422 try testing.expectEqual(@as(u32, 2), try ring.submit_and_wait(2));
2423 try testing.expectEqual(@as(u32, 0), ring.sq_ready());
2424 try testing.expectEqual(@as(u32, 2), ring.cq_ready());
2425
2426 const cqe_sendmsg = try ring.copy_cqe();
2427 if (cqe_sendmsg.res == -@as(i32, @intFromEnum(linux.E.INVAL))) return error.SkipZigTest;
2428 try testing.expectEqual(linux.io_uring_cqe{
2429 .user_data = 0x11111111,
2430 .res = buffer_send.len,
2431 .flags = 0,
2432 }, cqe_sendmsg);
2433
2434 const cqe_recvmsg = try ring.copy_cqe();
2435 if (cqe_recvmsg.res == -@as(i32, @intFromEnum(linux.E.INVAL))) return error.SkipZigTest;
2436 try testing.expectEqual(linux.io_uring_cqe{
2437 .user_data = 0x22222222,
2438 .res = buffer_recv.len,
2439 // ignore IORING_CQE_F_SOCK_NONEMPTY since it is set non-deterministically
2440 .flags = cqe_recvmsg.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
2441 }, cqe_recvmsg);
2442
2443 try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
2444}
2445
2446test "timeout (after a relative time)" {
2447 if (!is_linux) return error.SkipZigTest;
2448
2449 const io = testing.io;
2450
2451 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2452 error.SystemOutdated => return error.SkipZigTest,
2453 error.PermissionDenied => return error.SkipZigTest,
2454 else => return err,
2455 };
2456 defer ring.deinit();
2457
2458 const ms = 10;
2459 const margin = 5;
2460 const ts: linux.kernel_timespec = .{ .sec = 0, .nsec = ms * 1000000 };
2461
2462 const started = try std.Io.Clock.awake.now(io);
2463 const sqe = try ring.timeout(0x55555555, &ts, 0, 0);
2464 try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe.opcode);
2465 try testing.expectEqual(@as(u32, 1), try ring.submit());
2466 const cqe = try ring.copy_cqe();
2467 const stopped = try std.Io.Clock.awake.now(io);
2468
2469 try testing.expectEqual(linux.io_uring_cqe{
2470 .user_data = 0x55555555,
2471 .res = -@as(i32, @intFromEnum(linux.E.TIME)),
2472 .flags = 0,
2473 }, cqe);
2474
2475 // Tests should not depend on timings: skip test if outside margin.
2476 const ms_elapsed = started.durationTo(stopped).toMilliseconds();
    if (ms_elapsed < ms or ms_elapsed > ms + margin) return error.SkipZigTest;
2478}
2479
2480test "timeout (after a number of completions)" {
2481 if (!is_linux) return error.SkipZigTest;
2482
2483 var ring = IoUring.init(2, 0) catch |err| switch (err) {
2484 error.SystemOutdated => return error.SkipZigTest,
2485 error.PermissionDenied => return error.SkipZigTest,
2486 else => return err,
2487 };
2488 defer ring.deinit();
2489
2490 const ts: linux.kernel_timespec = .{ .sec = 3, .nsec = 0 };
2491 const count_completions: u64 = 1;
2492 const sqe_timeout = try ring.timeout(0x66666666, &ts, count_completions, 0);
2493 try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
2494 try testing.expectEqual(count_completions, sqe_timeout.off);
2495 _ = try ring.nop(0x77777777);
2496 try testing.expectEqual(@as(u32, 2), try ring.submit());
2497
2498 const cqe_nop = try ring.copy_cqe();
2499 try testing.expectEqual(linux.io_uring_cqe{
2500 .user_data = 0x77777777,
2501 .res = 0,
2502 .flags = 0,
2503 }, cqe_nop);
2504
2505 const cqe_timeout = try ring.copy_cqe();
2506 try testing.expectEqual(linux.io_uring_cqe{
2507 .user_data = 0x66666666,
2508 .res = 0,
2509 .flags = 0,
2510 }, cqe_timeout);
2511}
2512
2513test "timeout_remove" {
2514 if (!is_linux) return error.SkipZigTest;
2515
2516 var ring = IoUring.init(2, 0) catch |err| switch (err) {
2517 error.SystemOutdated => return error.SkipZigTest,
2518 error.PermissionDenied => return error.SkipZigTest,
2519 else => return err,
2520 };
2521 defer ring.deinit();
2522
2523 const ts: linux.kernel_timespec = .{ .sec = 3, .nsec = 0 };
2524 const sqe_timeout = try ring.timeout(0x88888888, &ts, 0, 0);
2525 try testing.expectEqual(linux.IORING_OP.TIMEOUT, sqe_timeout.opcode);
2526 try testing.expectEqual(@as(u64, 0x88888888), sqe_timeout.user_data);
2527
2528 const sqe_timeout_remove = try ring.timeout_remove(0x99999999, 0x88888888, 0);
2529 try testing.expectEqual(linux.IORING_OP.TIMEOUT_REMOVE, sqe_timeout_remove.opcode);
2530 try testing.expectEqual(@as(u64, 0x88888888), sqe_timeout_remove.addr);
2531 try testing.expectEqual(@as(u64, 0x99999999), sqe_timeout_remove.user_data);
2532
2533 try testing.expectEqual(@as(u32, 2), try ring.submit());
2534
2535 // The order in which the CQE arrive is not clearly documented and it changed with kernel 5.18:
2536 // * kernel 5.10 gives user data 0x88888888 first, 0x99999999 second
2537 // * kernel 5.18 gives user data 0x99999999 first, 0x88888888 second
2538
2539 var cqes: [2]linux.io_uring_cqe = undefined;
2540 cqes[0] = try ring.copy_cqe();
2541 cqes[1] = try ring.copy_cqe();
2542
2543 for (cqes) |cqe| {
2544 // IORING_OP_TIMEOUT_REMOVE is not supported by this kernel version:
2545 // Timeout remove operations set the fd to -1, which results in EBADF before EINVAL.
        // We use the absence of IORING_FEAT_RW_CUR_POS (introduced in 5.6) as a safety check that we are pre-5.6.
2547 // We don't want to skip this test for newer kernels.
2548 if (cqe.user_data == 0x99999999 and
2549 cqe.err() == .BADF and
2550 (ring.features & linux.IORING_FEAT_RW_CUR_POS) == 0)
2551 {
2552 return error.SkipZigTest;
2553 }
2554
2555 try testing.expect(cqe.user_data == 0x88888888 or cqe.user_data == 0x99999999);
2556
2557 if (cqe.user_data == 0x88888888) {
2558 try testing.expectEqual(linux.io_uring_cqe{
2559 .user_data = 0x88888888,
2560 .res = -@as(i32, @intFromEnum(linux.E.CANCELED)),
2561 .flags = 0,
2562 }, cqe);
2563 } else if (cqe.user_data == 0x99999999) {
2564 try testing.expectEqual(linux.io_uring_cqe{
2565 .user_data = 0x99999999,
2566 .res = 0,
2567 .flags = 0,
2568 }, cqe);
2569 }
2570 }
2571}
2572
2573test "accept/connect/recv/link_timeout" {
2574 if (!is_linux) return error.SkipZigTest;
2575
2576 var ring = IoUring.init(16, 0) catch |err| switch (err) {
2577 error.SystemOutdated => return error.SkipZigTest,
2578 error.PermissionDenied => return error.SkipZigTest,
2579 else => return err,
2580 };
2581 defer ring.deinit();
2582
2583 const socket_test_harness = try createSocketTestHarness(&ring);
2584 defer socket_test_harness.close();
2585
2586 var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2587
2588 const sqe_recv = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2589 sqe_recv.flags |= linux.IOSQE_IO_LINK;
2590
2591 const ts = linux.kernel_timespec{ .sec = 0, .nsec = 1000000 };
2592 _ = try ring.link_timeout(0x22222222, &ts, 0);
2593
2594 const nr_wait = try ring.submit();
2595 try testing.expectEqual(@as(u32, 2), nr_wait);
2596
2597 var i: usize = 0;
2598 while (i < nr_wait) : (i += 1) {
2599 const cqe = try ring.copy_cqe();
2600 switch (cqe.user_data) {
2601 0xffffffff => {
2602 if (cqe.res != -@as(i32, @intFromEnum(linux.E.INTR)) and
2603 cqe.res != -@as(i32, @intFromEnum(linux.E.CANCELED)))
2604 {
2605 std.debug.print("Req 0x{x} got {d}\n", .{ cqe.user_data, cqe.res });
2606 try testing.expect(false);
2607 }
2608 },
2609 0x22222222 => {
2610 if (cqe.res != -@as(i32, @intFromEnum(linux.E.ALREADY)) and
2611 cqe.res != -@as(i32, @intFromEnum(linux.E.TIME)))
2612 {
2613 std.debug.print("Req 0x{x} got {d}\n", .{ cqe.user_data, cqe.res });
2614 try testing.expect(false);
2615 }
2616 },
2617 else => @panic("should not happen"),
2618 }
2619 }
2620}
2621
2622test "fallocate" {
2623 if (!is_linux) return error.SkipZigTest;
2624
2625 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2626 error.SystemOutdated => return error.SkipZigTest,
2627 error.PermissionDenied => return error.SkipZigTest,
2628 else => return err,
2629 };
2630 defer ring.deinit();
2631
2632 var tmp = std.testing.tmpDir(.{});
2633 defer tmp.cleanup();
2634
2635 const path = "test_io_uring_fallocate";
2636 const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
2637 defer file.close();
2638
2639 try testing.expectEqual(@as(u64, 0), (try file.stat()).size);
2640
2641 const len: u64 = 65536;
2642 const sqe = try ring.fallocate(0xaaaaaaaa, file.handle, 0, 0, len);
2643 try testing.expectEqual(linux.IORING_OP.FALLOCATE, sqe.opcode);
2644 try testing.expectEqual(file.handle, sqe.fd);
2645 try testing.expectEqual(@as(u32, 1), try ring.submit());
2646
2647 const cqe = try ring.copy_cqe();
2648 switch (cqe.err()) {
2649 .SUCCESS => {},
2650 // This kernel's io_uring does not yet implement fallocate():
2651 .INVAL => return error.SkipZigTest,
2652 // This kernel does not implement fallocate():
2653 .NOSYS => return error.SkipZigTest,
2654 // The filesystem containing the file referred to by fd does not support this operation;
2655 // or the mode is not supported by the filesystem containing the file referred to by fd:
2656 .OPNOTSUPP => return error.SkipZigTest,
2657 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2658 }
2659 try testing.expectEqual(linux.io_uring_cqe{
2660 .user_data = 0xaaaaaaaa,
2661 .res = 0,
2662 .flags = 0,
2663 }, cqe);
2664
2665 try testing.expectEqual(len, (try file.stat()).size);
2666}
2667
2668test "statx" {
2669 if (!is_linux) return error.SkipZigTest;
2670
2671 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2672 error.SystemOutdated => return error.SkipZigTest,
2673 error.PermissionDenied => return error.SkipZigTest,
2674 else => return err,
2675 };
2676 defer ring.deinit();
2677
2678 var tmp = std.testing.tmpDir(.{});
2679 defer tmp.cleanup();
2680 const path = "test_io_uring_statx";
2681 const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
2682 defer file.close();
2683
2684 try testing.expectEqual(@as(u64, 0), (try file.stat()).size);
2685
2686 try file.writeAll("foobar");
2687
2688 var buf: linux.Statx = undefined;
2689 const sqe = try ring.statx(
2690 0xaaaaaaaa,
2691 tmp.dir.fd,
2692 path,
2693 0,
2694 linux.STATX_SIZE,
2695 &buf,
2696 );
2697 try testing.expectEqual(linux.IORING_OP.STATX, sqe.opcode);
2698 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
2699 try testing.expectEqual(@as(u32, 1), try ring.submit());
2700
2701 const cqe = try ring.copy_cqe();
2702 switch (cqe.err()) {
2703 .SUCCESS => {},
2704 // This kernel's io_uring does not yet implement statx():
2705 .INVAL => return error.SkipZigTest,
2706 // This kernel does not implement statx():
2707 .NOSYS => return error.SkipZigTest,
2708 // The filesystem containing the file referred to by fd does not support this operation;
2709 // or the mode is not supported by the filesystem containing the file referred to by fd:
2710 .OPNOTSUPP => return error.SkipZigTest,
2711 // not supported on older kernels (5.4)
2712 .BADF => return error.SkipZigTest,
2713 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2714 }
2715 try testing.expectEqual(linux.io_uring_cqe{
2716 .user_data = 0xaaaaaaaa,
2717 .res = 0,
2718 .flags = 0,
2719 }, cqe);
2720
2721 try testing.expect(buf.mask & linux.STATX_SIZE == linux.STATX_SIZE);
2722 try testing.expectEqual(@as(u64, 6), buf.size);
2723}
2724
2725test "accept/connect/recv/cancel" {
2726 if (!is_linux) return error.SkipZigTest;
2727
2728 var ring = IoUring.init(16, 0) catch |err| switch (err) {
2729 error.SystemOutdated => return error.SkipZigTest,
2730 error.PermissionDenied => return error.SkipZigTest,
2731 else => return err,
2732 };
2733 defer ring.deinit();
2734
2735 const socket_test_harness = try createSocketTestHarness(&ring);
2736 defer socket_test_harness.close();
2737
2738 var buffer_recv = [_]u8{ 0, 1, 0, 1, 0 };
2739
2740 _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
2741 try testing.expectEqual(@as(u32, 1), try ring.submit());
2742
2743 const sqe_cancel = try ring.cancel(0x99999999, 0xffffffff, 0);
2744 try testing.expectEqual(linux.IORING_OP.ASYNC_CANCEL, sqe_cancel.opcode);
2745 try testing.expectEqual(@as(u64, 0xffffffff), sqe_cancel.addr);
2746 try testing.expectEqual(@as(u64, 0x99999999), sqe_cancel.user_data);
2747 try testing.expectEqual(@as(u32, 1), try ring.submit());
2748
2749 var cqe_recv = try ring.copy_cqe();
2750 if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
2751 var cqe_cancel = try ring.copy_cqe();
2752 if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
2753
    // The recv/cancel CQEs may arrive in any order; the recv CQE will sometimes come first:
2755 if (cqe_recv.user_data == 0x99999999 and cqe_cancel.user_data == 0xffffffff) {
2756 const a = cqe_recv;
2757 const b = cqe_cancel;
2758 cqe_recv = b;
2759 cqe_cancel = a;
2760 }
2761
2762 try testing.expectEqual(linux.io_uring_cqe{
2763 .user_data = 0xffffffff,
2764 .res = -@as(i32, @intFromEnum(linux.E.CANCELED)),
2765 .flags = 0,
2766 }, cqe_recv);
2767
2768 try testing.expectEqual(linux.io_uring_cqe{
2769 .user_data = 0x99999999,
2770 .res = 0,
2771 .flags = 0,
2772 }, cqe_cancel);
2773}
2774
2775test "register_files_update" {
2776 if (!is_linux) return error.SkipZigTest;
2777
2778 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2779 error.SystemOutdated => return error.SkipZigTest,
2780 error.PermissionDenied => return error.SkipZigTest,
2781 else => return err,
2782 };
2783 defer ring.deinit();
2784
2785 const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
2786 defer posix.close(fd);
2787
2788 var registered_fds = [_]linux.fd_t{0} ** 2;
2789 const fd_index = 0;
2790 const fd_index2 = 1;
2791 registered_fds[fd_index] = fd;
2792 registered_fds[fd_index2] = -1;
2793
2794 ring.register_files(registered_fds[0..]) catch |err| switch (err) {
2795 // Happens when the kernel doesn't support sparse entry (-1) in the file descriptors array.
2796 error.FileDescriptorInvalid => return error.SkipZigTest,
        else => |e| std.debug.panic("unhandled error: {}", .{e}),
2798 };
2799
2800 // Test IORING_REGISTER_FILES_UPDATE
2801 // Only available since Linux 5.5
2802
2803 const fd2 = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
2804 defer posix.close(fd2);
2805
2806 registered_fds[fd_index] = fd2;
2807 registered_fds[fd_index2] = -1;
2808 try ring.register_files_update(0, registered_fds[0..]);
2809
2810 var buffer = [_]u8{42} ** 128;
2811 {
2812 const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
2813 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
2814 sqe.flags |= linux.IOSQE_FIXED_FILE;
2815
2816 try testing.expectEqual(@as(u32, 1), try ring.submit());
2817 try testing.expectEqual(linux.io_uring_cqe{
2818 .user_data = 0xcccccccc,
2819 .res = buffer.len,
2820 .flags = 0,
2821 }, try ring.copy_cqe());
2822 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
2823 }
2824
2825 // Test with a non-zero offset
2826
2827 registered_fds[fd_index] = -1;
2828 registered_fds[fd_index2] = -1;
2829 try ring.register_files_update(1, registered_fds[1..]);
2830
2831 {
2832 // Next read should still work since fd_index in the registered file descriptors hasn't been updated yet.
2833 const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
2834 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
2835 sqe.flags |= linux.IOSQE_FIXED_FILE;
2836
2837 try testing.expectEqual(@as(u32, 1), try ring.submit());
2838 try testing.expectEqual(linux.io_uring_cqe{
2839 .user_data = 0xcccccccc,
2840 .res = buffer.len,
2841 .flags = 0,
2842 }, try ring.copy_cqe());
2843 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer.len), buffer[0..]);
2844 }
2845
2846 try ring.register_files_update(0, registered_fds[0..]);
2847
2848 {
2849 // Now this should fail since both fds are sparse (-1)
2850 const sqe = try ring.read(0xcccccccc, fd_index, .{ .buffer = &buffer }, 0);
2851 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
2852 sqe.flags |= linux.IOSQE_FIXED_FILE;
2853
2854 try testing.expectEqual(@as(u32, 1), try ring.submit());
2855 const cqe = try ring.copy_cqe();
2856 try testing.expectEqual(linux.E.BADF, cqe.err());
2857 }
2858
2859 try ring.unregister_files();
2860}
2861
2862test "shutdown" {
2863 if (!is_linux) return error.SkipZigTest;
2864
2865 var ring = IoUring.init(16, 0) catch |err| switch (err) {
2866 error.SystemOutdated => return error.SkipZigTest,
2867 error.PermissionDenied => return error.SkipZigTest,
2868 else => return err,
2869 };
2870 defer ring.deinit();
2871
2872 var address: linux.sockaddr.in = .{
2873 .port = 0,
2874 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
2875 };
2876
2877 // Socket bound, expect shutdown to work
2878 {
2879 const server = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
2880 defer posix.close(server);
2881 try posix.setsockopt(server, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
2882 try posix.bind(server, addrAny(&address), @sizeOf(linux.sockaddr.in));
2883 try posix.listen(server, 1);
2884
2885 // set address to the OS-chosen IP/port.
2886 var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
2887 try posix.getsockname(server, addrAny(&address), &slen);
2888
2889 const shutdown_sqe = try ring.shutdown(0x445445445, server, linux.SHUT.RD);
2890 try testing.expectEqual(linux.IORING_OP.SHUTDOWN, shutdown_sqe.opcode);
2891 try testing.expectEqual(@as(i32, server), shutdown_sqe.fd);
2892
2893 try testing.expectEqual(@as(u32, 1), try ring.submit());
2894
2895 const cqe = try ring.copy_cqe();
2896 switch (cqe.err()) {
2897 .SUCCESS => {},
2898 // This kernel's io_uring does not yet implement shutdown (kernel version < 5.11)
2899 .INVAL => return error.SkipZigTest,
2900 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2901 }
2902
2903 try testing.expectEqual(linux.io_uring_cqe{
2904 .user_data = 0x445445445,
2905 .res = 0,
2906 .flags = 0,
2907 }, cqe);
2908 }
2909
2910 // Socket not bound, expect to fail with ENOTCONN
2911 {
2912 const server = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
2913 defer posix.close(server);
2914
        const shutdown_sqe = ring.shutdown(0x445445445, server, linux.SHUT.RD) catch |err|
            std.debug.panic("unexpected error: {}", .{err});
2918 try testing.expectEqual(linux.IORING_OP.SHUTDOWN, shutdown_sqe.opcode);
2919 try testing.expectEqual(@as(i32, server), shutdown_sqe.fd);
2920
2921 try testing.expectEqual(@as(u32, 1), try ring.submit());
2922
2923 const cqe = try ring.copy_cqe();
2924 try testing.expectEqual(@as(u64, 0x445445445), cqe.user_data);
2925 try testing.expectEqual(linux.E.NOTCONN, cqe.err());
2926 }
2927}
2928
2929test "renameat" {
2930 if (!is_linux) return error.SkipZigTest;
2931
2932 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2933 error.SystemOutdated => return error.SkipZigTest,
2934 error.PermissionDenied => return error.SkipZigTest,
2935 else => return err,
2936 };
2937 defer ring.deinit();
2938
2939 const old_path = "test_io_uring_renameat_old";
2940 const new_path = "test_io_uring_renameat_new";
2941
2942 var tmp = std.testing.tmpDir(.{});
2943 defer tmp.cleanup();
2944
2945 // Write old file with data
2946
2947 const old_file = try tmp.dir.createFile(old_path, .{ .truncate = true, .mode = 0o666 });
2948 defer old_file.close();
2949 try old_file.writeAll("hello");
2950
2951 // Submit renameat
2952
2953 const sqe = try ring.renameat(
2954 0x12121212,
2955 tmp.dir.fd,
2956 old_path,
2957 tmp.dir.fd,
2958 new_path,
2959 0,
2960 );
2961 try testing.expectEqual(linux.IORING_OP.RENAMEAT, sqe.opcode);
2962 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
2963 try testing.expectEqual(@as(i32, tmp.dir.fd), @as(i32, @bitCast(sqe.len)));
2964 try testing.expectEqual(@as(u32, 1), try ring.submit());
2965
2966 const cqe = try ring.copy_cqe();
2967 switch (cqe.err()) {
2968 .SUCCESS => {},
2969 // This kernel's io_uring does not yet implement renameat (kernel version < 5.11)
2970 .BADF, .INVAL => return error.SkipZigTest,
2971 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
2972 }
2973 try testing.expectEqual(linux.io_uring_cqe{
2974 .user_data = 0x12121212,
2975 .res = 0,
2976 .flags = 0,
2977 }, cqe);
2978
2979 // Validate that the old file doesn't exist anymore
2980 try testing.expectError(error.FileNotFound, tmp.dir.openFile(old_path, .{}));
2981
2982 // Validate that the new file exists with the proper content
2983 var new_file_data: [16]u8 = undefined;
2984 try testing.expectEqualStrings("hello", try tmp.dir.readFile(new_path, &new_file_data));
2985}
2986
2987test "unlinkat" {
2988 if (!is_linux) return error.SkipZigTest;
2989
2990 var ring = IoUring.init(1, 0) catch |err| switch (err) {
2991 error.SystemOutdated => return error.SkipZigTest,
2992 error.PermissionDenied => return error.SkipZigTest,
2993 else => return err,
2994 };
2995 defer ring.deinit();
2996
2997 const path = "test_io_uring_unlinkat";
2998
2999 var tmp = std.testing.tmpDir(.{});
3000 defer tmp.cleanup();
3001
    // Create the file to be unlinked
3003
3004 const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
3005 defer file.close();
3006
3007 // Submit unlinkat
3008
3009 const sqe = try ring.unlinkat(
3010 0x12121212,
3011 tmp.dir.fd,
3012 path,
3013 0,
3014 );
3015 try testing.expectEqual(linux.IORING_OP.UNLINKAT, sqe.opcode);
3016 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
3017 try testing.expectEqual(@as(u32, 1), try ring.submit());
3018
3019 const cqe = try ring.copy_cqe();
3020 switch (cqe.err()) {
3021 .SUCCESS => {},
3022 // This kernel's io_uring does not yet implement unlinkat (kernel version < 5.11)
3023 .BADF, .INVAL => return error.SkipZigTest,
3024 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3025 }
3026 try testing.expectEqual(linux.io_uring_cqe{
3027 .user_data = 0x12121212,
3028 .res = 0,
3029 .flags = 0,
3030 }, cqe);
3031
3032 // Validate that the file doesn't exist anymore
3033 _ = tmp.dir.openFile(path, .{}) catch |err| switch (err) {
3034 error.FileNotFound => {},
3035 else => std.debug.panic("unexpected error: {}", .{err}),
3036 };
3037}
3038
3039test "mkdirat" {
3040 if (!is_linux) return error.SkipZigTest;
3041
3042 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3043 error.SystemOutdated => return error.SkipZigTest,
3044 error.PermissionDenied => return error.SkipZigTest,
3045 else => return err,
3046 };
3047 defer ring.deinit();
3048
3049 var tmp = std.testing.tmpDir(.{});
3050 defer tmp.cleanup();
3051
3052 const path = "test_io_uring_mkdirat";
3053
3054 // Submit mkdirat
3055
3056 const sqe = try ring.mkdirat(
3057 0x12121212,
3058 tmp.dir.fd,
3059 path,
3060 0o0755,
3061 );
3062 try testing.expectEqual(linux.IORING_OP.MKDIRAT, sqe.opcode);
3063 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
3064 try testing.expectEqual(@as(u32, 1), try ring.submit());
3065
3066 const cqe = try ring.copy_cqe();
3067 switch (cqe.err()) {
3068 .SUCCESS => {},
3069 // This kernel's io_uring does not yet implement mkdirat (kernel version < 5.15)
3070 .BADF, .INVAL => return error.SkipZigTest,
3071 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3072 }
3073 try testing.expectEqual(linux.io_uring_cqe{
3074 .user_data = 0x12121212,
3075 .res = 0,
3076 .flags = 0,
3077 }, cqe);
3078
    // Validate that the directory exists
    var dir = try tmp.dir.openDir(path, .{});
    dir.close();
3081}
3082
3083test "symlinkat" {
3084 if (!is_linux) return error.SkipZigTest;
3085
3086 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3087 error.SystemOutdated => return error.SkipZigTest,
3088 error.PermissionDenied => return error.SkipZigTest,
3089 else => return err,
3090 };
3091 defer ring.deinit();
3092
3093 var tmp = std.testing.tmpDir(.{});
3094 defer tmp.cleanup();
3095
3096 const path = "test_io_uring_symlinkat";
3097 const link_path = "test_io_uring_symlinkat_link";
3098
3099 const file = try tmp.dir.createFile(path, .{ .truncate = true, .mode = 0o666 });
3100 defer file.close();
3101
3102 // Submit symlinkat
3103
3104 const sqe = try ring.symlinkat(
3105 0x12121212,
3106 path,
3107 tmp.dir.fd,
3108 link_path,
3109 );
3110 try testing.expectEqual(linux.IORING_OP.SYMLINKAT, sqe.opcode);
3111 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
3112 try testing.expectEqual(@as(u32, 1), try ring.submit());
3113
3114 const cqe = try ring.copy_cqe();
3115 switch (cqe.err()) {
3116 .SUCCESS => {},
3117 // This kernel's io_uring does not yet implement symlinkat (kernel version < 5.15)
3118 .BADF, .INVAL => return error.SkipZigTest,
3119 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3120 }
3121 try testing.expectEqual(linux.io_uring_cqe{
3122 .user_data = 0x12121212,
3123 .res = 0,
3124 .flags = 0,
3125 }, cqe);
3126
    // Validate that the symlink exists
    const link_file = try tmp.dir.openFile(link_path, .{});
    link_file.close();
3129}
3130
3131test "linkat" {
3132 if (!is_linux) return error.SkipZigTest;
3133
3134 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3135 error.SystemOutdated => return error.SkipZigTest,
3136 error.PermissionDenied => return error.SkipZigTest,
3137 else => return err,
3138 };
3139 defer ring.deinit();
3140
3141 var tmp = std.testing.tmpDir(.{});
3142 defer tmp.cleanup();
3143
3144 const first_path = "test_io_uring_linkat_first";
3145 const second_path = "test_io_uring_linkat_second";
3146
3147 // Write file with data
3148
3149 const first_file = try tmp.dir.createFile(first_path, .{ .truncate = true, .mode = 0o666 });
3150 defer first_file.close();
3151 try first_file.writeAll("hello");
3152
3153 // Submit linkat
3154
3155 const sqe = try ring.linkat(
3156 0x12121212,
3157 tmp.dir.fd,
3158 first_path,
3159 tmp.dir.fd,
3160 second_path,
3161 0,
3162 );
3163 try testing.expectEqual(linux.IORING_OP.LINKAT, sqe.opcode);
3164 try testing.expectEqual(@as(i32, tmp.dir.fd), sqe.fd);
3165 try testing.expectEqual(@as(i32, tmp.dir.fd), @as(i32, @bitCast(sqe.len)));
3166 try testing.expectEqual(@as(u32, 1), try ring.submit());
3167
3168 const cqe = try ring.copy_cqe();
3169 switch (cqe.err()) {
3170 .SUCCESS => {},
3171 // This kernel's io_uring does not yet implement linkat (kernel version < 5.15)
3172 .BADF, .INVAL => return error.SkipZigTest,
3173 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3174 }
3175 try testing.expectEqual(linux.io_uring_cqe{
3176 .user_data = 0x12121212,
3177 .res = 0,
3178 .flags = 0,
3179 }, cqe);
3180
3181 // Validate the second file
3182 var second_file_data: [16]u8 = undefined;
3183 try testing.expectEqualStrings("hello", try tmp.dir.readFile(second_path, &second_file_data));
3184}
3185
3186test "provide_buffers: read" {
3187 if (!is_linux) return error.SkipZigTest;
3188
3189 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3190 error.SystemOutdated => return error.SkipZigTest,
3191 error.PermissionDenied => return error.SkipZigTest,
3192 else => return err,
3193 };
3194 defer ring.deinit();
3195
3196 const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
3197 defer posix.close(fd);
3198
3199 const group_id = 1337;
3200 const buffer_id = 0;
3201
3202 const buffer_len = 128;
3203
3204 var buffers: [4][buffer_len]u8 = undefined;
3205
3206 // Provide 4 buffers
3207
3208 {
3209 const sqe = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
3210 try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
3211 try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
3212 try testing.expectEqual(@as(u32, buffers[0].len), sqe.len);
3213 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3214 try testing.expectEqual(@as(u32, 1), try ring.submit());
3215
3216 const cqe = try ring.copy_cqe();
3217 switch (cqe.err()) {
3218 // Happens when the kernel is < 5.7
3219 .INVAL, .BADF => return error.SkipZigTest,
3220 .SUCCESS => {},
3221 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3222 }
3223 try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
3224 }
3225
3226 // Do 4 reads which should consume all buffers
3227
3228 var i: usize = 0;
3229 while (i < buffers.len) : (i += 1) {
3230 const sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3231 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
3232 try testing.expectEqual(@as(i32, fd), sqe.fd);
3233 try testing.expectEqual(@as(u64, 0), sqe.addr);
3234 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3235 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3236 try testing.expectEqual(@as(u32, 1), try ring.submit());
3237
3238 const cqe = try ring.copy_cqe();
3239 switch (cqe.err()) {
3240 .SUCCESS => {},
3241 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3242 }
3243
3244 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
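        // The selected buffer id is carried in the upper 16 bits of cqe.flags
        // (linux.IORING_CQE_BUFFER_SHIFT).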
3245 const used_buffer_id = cqe.flags >> 16;
3246 try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
3247 try testing.expectEqual(@as(i32, buffer_len), cqe.res);
3248
3249 try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
3250 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
3251 }
3252
3253 // This read should fail
3254
3255 {
3256 const sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3257 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
3258 try testing.expectEqual(@as(i32, fd), sqe.fd);
3259 try testing.expectEqual(@as(u64, 0), sqe.addr);
3260 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3261 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3262 try testing.expectEqual(@as(u32, 1), try ring.submit());
3263
3264 const cqe = try ring.copy_cqe();
3265 switch (cqe.err()) {
3266 // Expected
3267 .NOBUFS => {},
3268 .SUCCESS => std.debug.panic("unexpected success", .{}),
3269 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3270 }
3271 try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
3272 }
3273
3274 // Provide 1 buffer again
3275
3276 // Deliberately put something we don't expect in the buffers
3277 @memset(mem.sliceAsBytes(&buffers), 42);
3278
3279 const reprovided_buffer_id = 2;
3280
3281 {
3282 _ = try ring.provide_buffers(0xabababab, @as([*]u8, @ptrCast(&buffers[reprovided_buffer_id])), buffer_len, 1, group_id, reprovided_buffer_id);
3283 try testing.expectEqual(@as(u32, 1), try ring.submit());
3284
3285 const cqe = try ring.copy_cqe();
3286 switch (cqe.err()) {
3287 .SUCCESS => {},
3288 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3289 }
3290 }
3291
3292 // Final read which should work
3293
3294 {
3295 const sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3296 try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
3297 try testing.expectEqual(@as(i32, fd), sqe.fd);
3298 try testing.expectEqual(@as(u64, 0), sqe.addr);
3299 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3300 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3301 try testing.expectEqual(@as(u32, 1), try ring.submit());
3302
3303 const cqe = try ring.copy_cqe();
3304 switch (cqe.err()) {
3305 .SUCCESS => {},
3306 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3307 }
3308
3309 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
3310 const used_buffer_id = cqe.flags >> 16;
3311 try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
3312 try testing.expectEqual(@as(i32, buffer_len), cqe.res);
3313 try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
3314 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
3315 }
3316}
3317
3318test "remove_buffers" {
3319 if (!is_linux) return error.SkipZigTest;
3320
3321 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3322 error.SystemOutdated => return error.SkipZigTest,
3323 error.PermissionDenied => return error.SkipZigTest,
3324 else => return err,
3325 };
3326 defer ring.deinit();
3327
3328 const fd = try posix.openZ("/dev/zero", .{ .ACCMODE = .RDONLY, .CLOEXEC = true }, 0);
3329 defer posix.close(fd);
3330
3331 const group_id = 1337;
3332 const buffer_id = 0;
3333
3334 const buffer_len = 128;
3335
3336 var buffers: [4][buffer_len]u8 = undefined;
3337
3338 // Provide 4 buffers
3339
3340 {
3341 _ = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
3342 try testing.expectEqual(@as(u32, 1), try ring.submit());
3343
3344 const cqe = try ring.copy_cqe();
3345 switch (cqe.err()) {
3346 .INVAL, .BADF => return error.SkipZigTest,
3347 .SUCCESS => {},
3348 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3349 }
3350 try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
3351 }
3352
3353 // Remove 3 buffers
3354
3355 {
3356 const sqe = try ring.remove_buffers(0xbababababa, 3, group_id);
3357 try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode);
3358 try testing.expectEqual(@as(i32, 3), sqe.fd);
3359 try testing.expectEqual(@as(u64, 0), sqe.addr);
3360 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3361 try testing.expectEqual(@as(u32, 1), try ring.submit());
3362
3363 const cqe = try ring.copy_cqe();
3364 switch (cqe.err()) {
3365 .SUCCESS => {},
3366 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3367 }
3368 try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data);
3369 }
3370
3371 // This read should work
3372
3373 {
3374 _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3375 try testing.expectEqual(@as(u32, 1), try ring.submit());
3376
3377 const cqe = try ring.copy_cqe();
3378 switch (cqe.err()) {
3379 .SUCCESS => {},
3380 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3381 }
3382
3383 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
3384 const used_buffer_id = cqe.flags >> 16;
3385 try testing.expect(used_buffer_id >= 0 and used_buffer_id < 4);
3386 try testing.expectEqual(@as(i32, buffer_len), cqe.res);
3387 try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
3388 try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))]);
3389 }
3390
3391 // Final read should _not_ work
3392
3393 {
3394 _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3395 try testing.expectEqual(@as(u32, 1), try ring.submit());
3396
3397 const cqe = try ring.copy_cqe();
3398 switch (cqe.err()) {
3399 // Expected
3400 .NOBUFS => {},
3401 .SUCCESS => std.debug.panic("unexpected success", .{}),
3402 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3403 }
3404 }
3405}
3406
3407test "provide_buffers: accept/connect/send/recv" {
3408 if (!is_linux) return error.SkipZigTest;
3409
3410 var ring = IoUring.init(16, 0) catch |err| switch (err) {
3411 error.SystemOutdated => return error.SkipZigTest,
3412 error.PermissionDenied => return error.SkipZigTest,
3413 else => return err,
3414 };
3415 defer ring.deinit();
3416
3417 const group_id = 1337;
3418 const buffer_id = 0;
3419
3420 const buffer_len = 128;
3421 var buffers: [4][buffer_len]u8 = undefined;
3422
3423 // Provide 4 buffers
3424
3425 {
3426 const sqe = try ring.provide_buffers(0xcccccccc, @as([*]u8, @ptrCast(&buffers)), buffer_len, buffers.len, group_id, buffer_id);
3427 try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
3428 try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
3429 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3430 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3431 try testing.expectEqual(@as(u32, 1), try ring.submit());
3432
3433 const cqe = try ring.copy_cqe();
3434 switch (cqe.err()) {
3435 // Happens when the kernel is < 5.7
3436 .INVAL => return error.SkipZigTest,
            // Happens on kernel 5.4
3438 .BADF => return error.SkipZigTest,
3439 .SUCCESS => {},
3440 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3441 }
3442 try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
3443 }
3444
3445 const socket_test_harness = try createSocketTestHarness(&ring);
3446 defer socket_test_harness.close();
3447
    // Do 4 sends on the socket
3449
3450 {
3451 var i: usize = 0;
3452 while (i < buffers.len) : (i += 1) {
3453 _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0);
3454 try testing.expectEqual(@as(u32, 1), try ring.submit());
3455 }
3456
3457 var cqes: [4]linux.io_uring_cqe = undefined;
3458 try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4));
3459 }
3460
    // Do 4 recvs which should consume all buffers
3462
3463 // Deliberately put something we don't expect in the buffers
3464 @memset(mem.sliceAsBytes(&buffers), 1);
3465
3466 var i: usize = 0;
3467 while (i < buffers.len) : (i += 1) {
3468 const sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3469 try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
3470 try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
3471 try testing.expectEqual(@as(u64, 0), sqe.addr);
3472 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3473 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3474 try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
3475 try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
3476 try testing.expectEqual(@as(u32, 1), try ring.submit());
3477
3478 const cqe = try ring.copy_cqe();
3479 switch (cqe.err()) {
3480 .SUCCESS => {},
3481 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3482 }
3483
3484 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
3485 const used_buffer_id = cqe.flags >> 16;
3486 try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
3487 try testing.expectEqual(@as(i32, buffer_len), cqe.res);
3488
3489 try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
3490 const buffer = buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))];
3491 try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer);
3492 }
3493
3494 // This recv should fail
3495
3496 {
3497 const sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3498 try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
3499 try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
3500 try testing.expectEqual(@as(u64, 0), sqe.addr);
3501 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3502 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3503 try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
3504 try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
3505 try testing.expectEqual(@as(u32, 1), try ring.submit());
3506
3507 const cqe = try ring.copy_cqe();
3508 switch (cqe.err()) {
3509 // Expected
3510 .NOBUFS => {},
3511 .SUCCESS => std.debug.panic("unexpected success", .{}),
3512 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3513 }
3514 try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
3515 }
3516
3517 // Provide 1 buffer again
3518
3519 const reprovided_buffer_id = 2;
3520
3521 {
3522 _ = try ring.provide_buffers(0xabababab, @as([*]u8, @ptrCast(&buffers[reprovided_buffer_id])), buffer_len, 1, group_id, reprovided_buffer_id);
3523 try testing.expectEqual(@as(u32, 1), try ring.submit());
3524
3525 const cqe = try ring.copy_cqe();
3526 switch (cqe.err()) {
3527 .SUCCESS => {},
3528 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3529 }
3530 }
3531
3532 // Redo 1 send on the server socket
3533
3534 {
3535 _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0);
3536 try testing.expectEqual(@as(u32, 1), try ring.submit());
3537
3538 _ = try ring.copy_cqe();
3539 }
3540
3541 // Final recv which should work
3542
3543 // Deliberately put something we don't expect in the buffers
3544 @memset(mem.sliceAsBytes(&buffers), 1);
3545
3546 {
3547 const sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
3548 try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
3549 try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
3550 try testing.expectEqual(@as(u64, 0), sqe.addr);
3551 try testing.expectEqual(@as(u32, buffer_len), sqe.len);
3552 try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
3553 try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
3554 try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
3555 try testing.expectEqual(@as(u32, 1), try ring.submit());
3556
3557 const cqe = try ring.copy_cqe();
3558 switch (cqe.err()) {
3559 .SUCCESS => {},
3560 else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
3561 }
3562
3563 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
3564 const used_buffer_id = cqe.flags >> 16;
3565 try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
3566 try testing.expectEqual(@as(i32, buffer_len), cqe.res);
3567 try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
3568 const buffer = buffers[used_buffer_id][0..@as(usize, @intCast(cqe.res))];
3569 try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer);
3570 }
3571}
3572
3573/// Used for testing server/client interactions.
3574const SocketTestHarness = struct {
3575 listener: posix.socket_t,
3576 server: posix.socket_t,
3577 client: posix.socket_t,
3578
    fn close(self: SocketTestHarness) void {
        posix.close(self.client);
        posix.close(self.server);
        posix.close(self.listener);
    }
3583};
3584
3585fn createSocketTestHarness(ring: *IoUring) !SocketTestHarness {
3586 // Create a TCP server socket
3587 var address: linux.sockaddr.in = .{
3588 .port = 0,
3589 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
3590 };
3591 const listener_socket = try createListenerSocket(&address);
3592 errdefer posix.close(listener_socket);
3593
3594 // Submit 1 accept
3595 var accept_addr: posix.sockaddr = undefined;
3596 var accept_addr_len: posix.socklen_t = @sizeOf(@TypeOf(accept_addr));
3597 _ = try ring.accept(0xaaaaaaaa, listener_socket, &accept_addr, &accept_addr_len, 0);
3598
3599 // Create a TCP client socket
3600 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3601 errdefer posix.close(client);
3602 _ = try ring.connect(0xcccccccc, client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3603
3604 try testing.expectEqual(@as(u32, 2), try ring.submit());
3605
3606 var cqe_accept = try ring.copy_cqe();
3607 if (cqe_accept.err() == .INVAL) return error.SkipZigTest;
3608 var cqe_connect = try ring.copy_cqe();
3609 if (cqe_connect.err() == .INVAL) return error.SkipZigTest;
3610
3611 // The accept/connect CQEs may arrive in any order, the connect CQE will sometimes come first:
    if (cqe_accept.user_data == 0xcccccccc and cqe_connect.user_data == 0xaaaaaaaa)
        std.mem.swap(linux.io_uring_cqe, &cqe_accept, &cqe_connect);
3618
3619 try testing.expectEqual(@as(u64, 0xaaaaaaaa), cqe_accept.user_data);
3620 if (cqe_accept.res <= 0) std.debug.print("\ncqe_accept.res={}\n", .{cqe_accept.res});
3621 try testing.expect(cqe_accept.res > 0);
3622 try testing.expectEqual(@as(u32, 0), cqe_accept.flags);
3623 try testing.expectEqual(linux.io_uring_cqe{
3624 .user_data = 0xcccccccc,
3625 .res = 0,
3626 .flags = 0,
3627 }, cqe_connect);
3628
3629 // All good
3630
3631 return SocketTestHarness{
3632 .listener = listener_socket,
3633 .server = cqe_accept.res,
3634 .client = client,
3635 };
3636}
3637
3638fn createListenerSocket(address: *linux.sockaddr.in) !posix.socket_t {
3639 const kernel_backlog = 1;
3640 const listener_socket = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3641 errdefer posix.close(listener_socket);
3642
3643 try posix.setsockopt(listener_socket, posix.SOL.SOCKET, posix.SO.REUSEADDR, &mem.toBytes(@as(c_int, 1)));
3644 try posix.bind(listener_socket, addrAny(address), @sizeOf(linux.sockaddr.in));
3645 try posix.listen(listener_socket, kernel_backlog);
3646
3647 // set address to the OS-chosen IP/port.
3648 var slen: posix.socklen_t = @sizeOf(linux.sockaddr.in);
3649 try posix.getsockname(listener_socket, addrAny(address), &slen);
3650
3651 return listener_socket;
3652}
3653
3654test "accept multishot" {
3655 if (!is_linux) return error.SkipZigTest;
3656
3657 var ring = IoUring.init(16, 0) catch |err| switch (err) {
3658 error.SystemOutdated => return error.SkipZigTest,
3659 error.PermissionDenied => return error.SkipZigTest,
3660 else => return err,
3661 };
3662 defer ring.deinit();
3663
3664 var address: linux.sockaddr.in = .{
3665 .port = 0,
3666 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
3667 };
3668 const listener_socket = try createListenerSocket(&address);
3669 defer posix.close(listener_socket);
3670
3671 // submit multishot accept operation
3672 var addr: posix.sockaddr = undefined;
3673 var addr_len: posix.socklen_t = @sizeOf(@TypeOf(addr));
3674 const userdata: u64 = 0xaaaaaaaa;
3675 _ = try ring.accept_multishot(userdata, listener_socket, &addr, &addr_len, 0);
3676 try testing.expectEqual(@as(u32, 1), try ring.submit());
3677
3678 var nr: usize = 4; // number of clients to connect
3679 while (nr > 0) : (nr -= 1) {
3680 // connect client
3681 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3682 errdefer posix.close(client);
3683 try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3684
3685 // test accept completion
3686 var cqe = try ring.copy_cqe();
3687 if (cqe.err() == .INVAL) return error.SkipZigTest;
3688 try testing.expect(cqe.res > 0);
3689 try testing.expect(cqe.user_data == userdata);
3690 try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE > 0); // more flag is set
3691
3692 posix.close(client);
3693 }
3694}
3695
3696test "accept/connect/send_zc/recv" {
3697 try skipKernelLessThan(.{ .major = 6, .minor = 0, .patch = 0 });
3698
3699 var ring = IoUring.init(16, 0) catch |err| switch (err) {
3700 error.SystemOutdated => return error.SkipZigTest,
3701 error.PermissionDenied => return error.SkipZigTest,
3702 else => return err,
3703 };
3704 defer ring.deinit();
3705
3706 const socket_test_harness = try createSocketTestHarness(&ring);
3707 defer socket_test_harness.close();
3708
3709 const buffer_send = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
3710 var buffer_recv = [_]u8{0} ** 10;
3711
3712 // zero-copy send
3713 const sqe_send = try ring.send_zc(0xeeeeeeee, socket_test_harness.client, buffer_send[0..], 0, 0);
3714 sqe_send.flags |= linux.IOSQE_IO_LINK;
3715 _ = try ring.recv(0xffffffff, socket_test_harness.server, .{ .buffer = buffer_recv[0..] }, 0);
3716 try testing.expectEqual(@as(u32, 2), try ring.submit());
3717
3718 var cqe_send = try ring.copy_cqe();
3719 // First completion of zero-copy send.
    // IORING_CQE_F_MORE means that there will be a second completion
    // event / notification for the request, with the user_data field
    // set to the same value.
    // buffer_send must be kept alive until the second cqe arrives.
3724 try testing.expectEqual(linux.io_uring_cqe{
3725 .user_data = 0xeeeeeeee,
3726 .res = buffer_send.len,
3727 .flags = linux.IORING_CQE_F_MORE,
3728 }, cqe_send);
3729
3730 cqe_send, const cqe_recv = brk: {
3731 const cqe1 = try ring.copy_cqe();
3732 const cqe2 = try ring.copy_cqe();
3733 break :brk if (cqe1.user_data == 0xeeeeeeee) .{ cqe1, cqe2 } else .{ cqe2, cqe1 };
3734 };
3735
3736 try testing.expectEqual(linux.io_uring_cqe{
3737 .user_data = 0xffffffff,
3738 .res = buffer_recv.len,
3739 .flags = cqe_recv.flags & linux.IORING_CQE_F_SOCK_NONEMPTY,
3740 }, cqe_recv);
3741 try testing.expectEqualSlices(u8, buffer_send[0..buffer_recv.len], buffer_recv[0..]);
3742
3743 // Second completion of zero-copy send.
    // IORING_CQE_F_NOTIF in flags signals that the kernel is done with buffer_send
3745 try testing.expectEqual(linux.io_uring_cqe{
3746 .user_data = 0xeeeeeeee,
3747 .res = 0,
3748 .flags = linux.IORING_CQE_F_NOTIF,
3749 }, cqe_send);
3750}
3751
3752test "accept_direct" {
3753 try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
3754
3755 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3756 error.SystemOutdated => return error.SkipZigTest,
3757 error.PermissionDenied => return error.SkipZigTest,
3758 else => return err,
3759 };
3760 defer ring.deinit();
3761 var address: linux.sockaddr.in = .{
3762 .port = 0,
3763 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
3764 };
3765
3766 // register direct file descriptors
3767 var registered_fds = [_]linux.fd_t{-1} ** 2;
3768 try ring.register_files(registered_fds[0..]);
3769
3770 const listener_socket = try createListenerSocket(&address);
3771 defer posix.close(listener_socket);
3772
3773 const accept_userdata: u64 = 0xaaaaaaaa;
3774 const read_userdata: u64 = 0xbbbbbbbb;
3775 const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
3776
3777 for (0..2) |_| {
3778 for (registered_fds, 0..) |_, i| {
3779 var buffer_recv = [_]u8{0} ** 16;
3780 const buffer_send: []const u8 = data[0 .. data.len - i]; // make it different at each loop
3781
            // Submit accept; the kernel will choose a registered fd and return its index in the cqe
3783 _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
3784 try testing.expectEqual(@as(u32, 1), try ring.submit());
3785
3786 // connect
3787 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3788 try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3789 defer posix.close(client);
3790
3791 // accept completion
3792 const cqe_accept = try ring.copy_cqe();
3793 try testing.expectEqual(posix.E.SUCCESS, cqe_accept.err());
3794 const fd_index = cqe_accept.res;
3795 try testing.expect(fd_index < registered_fds.len);
3796 try testing.expect(cqe_accept.user_data == accept_userdata);
3797
3798 // send data
3799 _ = try posix.send(client, buffer_send, 0);
3800
            // Example of how to use a registered fd:
            // Submit a receive to the fixed file returned by accept (fd_index).
            // The fd field is set to the registered file index returned by accept,
            // and the linux.IOSQE_FIXED_FILE flag must be set.
3805 const recv_sqe = try ring.recv(read_userdata, fd_index, .{ .buffer = &buffer_recv }, 0);
3806 recv_sqe.flags |= linux.IOSQE_FIXED_FILE;
3807 try testing.expectEqual(@as(u32, 1), try ring.submit());
3808
            // receive completion
3810 const recv_cqe = try ring.copy_cqe();
3811 try testing.expect(recv_cqe.user_data == read_userdata);
3812 try testing.expect(recv_cqe.res == buffer_send.len);
3813 try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
3814 }
3815 // no more available fds, accept will get NFILE error
3816 {
3817 // submit accept
3818 _ = try ring.accept_direct(accept_userdata, listener_socket, null, null, 0);
3819 try testing.expectEqual(@as(u32, 1), try ring.submit());
3820 // connect
3821 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3822 try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3823 defer posix.close(client);
3824 // completion with error
3825 const cqe_accept = try ring.copy_cqe();
3826 try testing.expect(cqe_accept.user_data == accept_userdata);
3827 try testing.expectEqual(posix.E.NFILE, cqe_accept.err());
3828 }
3829 // return file descriptors to kernel
3830 try ring.register_files_update(0, registered_fds[0..]);
3831 }
3832 try ring.unregister_files();
3833}
3834
3835test "accept_multishot_direct" {
3836 try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
3837
3838 if (builtin.cpu.arch == .riscv64) {
3839 // https://github.com/ziglang/zig/issues/25734
3840 return error.SkipZigTest;
3841 }
3842
3843 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3844 error.SystemOutdated => return error.SkipZigTest,
3845 error.PermissionDenied => return error.SkipZigTest,
3846 else => return err,
3847 };
3848 defer ring.deinit();
3849
3850 var address: linux.sockaddr.in = .{
3851 .port = 0,
3852 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
3853 };
3854
3855 var registered_fds = [_]linux.fd_t{-1} ** 2;
3856 try ring.register_files(registered_fds[0..]);
3857
3858 const listener_socket = try createListenerSocket(&address);
3859 defer posix.close(listener_socket);
3860
3861 const accept_userdata: u64 = 0xaaaaaaaa;
3862
3863 for (0..2) |_| {
3864 // submit multishot accept
        // Will choose a registered fd and return the index of the selected registered file in the cqe.
3866 _ = try ring.accept_multishot_direct(accept_userdata, listener_socket, null, null, 0);
3867 try testing.expectEqual(@as(u32, 1), try ring.submit());
3868
3869 for (registered_fds) |_| {
3870 // connect
3871 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3872 try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3873 defer posix.close(client);
3874
3875 // accept completion
3876 const cqe_accept = try ring.copy_cqe();
3877 const fd_index = cqe_accept.res;
3878 try testing.expect(fd_index < registered_fds.len);
3879 try testing.expect(cqe_accept.user_data == accept_userdata);
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE > 0); // IORING_CQE_F_MORE is set
3881 }
3882 // No more available fds, accept will get NFILE error.
3883 // Multishot is terminated (more flag is not set).
3884 {
3885 // connect
3886 const client = try posix.socket(address.family, posix.SOCK.STREAM | posix.SOCK.CLOEXEC, 0);
3887 try posix.connect(client, addrAny(&address), @sizeOf(linux.sockaddr.in));
3888 defer posix.close(client);
3889 // completion with error
3890 const cqe_accept = try ring.copy_cqe();
3891 try testing.expect(cqe_accept.user_data == accept_userdata);
3892 try testing.expectEqual(posix.E.NFILE, cqe_accept.err());
            try testing.expect(cqe_accept.flags & linux.IORING_CQE_F_MORE == 0); // IORING_CQE_F_MORE is not set
3894 }
3895 // return file descriptors to kernel
3896 try ring.register_files_update(0, registered_fds[0..]);
3897 }
3898 try ring.unregister_files();
3899}
3900
3901test "socket" {
3902 try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
3903
3904 var ring = IoUring.init(1, 0) catch |err| switch (err) {
3905 error.SystemOutdated => return error.SkipZigTest,
3906 error.PermissionDenied => return error.SkipZigTest,
3907 else => return err,
3908 };
3909 defer ring.deinit();
3910
3911 // prepare, submit socket operation
3912 _ = try ring.socket(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0);
3913 try testing.expectEqual(@as(u32, 1), try ring.submit());
3914
3915 // test completion
3916 var cqe = try ring.copy_cqe();
3917 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
3918 const fd: linux.fd_t = @intCast(cqe.res);
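    // A newly created fd lands above stdin/stdout/stderr (0, 1, 2), hence the check below.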
3919 try testing.expect(fd > 2);
3920
3921 posix.close(fd);
3922}
3923
3924test "socket_direct/socket_direct_alloc/close_direct" {
3925 try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
3926
3927 var ring = IoUring.init(2, 0) catch |err| switch (err) {
3928 error.SystemOutdated => return error.SkipZigTest,
3929 error.PermissionDenied => return error.SkipZigTest,
3930 else => return err,
3931 };
3932 defer ring.deinit();
3933
3934 var registered_fds = [_]linux.fd_t{-1} ** 3;
3935 try ring.register_files(registered_fds[0..]);
3936
3937 // create socket in registered file descriptor at index 0 (last param)
3938 _ = try ring.socket_direct(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0, 0);
3939 try testing.expectEqual(@as(u32, 1), try ring.submit());
3940 var cqe_socket = try ring.copy_cqe();
3941 try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
3942 try testing.expect(cqe_socket.res == 0);
3943
3944 // create socket in registered file descriptor at index 1 (last param)
3945 _ = try ring.socket_direct(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0, 1);
3946 try testing.expectEqual(@as(u32, 1), try ring.submit());
3947 cqe_socket = try ring.copy_cqe();
3948 try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
3949 try testing.expect(cqe_socket.res == 0); // res is 0 when index is specified
3950
    // create socket in a kernel-chosen file descriptor index (_alloc version);
    // the completion res holds the index from the registered files
3953 _ = try ring.socket_direct_alloc(0, linux.AF.INET, posix.SOCK.STREAM, 0, 0);
3954 try testing.expectEqual(@as(u32, 1), try ring.submit());
3955 cqe_socket = try ring.copy_cqe();
3956 try testing.expectEqual(posix.E.SUCCESS, cqe_socket.err());
3957 try testing.expect(cqe_socket.res == 2); // returns registered file index
3958
3959 // use sockets from registered_fds in connect operation
3960 var address: linux.sockaddr.in = .{
3961 .port = 0,
3962 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
3963 };
3964 const listener_socket = try createListenerSocket(&address);
3965 defer posix.close(listener_socket);
3966 const accept_userdata: u64 = 0xaaaaaaaa;
3967 const connect_userdata: u64 = 0xbbbbbbbb;
3968 const close_userdata: u64 = 0xcccccccc;
3969 for (registered_fds, 0..) |_, fd_index| {
3970 // prepare accept
3971 _ = try ring.accept(accept_userdata, listener_socket, null, null, 0);
3972 // prepare connect with fixed socket
3973 const connect_sqe = try ring.connect(connect_userdata, @intCast(fd_index), addrAny(&address), @sizeOf(linux.sockaddr.in));
3974 connect_sqe.flags |= linux.IOSQE_FIXED_FILE; // fd is fixed file index
3975 // submit both
3976 try testing.expectEqual(@as(u32, 2), try ring.submit());
3977 // get completions
3978 var cqe_connect = try ring.copy_cqe();
3979 var cqe_accept = try ring.copy_cqe();
3980 // ignore order
        if (cqe_connect.user_data == accept_userdata and cqe_accept.user_data == connect_userdata)
            std.mem.swap(linux.io_uring_cqe, &cqe_accept, &cqe_connect);
3987 // test connect completion
3988 try testing.expect(cqe_connect.user_data == connect_userdata);
3989 try testing.expectEqual(posix.E.SUCCESS, cqe_connect.err());
3990 // test accept completion
3991 try testing.expect(cqe_accept.user_data == accept_userdata);
3992 try testing.expectEqual(posix.E.SUCCESS, cqe_accept.err());
3993
3994 // submit and test close_direct
3995 _ = try ring.close_direct(close_userdata, @intCast(fd_index));
3996 try testing.expectEqual(@as(u32, 1), try ring.submit());
3997 var cqe_close = try ring.copy_cqe();
3998 try testing.expect(cqe_close.user_data == close_userdata);
3999 try testing.expectEqual(posix.E.SUCCESS, cqe_close.err());
4000 }
4001
4002 try ring.unregister_files();
4003}
4004
4005test "openat_direct/close_direct" {
4006 try skipKernelLessThan(.{ .major = 5, .minor = 19, .patch = 0 });
4007
4008 var ring = IoUring.init(2, 0) catch |err| switch (err) {
4009 error.SystemOutdated => return error.SkipZigTest,
4010 error.PermissionDenied => return error.SkipZigTest,
4011 else => return err,
4012 };
4013 defer ring.deinit();
4014
4015 var registered_fds = [_]linux.fd_t{-1} ** 3;
4016 try ring.register_files(registered_fds[0..]);
4017
4018 var tmp = std.testing.tmpDir(.{});
4019 defer tmp.cleanup();
4020 const path = "test_io_uring_close_direct";
4021 const flags: linux.O = .{ .ACCMODE = .RDWR, .CREAT = true };
4022 const mode: posix.mode_t = 0o666;
4023 const user_data: u64 = 0;
4024
4025 // use registered file at index 0 (last param)
4026 _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 0);
4027 try testing.expectEqual(@as(u32, 1), try ring.submit());
4028 var cqe = try ring.copy_cqe();
4029 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4030 try testing.expect(cqe.res == 0);
4031
4032 // use registered file at index 1
4033 _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, 1);
4034 try testing.expectEqual(@as(u32, 1), try ring.submit());
4035 cqe = try ring.copy_cqe();
4036 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4037 try testing.expect(cqe.res == 0); // res is 0 when we specify index
4038
4039 // let kernel choose registered file index
4040 _ = try ring.openat_direct(user_data, tmp.dir.fd, path, flags, mode, linux.IORING_FILE_INDEX_ALLOC);
4041 try testing.expectEqual(@as(u32, 1), try ring.submit());
4042 cqe = try ring.copy_cqe();
4043 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4044 try testing.expect(cqe.res == 2); // chosen index is in res
4045
4046 // close all open file descriptors
4047 for (registered_fds, 0..) |_, fd_index| {
4048 _ = try ring.close_direct(user_data, @intCast(fd_index));
4049 try testing.expectEqual(@as(u32, 1), try ring.submit());
4050 var cqe_close = try ring.copy_cqe();
4051 try testing.expectEqual(posix.E.SUCCESS, cqe_close.err());
4052 }
4053 try ring.unregister_files();
4054}
4055
4056test "waitid" {
4057 try skipKernelLessThan(.{ .major = 6, .minor = 7, .patch = 0 });
4058
4059 var ring = IoUring.init(16, 0) catch |err| switch (err) {
4060 error.SystemOutdated => return error.SkipZigTest,
4061 error.PermissionDenied => return error.SkipZigTest,
4062 else => return err,
4063 };
4064 defer ring.deinit();
4065
4066 const pid = try posix.fork();
4067 if (pid == 0) {
4068 posix.exit(7);
4069 }
4070
4071 var siginfo: posix.siginfo_t = undefined;
4072 _ = try ring.waitid(0, .PID, pid, &siginfo, posix.W.EXITED, 0);
4073
4074 try testing.expectEqual(1, try ring.submit());
4075
4076 const cqe_waitid = try ring.copy_cqe();
4077 try testing.expectEqual(0, cqe_waitid.res);
4078 try testing.expectEqual(pid, siginfo.fields.common.first.piduid.pid);
4079 try testing.expectEqual(7, siginfo.fields.common.second.sigchld.status);
4080}
4081
/// For use in tests. Returns error.SkipZigTest if the kernel version is less than required.
4083inline fn skipKernelLessThan(required: std.SemanticVersion) !void {
4084 if (!is_linux) return error.SkipZigTest;
4085
4086 var uts: linux.utsname = undefined;
4087 const res = linux.uname(&uts);
4088 switch (linux.errno(res)) {
4089 .SUCCESS => {},
4090 else => |errno| return posix.unexpectedErrno(errno),
4091 }
4092
4093 const release = mem.sliceTo(&uts.release, 0);
    // Strip the potential extra suffix, as the kernel version might not be semver compliant, e.g. "6.8.9-300.fc40.x86_64".
4095 const extra_index = std.mem.indexOfAny(u8, release, "-+");
4096 const stripped = release[0..(extra_index orelse release.len)];
    // Make sure the input doesn't rely on the extra we just stripped.
4098 try testing.expect(required.pre == null and required.build == null);
4099
4100 var current = try std.SemanticVersion.parse(stripped);
4101 current.pre = null; // don't check pre field
4102 if (required.order(current) == .gt) return error.SkipZigTest;
4103}
4104
4105test BufferGroup {
4106 if (!is_linux) return error.SkipZigTest;
4107
4108 // Init IoUring
4109 var ring = IoUring.init(16, 0) catch |err| switch (err) {
4110 error.SystemOutdated => return error.SkipZigTest,
4111 error.PermissionDenied => return error.SkipZigTest,
4112 else => return err,
4113 };
4114 defer ring.deinit();
4115
4116 // Init buffer group for ring
4117 const group_id: u16 = 1; // buffers group id
4118 const buffers_count: u16 = 1; // number of buffers in buffer group
4119 const buffer_size: usize = 128; // size of each buffer in group
4120 var buf_grp = BufferGroup.init(
4121 &ring,
4122 testing.allocator,
4123 group_id,
4124 buffer_size,
4125 buffers_count,
4126 ) catch |err| switch (err) {
4127 // kernel older than 5.19
4128 error.ArgumentsInvalid => return error.SkipZigTest,
4129 else => return err,
4130 };
4131 defer buf_grp.deinit(testing.allocator);
4132
4133 // Create client/server fds
4134 const fds = try createSocketTestHarness(&ring);
4135 defer fds.close();
4136 const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
4137
4138 // Client sends data
4139 {
4140 _ = try ring.send(1, fds.client, data[0..], 0);
4141 const submitted = try ring.submit();
4142 try testing.expectEqual(1, submitted);
4143 const cqe_send = try ring.copy_cqe();
4144 if (cqe_send.err() == .INVAL) return error.SkipZigTest;
4145 try testing.expectEqual(linux.io_uring_cqe{ .user_data = 1, .res = data.len, .flags = 0 }, cqe_send);
4146 }
4147
4148 // Server uses buffer group receive
4149 {
4150 // Submit recv operation, buffer will be chosen from buffer group
4151 _ = try buf_grp.recv(2, fds.server, 0);
4152 const submitted = try ring.submit();
4153 try testing.expectEqual(1, submitted);
4154
        // When the completion for the recv operation arrives:
4156 const cqe = try ring.copy_cqe();
4157 try testing.expectEqual(2, cqe.user_data); // matches submitted user_data
4158 try testing.expect(cqe.res >= 0); // success
4159 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4160 try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
4161
4162 // Get buffer from pool
4163 const buf = try buf_grp.get(cqe);
4164 try testing.expectEqualSlices(u8, &data, buf);
4165 // Release buffer to the kernel when application is done with it
4166 try buf_grp.put(cqe);
4167 }
4168}
4169
4170test "ring mapped buffers recv" {
4171 if (!is_linux) return error.SkipZigTest;
4172
4173 var ring = IoUring.init(16, 0) catch |err| switch (err) {
4174 error.SystemOutdated => return error.SkipZigTest,
4175 error.PermissionDenied => return error.SkipZigTest,
4176 else => return err,
4177 };
4178 defer ring.deinit();
4179
4180 // init buffer group
4181 const group_id: u16 = 1; // buffers group id
4182 const buffers_count: u16 = 2; // number of buffers in buffer group
4183 const buffer_size: usize = 4; // size of each buffer in group
4184 var buf_grp = BufferGroup.init(
4185 &ring,
4186 testing.allocator,
4187 group_id,
4188 buffer_size,
4189 buffers_count,
4190 ) catch |err| switch (err) {
4191 // kernel older than 5.19
4192 error.ArgumentsInvalid => return error.SkipZigTest,
4193 else => return err,
4194 };
4195 defer buf_grp.deinit(testing.allocator);
4196
4197 // create client/server fds
4198 const fds = try createSocketTestHarness(&ring);
4199 defer fds.close();
4200
4201 // for random user_data in sqe/cqe
4202 var Rnd = std.Random.DefaultPrng.init(std.testing.random_seed);
4203 var rnd = Rnd.random();
4204
4205 var round: usize = 4; // repeat send/recv cycle round times
4206 while (round > 0) : (round -= 1) {
4207 // client sends data
4208 const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
4209 {
4210 const user_data = rnd.int(u64);
4211 _ = try ring.send(user_data, fds.client, data[0..], 0);
4212 try testing.expectEqual(@as(u32, 1), try ring.submit());
4213 const cqe_send = try ring.copy_cqe();
4214 if (cqe_send.err() == .INVAL) return error.SkipZigTest;
4215 try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
4216 }
4217 var pos: usize = 0;
4218
4219 // read first chunk
4220 const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
4221 var buf = try buf_grp.get(cqe1);
4222 try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
4223 pos += buf.len;
4224 // second chunk
4225 const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
4226 buf = try buf_grp.get(cqe2);
4227 try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
4228 pos += buf.len;
4229
        // Both buffers provided to the kernel are in use, so we get a
        // 'no more buffers' error until we return buffers to the kernel.
4232 {
4233 const user_data = rnd.int(u64);
4234 _ = try buf_grp.recv(user_data, fds.server, 0);
4235 try testing.expectEqual(@as(u32, 1), try ring.submit());
4236 const cqe = try ring.copy_cqe();
4237 try testing.expectEqual(user_data, cqe.user_data);
4238 try testing.expect(cqe.res < 0); // fail
4239 try testing.expectEqual(posix.E.NOBUFS, cqe.err());
            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
4241 try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
4242 }
4243
4244 // put buffers back to the kernel
4245 try buf_grp.put(cqe1);
4246 try buf_grp.put(cqe2);
4247
4248 // read remaining data
4249 while (pos < data.len) {
4250 const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
4251 buf = try buf_grp.get(cqe);
4252 try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
4253 pos += buf.len;
4254 try buf_grp.put(cqe);
4255 }
4256 }
4257}
4258
4259test "ring mapped buffers multishot recv" {
4260 if (!is_linux) return error.SkipZigTest;
4261
4262 var ring = IoUring.init(16, 0) catch |err| switch (err) {
4263 error.SystemOutdated => return error.SkipZigTest,
4264 error.PermissionDenied => return error.SkipZigTest,
4265 else => return err,
4266 };
4267 defer ring.deinit();
4268
4269 // init buffer group
4270 const group_id: u16 = 1; // buffers group id
4271 const buffers_count: u16 = 2; // number of buffers in buffer group
4272 const buffer_size: usize = 4; // size of each buffer in group
4273 var buf_grp = BufferGroup.init(
4274 &ring,
4275 testing.allocator,
4276 group_id,
4277 buffer_size,
4278 buffers_count,
4279 ) catch |err| switch (err) {
4280 // kernel older than 5.19
4281 error.ArgumentsInvalid => return error.SkipZigTest,
4282 else => return err,
4283 };
4284 defer buf_grp.deinit(testing.allocator);
4285
4286 // create client/server fds
4287 const fds = try createSocketTestHarness(&ring);
4288 defer fds.close();
4289
4290 // for random user_data in sqe/cqe
4291 var Rnd = std.Random.DefaultPrng.init(std.testing.random_seed);
4292 var rnd = Rnd.random();
4293
4294 var round: usize = 4; // repeat send/recv cycle round times
4295 while (round > 0) : (round -= 1) {
4296 // client sends data
4297 const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
4298 {
4299 const user_data = rnd.int(u64);
4300 _ = try ring.send(user_data, fds.client, data[0..], 0);
4301 try testing.expectEqual(@as(u32, 1), try ring.submit());
4302 const cqe_send = try ring.copy_cqe();
4303 if (cqe_send.err() == .INVAL) return error.SkipZigTest;
4304 try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
4305 }
4306
4307 // start multishot recv
4308 var recv_user_data = rnd.int(u64);
4309 _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
4310 try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
4311
        // server reads data into provided buffers
        // there are 2 buffers of size 4, so each read gets only a chunk of the data
        // we read four chunks of 4 bytes each
4315 var chunk: []const u8 = data[0..buffer_size]; // first chunk
4316 const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
4317 try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
4318
4319 chunk = data[buffer_size .. buffer_size * 2]; // second chunk
4320 const cqe2 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
4321 try testing.expect(cqe2.flags & linux.IORING_CQE_F_MORE > 0);
4322
        // Both buffers provided to the kernel are in use, so we get a
        // 'no more buffers' error until we return buffers to the kernel.
4325 {
4326 const cqe = try ring.copy_cqe();
4327 try testing.expectEqual(recv_user_data, cqe.user_data);
4328 try testing.expect(cqe.res < 0); // fail
4329 try testing.expectEqual(posix.E.NOBUFS, cqe.err());
            try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == 0); // IORING_CQE_F_BUFFER flag is set on success only
            // IORING_CQE_F_MORE is not set, which indicates that the multishot is finished
4333 try testing.expect(cqe.flags & linux.IORING_CQE_F_MORE == 0);
4334 try testing.expectError(error.NoBufferSelected, cqe.buffer_id());
4335 }
4336
4337 // put buffers back to the kernel
4338 try buf_grp.put(cqe1);
4339 try buf_grp.put(cqe2);
4340
4341 // restart multishot
4342 recv_user_data = rnd.int(u64);
4343 _ = try buf_grp.recv_multishot(recv_user_data, fds.server, 0);
4344 try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
4345
4346 chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
4347 const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
4348 try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
4349 try buf_grp.put(cqe3);
4350
4351 chunk = data[buffer_size * 3 ..]; // last chunk
4352 const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
4353 try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
4354 try buf_grp.put(cqe4);
4355
4356 // cancel pending multishot recv operation
4357 {
4358 const cancel_user_data = rnd.int(u64);
4359 _ = try ring.cancel(cancel_user_data, recv_user_data, 0);
4360 try testing.expectEqual(@as(u32, 1), try ring.submit());
4361
4362 // expect completion of cancel operation and completion of recv operation
4363 var cqe_cancel = try ring.copy_cqe();
4364 if (cqe_cancel.err() == .INVAL) return error.SkipZigTest;
4365 var cqe_recv = try ring.copy_cqe();
4366 if (cqe_recv.err() == .INVAL) return error.SkipZigTest;
4367
4368 // don't depend on order of completions
            if (cqe_cancel.user_data == recv_user_data and cqe_recv.user_data == cancel_user_data)
                std.mem.swap(linux.io_uring_cqe, &cqe_cancel, &cqe_recv);
4375
4376 // Note on different kernel results:
4377 // on older kernel (tested with v6.0.16, v6.1.57, v6.2.12, v6.4.16)
4378 // cqe_cancel.err() == .NOENT
4379 // cqe_recv.err() == .NOBUFS
4380 // on kernel (tested with v6.5.0, v6.5.7)
4381 // cqe_cancel.err() == .SUCCESS
4382 // cqe_recv.err() == .CANCELED
4383 // Upstream reference: https://github.com/axboe/liburing/issues/984
4384
            // cancel operation succeeds (or fails with NOENT on older kernels)
4386 try testing.expectEqual(cancel_user_data, cqe_cancel.user_data);
4387 try testing.expect(cqe_cancel.err() == .NOENT or cqe_cancel.err() == .SUCCESS);
4388
            // recv operation fails with CANCELED (or NOBUFS on older kernels)
4390 try testing.expectEqual(recv_user_data, cqe_recv.user_data);
4391 try testing.expect(cqe_recv.res < 0);
4392 try testing.expect(cqe_recv.err() == .NOBUFS or cqe_recv.err() == .CANCELED);
4393 try testing.expect(cqe_recv.flags & linux.IORING_CQE_F_MORE == 0);
4394 }
4395 }
4396}
4397
4398// Prepare, submit recv and get cqe using buffer group.
4399fn buf_grp_recv_submit_get_cqe(
4400 ring: *IoUring,
4401 buf_grp: *BufferGroup,
4402 fd: linux.fd_t,
4403 user_data: u64,
4404) !linux.io_uring_cqe {
4405 // prepare and submit recv
4406 const sqe = try buf_grp.recv(user_data, fd, 0);
4407 try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
4408 try testing.expect(sqe.buf_index == buf_grp.group_id);
4409 try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
4410 // get cqe, expect success
4411 const cqe = try ring.copy_cqe();
4412 try testing.expectEqual(user_data, cqe.user_data);
4413 try testing.expect(cqe.res >= 0); // success
4414 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4415 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
4416
4417 return cqe;
4418}
4419
4420fn expect_buf_grp_cqe(
4421 ring: *IoUring,
4422 buf_grp: *BufferGroup,
4423 user_data: u64,
4424 expected: []const u8,
4425) !linux.io_uring_cqe {
4426 // get cqe
4427 const cqe = try ring.copy_cqe();
4428 try testing.expectEqual(user_data, cqe.user_data);
4429 try testing.expect(cqe.res >= 0); // success
4430 try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
4431 try testing.expectEqual(expected.len, @as(usize, @intCast(cqe.res)));
4432 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4433
4434 // get buffer from pool
4435 const buffer_id = try cqe.buffer_id();
4436 const len = @as(usize, @intCast(cqe.res));
4437 const buf = buf_grp.get_by_id(buffer_id)[0..len];
4438 try testing.expectEqualSlices(u8, expected, buf);
4439
4440 return cqe;
4441}
4442
4443test "copy_cqes with wrapping sq.cqes buffer" {
4444 if (!is_linux) return error.SkipZigTest;
4445
4446 var ring = IoUring.init(2, 0) catch |err| switch (err) {
4447 error.SystemOutdated => return error.SkipZigTest,
4448 error.PermissionDenied => return error.SkipZigTest,
4449 else => return err,
4450 };
4451 defer ring.deinit();
4452
4453 try testing.expectEqual(2, ring.sq.sqes.len);
4454 try testing.expectEqual(4, ring.cq.cqes.len);
4455
4456 // submit 2 entries, receive 2 completions
4457 var cqes: [8]linux.io_uring_cqe = undefined;
4458 {
4459 for (0..2) |_| {
4460 const sqe = try ring.get_sqe();
4461 sqe.prep_timeout(&.{ .sec = 0, .nsec = 10000 }, 0, 0);
4462 try testing.expect(try ring.submit() == 1);
4463 }
4464 var cqe_count: u32 = 0;
4465 while (cqe_count < 2) {
4466 cqe_count += try ring.copy_cqes(&cqes, 2 - cqe_count);
4467 }
4468 }
4469
4470 try testing.expectEqual(2, ring.cq.head.*);
4471
    // cq.cqes len is 4 and the head starts at position 2,
    // so every batch of 4 submissions wraps the completion buffer:
    // we are reading ring.cq.cqes at indexes 2,3,0,1
4475 for (1..1024) |i| {
4476 for (0..4) |_| {
4477 const sqe = try ring.get_sqe();
4478 sqe.prep_timeout(&.{ .sec = 0, .nsec = 10000 }, 0, 0);
4479 try testing.expect(try ring.submit() == 1);
4480 }
4481 var cqe_count: u32 = 0;
4482 while (cqe_count < 4) {
4483 cqe_count += try ring.copy_cqes(&cqes, 4 - cqe_count);
4484 }
4485 try testing.expectEqual(4, cqe_count);
4486 try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
4487 }
4488}
4489
4490test "bind/listen/connect" {
4491 if (builtin.cpu.arch == .s390x) return error.SkipZigTest; // https://github.com/ziglang/zig/issues/25956
4492
4493 var ring = IoUring.init(4, 0) catch |err| switch (err) {
4494 error.SystemOutdated => return error.SkipZigTest,
4495 error.PermissionDenied => return error.SkipZigTest,
4496 else => return err,
4497 };
4498 defer ring.deinit();
4499
4500 const probe = ring.get_probe() catch return error.SkipZigTest;
    // LISTEN has the highest kernel version requirement of the operations used here
4502 if (!probe.is_supported(.LISTEN)) return error.SkipZigTest;
4503
4504 var addr: linux.sockaddr.in = .{
4505 .port = 0,
4506 .addr = @bitCast([4]u8{ 127, 0, 0, 1 }),
4507 };
4508 const proto: u32 = if (addr.family == linux.AF.UNIX) 0 else linux.IPPROTO.TCP;
4509
4510 const listen_fd = brk: {
4511 // Create socket
4512 _ = try ring.socket(1, addr.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
4513 try testing.expectEqual(1, try ring.submit());
4514 var cqe = try ring.copy_cqe();
4515 try testing.expectEqual(1, cqe.user_data);
4516 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4517 const listen_fd: linux.fd_t = @intCast(cqe.res);
4518 try testing.expect(listen_fd > 2);
4519
        // Prepare: set two socket options, bind, listen
4521 var optval: u32 = 1;
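        // link_next sets IOSQE_IO_LINK on each SQE so the chained operations
        // execute in order; if one fails, the rest of the chain is canceled.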
4522 (try ring.setsockopt(2, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval))).link_next();
4523 (try ring.setsockopt(3, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEPORT, mem.asBytes(&optval))).link_next();
4524 (try ring.bind(4, listen_fd, addrAny(&addr), @sizeOf(linux.sockaddr.in), 0)).link_next();
4525 _ = try ring.listen(5, listen_fd, 1, 0);
4526 // Submit 4 operations
4527 try testing.expectEqual(4, try ring.submit());
4528 // Expect all to succeed
4529 for (2..6) |user_data| {
4530 cqe = try ring.copy_cqe();
4531 try testing.expectEqual(user_data, cqe.user_data);
4532 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4533 }
4534
4535 // Check that socket option is set
4536 optval = 0;
4537 _ = try ring.getsockopt(5, listen_fd, linux.SOL.SOCKET, linux.SO.REUSEADDR, mem.asBytes(&optval));
4538 try testing.expectEqual(1, try ring.submit());
4539 cqe = try ring.copy_cqe();
4540 try testing.expectEqual(5, cqe.user_data);
4541 try testing.expectEqual(posix.E.SUCCESS, cqe.err());
4542 try testing.expectEqual(1, optval);
4543
4544 // Read system assigned port into addr
4545 var addr_len: posix.socklen_t = @sizeOf(linux.sockaddr.in);
4546 try posix.getsockname(listen_fd, addrAny(&addr), &addr_len);
4547
4548 break :brk listen_fd;
4549 };

    const connect_fd = brk: {
        // Create the connecting socket.
        _ = try ring.socket(6, addr.family, linux.SOCK.STREAM | linux.SOCK.CLOEXEC, proto, 0);
        try testing.expectEqual(1, try ring.submit());
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(6, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        // Get the connecting socket's fd.
        const connect_fd: linux.fd_t = @intCast(cqe.res);
        try testing.expect(connect_fd > 2 and connect_fd != listen_fd);
        break :brk connect_fd;
    };

    // Prepare accept/connect operations.
    _ = try ring.accept(7, listen_fd, null, null, 0);
    _ = try ring.connect(8, connect_fd, addrAny(&addr), @sizeOf(linux.sockaddr.in));
    try testing.expectEqual(2, try ring.submit());
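    // accept and connect may complete in either order, so match on user_data.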
    // Get the socket accepted by the listener.
    var accept_fd: posix.socket_t = 0;
    for (0..2) |_| {
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        if (cqe.user_data == 7) {
            accept_fd = @intCast(cqe.res);
        } else {
            try testing.expectEqual(8, cqe.user_data);
        }
    }
    try testing.expect(accept_fd > 2 and accept_fd != listen_fd and accept_fd != connect_fd);

    // Communicate in both directions.
    try testSendRecv(&ring, connect_fd, accept_fd);
    try testSendRecv(&ring, accept_fd, connect_fd);

    // Shutdown and close all sockets; shutdown is linked ahead of close so
    // the two operations complete in order for each fd.
    for ([_]posix.socket_t{ connect_fd, accept_fd, listen_fd }) |fd| {
        (try ring.shutdown(9, fd, posix.SHUT.RDWR)).link_next();
        _ = try ring.close(10, fd);
        try testing.expectEqual(2, try ring.submit());
        for (0..2) |i| {
            const cqe = try ring.copy_cqe();
            try testing.expectEqual(posix.E.SUCCESS, cqe.err());
            try testing.expectEqual(9 + i, cqe.user_data);
        }
    }
}

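/// Sends two copies of a fixed payload from `send_fd` and receives both on
/// `recv_fd`, verifying that the data arrives intact and in order.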
fn testSendRecv(ring: *IoUring, send_fd: posix.socket_t, recv_fd: posix.socket_t) !void {
    const buffer_send = "0123456789abcdf" ** 10;
    var buffer_recv: [buffer_send.len * 2]u8 = undefined;

    // Queue 2 sends back to back and expect both to write the full payload.
    _ = try ring.send(1, send_fd, buffer_send, linux.MSG.WAITALL);
    _ = try ring.send(2, send_fd, buffer_send, linux.MSG.WAITALL);
    try testing.expectEqual(2, try ring.submit());
    for (0..2) |i| {
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(1 + i, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        try testing.expectEqual(buffer_send.len, @as(usize, @intCast(cqe.res)));
    }

    // Receive: a single completion may carry fewer bytes than requested, so
    // loop until both payloads have arrived.
    var recv_len: usize = 0;
    while (recv_len < buffer_send.len * 2) {
        _ = try ring.recv(3, recv_fd, .{ .buffer = buffer_recv[recv_len..] }, 0);
        try testing.expectEqual(1, try ring.submit());
        const cqe = try ring.copy_cqe();
        try testing.expectEqual(3, cqe.user_data);
        try testing.expectEqual(posix.E.SUCCESS, cqe.err());
        recv_len += @intCast(cqe.res);
    }

    // Inspect the receive buffer: both copies must have arrived intact.
    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[0..buffer_send.len]);
    try testing.expectEqualSlices(u8, buffer_send, buffer_recv[buffer_send.len..]);
}

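/// bind/connect/getsockname take a generic *linux.sockaddr, so erase the
/// concrete IPv4 sockaddr type with a pointer cast.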
fn addrAny(addr: *linux.sockaddr.in) *linux.sockaddr {
    return @ptrCast(addr);
}