Commit c133171567
Changed files (2)
lib
std
os
linux
lib/std/os/linux/IoUring.zig
@@ -1594,28 +1594,34 @@ pub const BufferGroup = struct {
buffers: []u8,
/// Size of each buffer in buffers.
buffer_size: u32,
- // Number of buffers in `buffers`, number of `io_uring_buf structures` in br.
+ /// Number of buffers in `buffers`, number of `io_uring_buf` structures in br.
buffers_count: u16,
+ /// Head of unconsumed part of each buffer, if incremental consumption is enabled
+ heads: []u32,
/// ID of this group, must be unique in ring.
group_id: u16,
pub fn init(
ring: *IoUring,
+ allocator: mem.Allocator,
group_id: u16,
- buffers: []u8,
buffer_size: u32,
buffers_count: u16,
) !BufferGroup {
- assert(buffers.len == buffers_count * buffer_size);
+ const buffers = try allocator.alloc(u8, buffer_size * buffers_count);
+ errdefer allocator.free(buffers);
+ const heads = try allocator.alloc(u32, buffers_count);
+ errdefer allocator.free(heads);
- const br = try setup_buf_ring(ring.fd, buffers_count, group_id);
+ const br = try setup_buf_ring(ring.fd, buffers_count, group_id, linux.io_uring_buf_reg.FLAG.INC);
buf_ring_init(br);
const mask = buf_ring_mask(buffers_count);
var i: u16 = 0;
while (i < buffers_count) : (i += 1) {
- const start = buffer_size * i;
- const buf = buffers[start .. start + buffer_size];
+ const pos = buffer_size * i;
+ const buf = buffers[pos .. pos + buffer_size];
+ heads[i] = 0;
buf_ring_add(br, buf, i, mask, i);
}
buf_ring_advance(br, buffers_count);
@@ -1625,11 +1631,18 @@ pub const BufferGroup = struct {
.group_id = group_id,
.br = br,
.buffers = buffers,
+ .heads = heads,
.buffer_size = buffer_size,
.buffers_count = buffers_count,
};
}
+ pub fn deinit(self: *BufferGroup, allocator: mem.Allocator) void {
+ free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+ allocator.free(self.buffers);
+ allocator.free(self.heads);
+ }
+
// Prepare recv operation which will select buffer from this group.
pub fn recv(self: *BufferGroup, user_data: u64, fd: posix.fd_t, flags: u32) !*linux.io_uring_sqe {
var sqe = try self.ring.get_sqe();
@@ -1649,33 +1662,34 @@ pub const BufferGroup = struct {
}
// Get buffer by id.
- pub fn get(self: *BufferGroup, buffer_id: u16) []u8 {
- const head = self.buffer_size * buffer_id;
- return self.buffers[head .. head + self.buffer_size];
+ fn get_by_id(self: *BufferGroup, buffer_id: u16) []u8 {
+ const pos = self.buffer_size * buffer_id;
+ return self.buffers[pos .. pos + self.buffer_size][self.heads[buffer_id]..];
}
// Get buffer by CQE.
- pub fn get_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
+ pub fn get(self: *BufferGroup, cqe: linux.io_uring_cqe) ![]u8 {
const buffer_id = try cqe.buffer_id();
const used_len = @as(usize, @intCast(cqe.res));
- return self.get(buffer_id)[0..used_len];
- }
-
- // Release buffer to the kernel.
- pub fn put(self: *BufferGroup, buffer_id: u16) void {
- const mask = buf_ring_mask(self.buffers_count);
- const buffer = self.get(buffer_id);
- buf_ring_add(self.br, buffer, buffer_id, mask, 0);
- buf_ring_advance(self.br, 1);
+ return self.get_by_id(buffer_id)[0..used_len];
}
// Release buffer from CQE to the kernel.
- pub fn put_cqe(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
- self.put(try cqe.buffer_id());
- }
+ pub fn put(self: *BufferGroup, cqe: linux.io_uring_cqe) !void {
+ const buffer_id = try cqe.buffer_id();
+ if (cqe.flags & linux.IORING_CQE_F_BUF_MORE == linux.IORING_CQE_F_BUF_MORE) {
+ // Incremental consumption active, kernel will write to this buffer again
+ const used_len = @as(u32, @intCast(cqe.res));
+ // Track what part of the buffer is used
+ self.heads[buffer_id] += used_len;
+ return;
+ }
+ self.heads[buffer_id] = 0;
- pub fn deinit(self: *BufferGroup) void {
- free_buf_ring(self.ring.fd, self.br, self.buffers_count, self.group_id);
+ // Release buffer to the kernel.
+ const mask = buf_ring_mask(self.buffers_count);
+ buf_ring_add(self.br, self.get_by_id(buffer_id), buffer_id, mask, 0);
+ buf_ring_advance(self.br, 1);
}
};
@@ -1684,7 +1698,7 @@ pub const BufferGroup = struct {
/// `fd` is IO_Uring.fd for which the provided buffer ring is being registered.
/// `entries` is the number of entries requested in the buffer ring, must be power of 2.
/// `group_id` is the chosen buffer group ID, unique in IO_Uring.
-pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_size_min) linux.io_uring_buf_ring {
+pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16, flags: u16) !*align(page_size_min) linux.io_uring_buf_ring {
if (entries == 0 or entries > 1 << 15) return error.EntriesNotInRange;
if (!std.math.isPowerOfTwo(entries)) return error.EntriesNotPowerOfTwo;
@@ -1701,22 +1715,24 @@ pub fn setup_buf_ring(fd: posix.fd_t, entries: u16, group_id: u16) !*align(page_
assert(mmap.len == mmap_size);
const br: *align(page_size_min) linux.io_uring_buf_ring = @ptrCast(mmap.ptr);
- try register_buf_ring(fd, @intFromPtr(br), entries, group_id);
+ try register_buf_ring(fd, @intFromPtr(br), entries, group_id, flags);
return br;
}
-fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16) !void {
+fn register_buf_ring(fd: posix.fd_t, addr: u64, entries: u32, group_id: u16, flags: u16) !void {
var reg = mem.zeroInit(linux.io_uring_buf_reg, .{
.ring_addr = addr,
.ring_entries = entries,
.bgid = group_id,
+ .flags = flags,
});
- const res = linux.io_uring_register(
- fd,
- .REGISTER_PBUF_RING,
- @as(*const anyopaque, @ptrCast(®)),
- 1,
- );
+ var res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(®)), 1);
+ if (linux.E.init(res) == .INVAL and reg.flags & linux.io_uring_buf_reg.FLAG.INC > 0) {
+ // Retry without incremental buffer consumption.
+ // It is available since kernel 6.12; older kernels return INVAL.
+ reg.flags &= ~linux.io_uring_buf_reg.FLAG.INC;
+ res = linux.io_uring_register(fd, .REGISTER_PBUF_RING, @as(*const anyopaque, @ptrCast(®)), 1);
+ }
try handle_register_buf_ring_result(res);
}
@@ -4041,12 +4057,10 @@ test BufferGroup {
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 1; // number of buffers in buffer group
const buffer_size: usize = 128; // size of each buffer in group
- const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
- defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
+ testing.allocator,
group_id,
- buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
@@ -4054,7 +4068,7 @@ test BufferGroup {
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
- defer buf_grp.deinit();
+ defer buf_grp.deinit(testing.allocator);
// Create client/server fds
const fds = try createSocketTestHarness(&ring);
@@ -4085,14 +4099,11 @@ test BufferGroup {
try testing.expectEqual(posix.E.SUCCESS, cqe.err());
try testing.expectEqual(data.len, @as(usize, @intCast(cqe.res))); // cqe.res holds received data len
- // Read buffer_id and used buffer len from cqe
- const buffer_id = try cqe.buffer_id();
- const len: usize = @intCast(cqe.res);
// Get buffer from pool
- const buf = buf_grp.get(buffer_id)[0..len];
+ const buf = try buf_grp.get(cqe);
try testing.expectEqualSlices(u8, &data, buf);
// Release buffer to the kernel when application is done with it
- buf_grp.put(buffer_id);
+ try buf_grp.put(cqe);
}
}
@@ -4110,12 +4121,10 @@ test "ring mapped buffers recv" {
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
- const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
- defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
+ testing.allocator,
group_id,
- buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
@@ -4123,7 +4132,7 @@ test "ring mapped buffers recv" {
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
- defer buf_grp.deinit();
+ defer buf_grp.deinit(testing.allocator);
// create client/server fds
const fds = try createSocketTestHarness(&ring);
@@ -4145,14 +4154,18 @@ test "ring mapped buffers recv" {
if (cqe_send.err() == .INVAL) return error.SkipZigTest;
try testing.expectEqual(linux.io_uring_cqe{ .user_data = user_data, .res = data.len, .flags = 0 }, cqe_send);
}
-
- // server reads data into provided buffers
- // there are 2 buffers of size 4, so each read gets only chunk of data
- // we read four chunks of 4, 4, 4, 3 bytes each
- var chunk: []const u8 = data[0..buffer_size]; // first chunk
- const id1 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
- chunk = data[buffer_size .. buffer_size * 2]; // second chunk
- const id2 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
+ var pos: usize = 0;
+
+ // read first chunk
+ const cqe1 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+ var buf = try buf_grp.get(cqe1);
+ try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+ pos += buf.len;
+ // second chunk
+ const cqe2 = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+ buf = try buf_grp.get(cqe2);
+ try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+ pos += buf.len;
// both buffers provided to the kernel are used so we get error
// 'no more buffers', until we put buffers to the kernel
@@ -4169,16 +4182,17 @@ test "ring mapped buffers recv" {
}
// put buffers back to the kernel
- buf_grp.put(id1);
- buf_grp.put(id2);
-
- chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
- const id3 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
- buf_grp.put(id3);
-
- chunk = data[buffer_size * 3 ..]; // last chunk
- const id4 = try expect_buf_grp_recv(&ring, &buf_grp, fds.server, rnd.int(u64), chunk);
- buf_grp.put(id4);
+ try buf_grp.put(cqe1);
+ try buf_grp.put(cqe2);
+
+ // read remaining data
+ while (pos < data.len) {
+ const cqe = try buf_grp_recv_submit_get_cqe(&ring, &buf_grp, fds.server, rnd.int(u64));
+ buf = try buf_grp.get(cqe);
+ try testing.expectEqualSlices(u8, data[pos..][0..buf.len], buf);
+ pos += buf.len;
+ try buf_grp.put(cqe);
+ }
}
}
@@ -4196,12 +4210,10 @@ test "ring mapped buffers multishot recv" {
const group_id: u16 = 1; // buffers group id
const buffers_count: u16 = 2; // number of buffers in buffer group
const buffer_size: usize = 4; // size of each buffer in group
- const buffers = try testing.allocator.alloc(u8, buffers_count * buffer_size);
- defer testing.allocator.free(buffers);
var buf_grp = BufferGroup.init(
&ring,
+ testing.allocator,
group_id,
- buffers,
buffer_size,
buffers_count,
) catch |err| switch (err) {
@@ -4209,7 +4221,7 @@ test "ring mapped buffers multishot recv" {
error.ArgumentsInvalid => return error.SkipZigTest,
else => return err,
};
- defer buf_grp.deinit();
+ defer buf_grp.deinit(testing.allocator);
// create client/server fds
const fds = try createSocketTestHarness(&ring);
@@ -4222,7 +4234,7 @@ test "ring mapped buffers multishot recv" {
var round: usize = 4; // repeat send/recv cycle round times
while (round > 0) : (round -= 1) {
// client sends data
- const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe };
+ const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf };
{
const user_data = rnd.int(u64);
_ = try ring.send(user_data, fds.client, data[0..], 0);
@@ -4239,7 +4251,7 @@ test "ring mapped buffers multishot recv" {
// server reads data into provided buffers
// there are 2 buffers of size 4, so each read gets only chunk of data
- // we read four chunks of 4, 4, 4, 3 bytes each
+ // we read four chunks of 4, 4, 4, 4 bytes each
var chunk: []const u8 = data[0..buffer_size]; // first chunk
const cqe1 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe1.flags & linux.IORING_CQE_F_MORE > 0);
@@ -4263,8 +4275,8 @@ test "ring mapped buffers multishot recv" {
}
// put buffers back to the kernel
- buf_grp.put(try cqe1.buffer_id());
- buf_grp.put(try cqe2.buffer_id());
+ try buf_grp.put(cqe1);
+ try buf_grp.put(cqe2);
// restart multishot
recv_user_data = rnd.int(u64);
@@ -4274,12 +4286,12 @@ test "ring mapped buffers multishot recv" {
chunk = data[buffer_size * 2 .. buffer_size * 3]; // third chunk
const cqe3 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe3.flags & linux.IORING_CQE_F_MORE > 0);
- buf_grp.put(try cqe3.buffer_id());
+ try buf_grp.put(cqe3);
chunk = data[buffer_size * 3 ..]; // last chunk
const cqe4 = try expect_buf_grp_cqe(&ring, &buf_grp, recv_user_data, chunk);
try testing.expect(cqe4.flags & linux.IORING_CQE_F_MORE > 0);
- buf_grp.put(try cqe4.buffer_id());
+ try buf_grp.put(cqe4);
// cancel pending multishot recv operation
{
@@ -4323,23 +4335,26 @@ test "ring mapped buffers multishot recv" {
}
}
-// Prepare and submit recv using buffer group.
-// Test that buffer from group, pointed by cqe, matches expected.
-fn expect_buf_grp_recv(
+// Prepare, submit recv and get cqe using buffer group.
+fn buf_grp_recv_submit_get_cqe(
ring: *IoUring,
buf_grp: *BufferGroup,
fd: posix.fd_t,
user_data: u64,
- expected: []const u8,
-) !u16 {
- // prepare and submit read
+) !linux.io_uring_cqe {
+ // prepare and submit recv
const sqe = try buf_grp.recv(user_data, fd, 0);
try testing.expect(sqe.flags & linux.IOSQE_BUFFER_SELECT == linux.IOSQE_BUFFER_SELECT);
try testing.expect(sqe.buf_index == buf_grp.group_id);
try testing.expectEqual(@as(u32, 1), try ring.submit()); // submit
+ // get cqe, expect success
+ const cqe = try ring.copy_cqe();
+ try testing.expectEqual(user_data, cqe.user_data);
+ try testing.expect(cqe.res >= 0); // success
+ try testing.expectEqual(posix.E.SUCCESS, cqe.err());
+ try testing.expect(cqe.flags & linux.IORING_CQE_F_BUFFER == linux.IORING_CQE_F_BUFFER); // IORING_CQE_F_BUFFER flag is set
- const cqe = try expect_buf_grp_cqe(ring, buf_grp, user_data, expected);
- return try cqe.buffer_id();
+ return cqe;
}
fn expect_buf_grp_cqe(
@@ -4359,7 +4374,7 @@ fn expect_buf_grp_cqe(
// get buffer from pool
const buffer_id = try cqe.buffer_id();
const len = @as(usize, @intCast(cqe.res));
- const buf = buf_grp.get(buffer_id)[0..len];
+ const buf = buf_grp.get_by_id(buffer_id)[0..len];
try testing.expectEqualSlices(u8, expected, buf);
return cqe;
lib/std/os/linux.zig
@@ -5933,6 +5933,8 @@ pub const IORING_CQE_F_MORE = 1 << 1;
pub const IORING_CQE_F_SOCK_NONEMPTY = 1 << 2;
/// Set for notification CQEs. Can be used to distinct them from sends.
pub const IORING_CQE_F_NOTIF = 1 << 3;
+/// If set, the buffer ID set in the completion will get more completions.
+pub const IORING_CQE_F_BUF_MORE = 1 << 4;
pub const IORING_CQE_BUFFER_SHIFT = 16;
@@ -6222,8 +6224,13 @@ pub const io_uring_buf_reg = extern struct {
ring_addr: u64,
ring_entries: u32,
bgid: u16,
- pad: u16,
+ flags: u16,
resv: [3]u64,
+
+ pub const FLAG = struct {
// Incremental buffer consumption.
+ pub const INC: u16 = 2;
+ };
};
pub const io_uring_getevents_arg = extern struct {