Commit 08ce000276

Jacob Young <jacobly0@users.noreply.github.com>
2025-04-01 08:23:41
EventLoop: fix `std.Io.Condition` implementation
1. A fiber can't put itself on a queue that would allow it to be rescheduled while it is still running; registering with the condition and unlocking the mutex now happen in the pending-task handler, after switching away from the waiting fiber.
2. Allow the idle fiber to unlock a mutex held by another fiber, by ignoring reschedule requests that originate from the idle fiber.
1 parent e366b13
Changed files (1)
lib/std/Io/EventLoop.zig
@@ -380,8 +380,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
 
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
-    const thread: *Thread = &el.threads.allocated[0];
-    el.idle(thread);
+    el.idle(&el.threads.allocated[0]);
     el.yield(@ptrCast(&el.main_fiber_buffer), .nothing);
     unreachable; // switched to dead fiber
 }
@@ -480,10 +479,14 @@ const SwitchMessage = struct {
         reschedule,
         recycle: *Fiber,
         register_awaiter: *?*Fiber,
-        lock_mutex: struct {
+        mutex_lock: struct {
             prev_state: Io.Mutex.State,
             mutex: *Io.Mutex,
         },
+        condition_wait: struct {
+            cond: *Io.Condition,
+            mutex: *Io.Mutex,
+        },
         exit,
     };
 
@@ -492,7 +495,7 @@ const SwitchMessage = struct {
         thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
             .nothing => {},
-            .reschedule => {
+            .reschedule => if (message.contexts.prev != &thread.idle_context) {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
@@ -511,16 +514,16 @@ const SwitchMessage = struct {
                     .acq_rel,
                 ) == Fiber.finished) el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
-            .lock_mutex => |lock_mutex| {
+            .mutex_lock => |mutex_lock| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
-                var prev_state = lock_mutex.prev_state;
+                var prev_state = mutex_lock.prev_state;
                 while (switch (prev_state) {
                     else => next_state: {
                         prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
                         break :next_state @cmpxchgWeak(
                             Io.Mutex.State,
-                            &lock_mutex.mutex.state,
+                            &mutex_lock.mutex.state,
                             prev_state,
                             @enumFromInt(@intFromPtr(prev_fiber)),
                             .release,
@@ -529,7 +532,7 @@ const SwitchMessage = struct {
                     },
                     .unlocked => @cmpxchgWeak(
                         Io.Mutex.State,
-                        &lock_mutex.mutex.state,
+                        &mutex_lock.mutex.state,
                         .unlocked,
                         .locked_once,
                         .acquire,
@@ -541,6 +544,13 @@ const SwitchMessage = struct {
                     },
                 }) |next_state| prev_state = next_state;
             },
+            .condition_wait => |condition_wait| {
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
+                assert(prev_fiber.queue_next == null);
+                const cond_state: *?*Fiber = @ptrCast(&condition_wait.cond.state);
+                assert(@atomicRmw(?*Fiber, cond_state, .Xchg, prev_fiber, .release) == null); // More than one wait on same Condition is illegal.
+                condition_wait.mutex.unlock(el.io());
+            },
             .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
                 getSqe(&thread.io_uring).* = .{
                     .opcode = .MSG_RING,
@@ -1242,7 +1252,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl
 
 fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    el.yield(null, .{ .lock_mutex = .{
+    el.yield(null, .{ .mutex_lock = .{
         .prev_state = prev_state,
         .mutex = mutex,
     } });
@@ -1271,13 +1281,10 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
 
 fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const cond_state: *?*Fiber = @ptrCast(&cond.state);
-    const thread: *Thread = .current();
-    const fiber = thread.currentFiber();
-    const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire);
-    assert(prev == null); // More than one wait on same Condition is illegal.
-    mutex.unlock(el.io());
-    el.yield(null, .nothing);
+    el.yield(null, .{ .condition_wait = .{
+        .cond = cond,
+        .mutex = mutex,
+    } });
     try mutex.lock(el.io());
 }