Commit acb8af468f
Changed files (1)
lib
std
os
linux
lib/std/os/linux/io_uring.zig
@@ -2830,6 +2830,390 @@ test "linkat" {
try testing.expectEqualStrings("hello", second_file_data[0..read]);
}
+test "provide_buffers: read" {
+ if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+ var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ error.SystemOutdated => return error.SkipZigTest,
+ error.PermissionDenied => return error.SkipZigTest,
+ else => return err,
+ };
+ defer ring.deinit();
+
+ const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0);
+ defer os.close(fd);
+
+ const group_id = 1337;
+ const buffer_id = 0;
+
+ const buffer_len = 128;
+
+ var buffers: [4][buffer_len]u8 = undefined;
+
+ // Provide 4 buffers
+
+ {
+ const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
+ try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
+ try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
+ try testing.expectEqual(@as(u32, buffers[0].len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ // Happens when the kernel is < 5.7
+ .INVAL => return error.SkipZigTest,
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
+ }
+
+ // Do 4 reads which should consume all buffers
+
+ var i: usize = 0;
+ while (i < buffers.len) : (i += 1) {
+ var sqe = try ring.read(0xdededede, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
+ try testing.expectEqual(@as(i32, fd), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
+ const used_buffer_id = cqe.flags >> 16;
+ try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
+ try testing.expectEqual(@as(i32, buffer_len), cqe.res);
+
+ try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
+ try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
+ }
+
+ // This read should fail
+
+ {
+ var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
+ try testing.expectEqual(@as(i32, fd), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ // Expected
+ .NOBUFS => {},
+ .SUCCESS => std.debug.panic("unexpected success", .{}),
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
+ }
+
+ // Provide 1 buffer again
+
+ // Deliberately put something we don't expect in the buffers
+ mem.set(u8, mem.sliceAsBytes(&buffers), 42);
+
+ const reprovided_buffer_id = 2;
+
+ {
+ _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ }
+
+ // Final read which should work
+
+ {
+ var sqe = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.READ, sqe.opcode);
+ try testing.expectEqual(@as(i32, fd), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
+ const used_buffer_id = cqe.flags >> 16;
+ try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
+ try testing.expectEqual(@as(i32, buffer_len), cqe.res);
+ try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
+ try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
+ }
+}
+
+test "remove_buffers" {
+ if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+ var ring = IO_Uring.init(1, 0) catch |err| switch (err) {
+ error.SystemOutdated => return error.SkipZigTest,
+ error.PermissionDenied => return error.SkipZigTest,
+ else => return err,
+ };
+ defer ring.deinit();
+
+ const fd = try os.openZ("/dev/zero", os.O.RDONLY | os.O.CLOEXEC, 0);
+ defer os.close(fd);
+
+ const group_id = 1337;
+ const buffer_id = 0;
+
+ const buffer_len = 128;
+
+ var buffers: [4][buffer_len]u8 = undefined;
+
+ // Provide 4 buffers
+
+ {
+ _ = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
+ }
+
+ // Remove the first 3 buffers
+
+ {
+ var sqe = try ring.remove_buffers(0xbababababa, 3, group_id);
+ try testing.expectEqual(linux.IORING_OP.REMOVE_BUFFERS, sqe.opcode);
+ try testing.expectEqual(@as(i32, 3), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xbababababa), cqe.user_data);
+ }
+
+ // This read should work
+
+ {
+ _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
+ const used_buffer_id = cqe.flags >> 16;
+ try testing.expectEqual(used_buffer_id, 0);
+ try testing.expectEqual(@as(i32, buffer_len), cqe.res);
+ try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
+ try testing.expectEqualSlices(u8, &([_]u8{0} ** buffer_len), buffers[used_buffer_id][0..@intCast(usize, cqe.res)]);
+ }
+
+ // Final read should _not_ work
+
+ {
+ _ = try ring.read(0xdfdfdfdf, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ // Expected
+ .NOBUFS => {},
+ .SUCCESS => std.debug.panic("unexpected success", .{}),
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ }
+}
+
+test "provide_buffers: accept/connect/send/recv" {
+ if (builtin.os.tag != .linux) return error.SkipZigTest;
+
+ var ring = IO_Uring.init(16, 0) catch |err| switch (err) {
+ error.SystemOutdated => return error.SkipZigTest,
+ error.PermissionDenied => return error.SkipZigTest,
+ else => return err,
+ };
+ defer ring.deinit();
+
+ const group_id = 1337;
+ const buffer_id = 0;
+
+ const buffer_len = 128;
+ var buffers: [4][buffer_len]u8 = undefined;
+
+ // Provide 4 buffers
+
+ {
+ const sqe = try ring.provide_buffers(0xcccccccc, @ptrCast([*]u8, &buffers), buffers.len, buffer_len, group_id, buffer_id);
+ try testing.expectEqual(linux.IORING_OP.PROVIDE_BUFFERS, sqe.opcode);
+ try testing.expectEqual(@as(i32, buffers.len), sqe.fd);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ // Happens when the kernel is < 5.7
+ .INVAL => return error.SkipZigTest,
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xcccccccc), cqe.user_data);
+ }
+
+ const socket_test_harness = try createSocketTestHarness(&ring);
+ defer socket_test_harness.close();
+
+ // Do 4 send on the socket
+
+ {
+ var i: usize = 0;
+ while (i < buffers.len) : (i += 1) {
+ _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'z'} ** buffer_len), 0);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+ }
+
+ var cqes: [4]linux.io_uring_cqe = undefined;
+ try testing.expectEqual(@as(u32, 4), try ring.copy_cqes(&cqes, 4));
+ }
+
+ // Do 4 recv which should consume all buffers
+
+ // Deliberately put something we don't expect in the buffers
+ mem.set(u8, mem.sliceAsBytes(&buffers), 1);
+
+ var i: usize = 0;
+ while (i < buffers.len) : (i += 1) {
+ var sqe = try ring.recv(0xdededede, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
+ try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
+ try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
+ const used_buffer_id = cqe.flags >> 16;
+ try testing.expect(used_buffer_id >= 0 and used_buffer_id <= 3);
+ try testing.expectEqual(@as(i32, buffer_len), cqe.res);
+
+ try testing.expectEqual(@as(u64, 0xdededede), cqe.user_data);
+ const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)];
+ try testing.expectEqualSlices(u8, &([_]u8{'z'} ** buffer_len), buffer);
+ }
+
+ // This recv should fail
+
+ {
+ var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
+ try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
+ try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ // Expected
+ .NOBUFS => {},
+ .SUCCESS => std.debug.panic("unexpected success", .{}),
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
+ }
+
+ // Provide 1 buffer again
+
+ const reprovided_buffer_id = 2;
+
+ {
+ _ = try ring.provide_buffers(0xabababab, @ptrCast([*]u8, &buffers[reprovided_buffer_id]), 1, buffer_len, group_id, reprovided_buffer_id);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+ }
+
+ // Redo 1 send on the server socket
+
+ {
+ _ = try ring.send(0xdeaddead, socket_test_harness.server, &([_]u8{'w'} ** buffer_len), 0);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ _ = try ring.copy_cqe();
+ }
+
+ // Final recv which should work
+
+ // Deliberately put something we don't expect in the buffers
+ mem.set(u8, mem.sliceAsBytes(&buffers), 1);
+
+ {
+ var sqe = try ring.recv(0xdfdfdfdf, socket_test_harness.client, .{ .buffer_selection = .{ .group_id = group_id, .len = buffer_len } }, 0);
+ try testing.expectEqual(linux.IORING_OP.RECV, sqe.opcode);
+ try testing.expectEqual(@as(i32, socket_test_harness.client), sqe.fd);
+ try testing.expectEqual(@as(u64, 0), sqe.addr);
+ try testing.expectEqual(@as(u32, buffer_len), sqe.len);
+ try testing.expectEqual(@as(u16, group_id), sqe.buf_index);
+ try testing.expectEqual(@as(u32, 0), sqe.rw_flags);
+ try testing.expectEqual(@as(u32, linux.IOSQE_BUFFER_SELECT), sqe.flags);
+ try testing.expectEqual(@as(u32, 1), try ring.submit());
+
+ const cqe = try ring.copy_cqe();
+ switch (cqe.err()) {
+ .SUCCESS => {},
+ else => |errno| std.debug.panic("unhandled errno: {}", .{errno}),
+ }
+
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER);
+ const used_buffer_id = cqe.flags >> 16;
+ try testing.expectEqual(used_buffer_id, reprovided_buffer_id);
+ try testing.expectEqual(@as(i32, buffer_len), cqe.res);
+ try testing.expectEqual(@as(u64, 0xdfdfdfdf), cqe.user_data);
+ const buffer = buffers[used_buffer_id][0..@intCast(usize, cqe.res)];
+ try testing.expectEqualSlices(u8, &([_]u8{'w'} ** buffer_len), buffer);
+ }
+}
+
/// Used for testing server/client interactions.
const SocketTestHarness = struct {
listener: os.socket_t,