Commit 254c05a9e1

Igor Anić <igor.anic@gmail.com>
2024-03-31 23:57:16
io_uring: simplify copy_cqe logic
First copy as much as we can in this cycle. If there is more needed wrap and start from the buffer 0 position.
1 parent 704660c
Changed files (1)
lib
std
os
lib/std/os/linux/IoUring.zig
@@ -282,19 +282,15 @@ fn copy_cqes_ready(self: *IoUring, cqes: []linux.io_uring_cqe) u32 {
     const ready = self.cq_ready();
     const count = @min(cqes.len, ready);
     const head = self.cq.head.* & self.cq.mask;
-    const tail = (self.cq.head.* +% count) & self.cq.mask;
-
-    if (head < tail) {
-        // head behind tail -> no wrapping
-        @memcpy(cqes[0..count], self.cq.cqes[head..tail]);
-    } else {
-        // head in front of tail -> buffer wraps
-        const two_copies_required: bool = self.cq.cqes.len - head < count;
-        const amount_to_copy_in_first = if (two_copies_required) self.cq.cqes.len - head else count;
-        @memcpy(cqes[0..amount_to_copy_in_first], self.cq.cqes[head .. head + amount_to_copy_in_first]);
-        if (two_copies_required) {
-            @memcpy(cqes[amount_to_copy_in_first..count], self.cq.cqes[0..tail]);
-        }
+
+    // before wrapping
+    const n = @min(self.cq.cqes.len - head, count);
+    @memcpy(cqes[0..n], self.cq.cqes[head..][0..n]);
+
+    if (count > n) {
+        // wrap self.cq.cqes
+        const w = count - n;
+        @memcpy(cqes[n..][0..w], self.cq.cqes[0..w]);
     }
 
     self.cq_advance(count);
@@ -4231,24 +4227,49 @@ fn expect_buf_grp_cqe(
     return cqe;
 }
 
-test "failing test for issue 19451" {
-    var ring = try IoUring.init(2, 0);
+test "copy_cqes with wrapping sq.cqes buffer" {
+    if (!is_linux) return error.SkipZigTest;
+
+    var ring = IoUring.init(2, 0) catch |err| switch (err) {
+        error.SystemOutdated => return error.SkipZigTest,
+        error.PermissionDenied => return error.SkipZigTest,
+        else => return err,
+    };
     defer ring.deinit();
 
     try testing.expectEqual(2, ring.sq.sqes.len);
     try testing.expectEqual(4, ring.cq.cqes.len);
 
-    for (0..4) |i| {
-        const sqe = try ring.get_sqe();
-        sqe.prep_timeout(&.{ .tv_sec = 0, .tv_nsec = 10000 }, 0, 0);
-        sqe.user_data = i;
-        _ = try ring.submit();
+    // submit 2 entries, receive 2 completions
+    var cqes: [8]linux.io_uring_cqe = undefined;
+    {
+        for (0..2) |_| {
+            const sqe = try ring.get_sqe();
+            sqe.prep_timeout(&.{ .tv_sec = 0, .tv_nsec = 10000 }, 0, 0);
+            try testing.expect(try ring.submit() == 1);
+        }
+        var cqe_count: u32 = 0;
+        while (cqe_count < 2) {
+            cqe_count += try ring.copy_cqes(&cqes, 2 - cqe_count);
+        }
     }
 
-    var cqe_count: u32 = 0;
-    while (cqe_count < 4) {
-        var cqes: [8]linux.io_uring_cqe = undefined;
-        cqe_count += try ring.copy_cqes(&cqes, 4 - cqe_count);
+    try testing.expectEqual(2, ring.cq.head.*);
+
+    // sq.sqes len is 4, starting at position 2
+    // every 4 entries submit wraps completion buffer
+    // we are reading ring.cq.cqes at indexes 2,3,0,1
+    for (1..1024) |i| {
+        for (0..4) |_| {
+            const sqe = try ring.get_sqe();
+            sqe.prep_timeout(&.{ .tv_sec = 0, .tv_nsec = 10000 }, 0, 0);
+            try testing.expect(try ring.submit() == 1);
+        }
+        var cqe_count: u32 = 0;
+        while (cqe_count < 4) {
+            cqe_count += try ring.copy_cqes(&cqes, 4 - cqe_count);
+        }
+        try testing.expectEqual(4, cqe_count);
+        try testing.expectEqual(2 + 4 * i, ring.cq.head.*);
     }
-    try testing.expectEqual(4, cqe_count);
 }