Commit 5578c760a7

Andrew Kelley <andrew@ziglang.org>
2025-10-23 15:02:46
std.Io.Kqueue: implement wait queue per fd
Solves the issue when one kevent() call would clobber another if they used the same file descriptor as an identifier.
1 parent 6a64c9b
Changed files (1)
lib
std
lib/std/Io/Kqueue.zig
@@ -423,17 +423,35 @@ fn idle(k: *Kqueue, thread: *Thread) void {
                 return;
             },
             _ => {
-                const fiber: *Fiber = @ptrFromInt(event.udata);
-                assert(fiber.queue_next == null);
-                fiber.resultPointer(Completion).* = .{
+                const event_head_fiber: *Fiber = @ptrFromInt(event.udata);
+                const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{
+                    .ident = event.ident,
+                    .filter = event.filter,
+                }).?.value;
+                assert(event_tail_fiber.queue_next == null);
+
+                // TODO reevaluate this logic
+                event_head_fiber.resultPointer(Completion).* = .{
                     .flags = event.flags,
                     .fflags = event.fflags,
                     .data = event.data,
                 };
-                if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
-                    ready_queue.tail.queue_next = fiber;
-                    ready_queue.tail = fiber;
-                } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+
+                queue_ready: {
+                    const head: *Fiber = if (maybe_ready_fiber == null) f: {
+                        maybe_ready_fiber = event_head_fiber;
+                        const next = event_head_fiber.queue_next orelse break :queue_ready;
+                        event_head_fiber.queue_next = null;
+                        break :f next;
+                    } else event_head_fiber;
+
+                    if (maybe_ready_queue) |*ready_queue| {
+                        ready_queue.tail.queue_next = head;
+                        ready_queue.tail = event_tail_fiber;
+                    } else {
+                        maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber };
+                    }
+                }
             },
         };
         if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue);
@@ -1477,7 +1495,6 @@ fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Strea
 
     while (true) {
         try k.checkCancel();
-        std.debug.print("calling readv\n", .{});
         const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
         switch (posix.errno(rc)) {
             .SUCCESS => return @intCast(rc),
@@ -1486,19 +1503,33 @@ fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Strea
             .AGAIN => {
                 const thread: *Thread = .current();
                 const fiber = thread.currentFiber();
-                const changes = [_]posix.Kevent{
-                    .{
-                        .ident = @as(u32, @bitCast(fd)),
-                        .filter = std.c.EVFILT.READ,
-                        .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
-                        .fflags = 0,
-                        .data = 0,
-                        .udata = @intFromPtr(fiber),
-                    },
-                };
-                assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
-                    @panic(@errorName(err)); // TODO
-                }));
+                const ident: u32 = @bitCast(fd);
+                const filter = std.c.EVFILT.READ;
+                const gop = thread.wait_queues.getOrPut(k.gpa, .{
+                    .ident = ident,
+                    .filter = filter,
+                }) catch return error.SystemResources;
+                if (gop.found_existing) {
+                    const tail_fiber = gop.value_ptr.*;
+                    assert(tail_fiber.queue_next == null);
+                    tail_fiber.queue_next = fiber;
+                    gop.value_ptr.* = fiber;
+                } else {
+                    gop.value_ptr.* = fiber;
+                    const changes = [_]posix.Kevent{
+                        .{
+                            .ident = ident,
+                            .filter = filter,
+                            .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
+                            .fflags = 0,
+                            .data = 0,
+                            .udata = @intFromPtr(fiber),
+                        },
+                    };
+                    assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
+                        @panic(@errorName(err)); // TODO
+                    }));
+                }
                 yield(k, null, .nothing);
                 continue;
             },