Commit 929b616e0f

Andrew Kelley <andrew@ziglang.org>
2025-04-01 01:25:11
std.Io.Condition: change primitive to support only one
and no timer
1 parent 8773b63
Changed files (3)
lib
lib/std/Io/EventLoop.zig
@@ -781,7 +781,6 @@ fn go(
     event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
 }
 
-
 fn @"await"(
     userdata: ?*anyopaque,
     any_future: *std.Io.AnyFuture,
@@ -1277,24 +1276,24 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
     el.yield(maybe_waiting_fiber.?, .reschedule);
 }
 
-fn conditionWait(
-    userdata: ?*anyopaque,
-    cond: *Io.Condition,
-    mutex: *Io.Mutex,
-    timeout: ?u64,
-) Io.Condition.WaitError!void {
-    _ = userdata;
-    _ = cond;
-    _ = mutex;
-    _ = timeout;
-    @panic("TODO");
+fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const cond_state: *?*Fiber = @ptrCast(&cond.state);
+    const thread: *Thread = .current();
+    const fiber = thread.currentFiber();
+    const prev = @atomicRmw(?*Fiber, cond_state, .Xchg, fiber, .acquire);
+    assert(prev == null); // More than one wait on same Condition is illegal.
+    mutex.unlock(io(el));
+    el.yield(null, .nothing);
+    try mutex.lock(io(el));
 }
 
-fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
-    _ = userdata;
-    _ = cond;
-    _ = notify;
-    @panic("TODO");
+fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const cond_state: *?*Fiber = @ptrCast(&cond.state);
+    if (@atomicRmw(?*Fiber, cond_state, .Xchg, null, .acquire)) |fiber| {
+        el.yield(fiber, .reschedule);
+    }
 }
 
 fn errno(signed: i32) std.os.linux.E {
lib/std/Thread/Pool.zig
@@ -619,12 +619,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
     }
 }
 
-fn conditionWait(
-    userdata: ?*anyopaque,
-    cond: *Io.Condition,
-    mutex: *Io.Mutex,
-    timeout: ?u64,
-) Io.Condition.WaitError!void {
+fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     comptime assert(@TypeOf(cond.state) == u64);
     const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
@@ -652,25 +647,11 @@ fn conditionWait(
     mutex.unlock(pool.io());
     defer mutex.lock(pool.io()) catch @panic("TODO");
 
-    var futex_deadline = std.Thread.Futex.Deadline.init(timeout);
+    var futex_deadline = std.Thread.Futex.Deadline.init(null);
 
     while (true) {
         futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) {
-            // On timeout, we must decrement the waiter we added above.
-            error.Timeout => {
-                while (true) {
-                    // If there's a signal when we're timing out, consume it and report being woken up instead.
-                    // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
-                    while (state & signal_mask != 0) {
-                        const new_state = state - one_waiter - one_signal;
-                        state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
-                    }
-
-                    // Remove the waiter we added and officially return timed out.
-                    const new_state = state - one_waiter;
-                    state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
-                }
-            },
+            error.Timeout => unreachable,
         };
 
         epoch = cond_epoch.load(.acquire);
@@ -685,7 +666,7 @@ fn conditionWait(
     }
 }
 
-fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
+fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition) void {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     _ = pool;
     comptime assert(@TypeOf(cond.state) == u64);
@@ -709,10 +690,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Conditio
             return;
         }
 
-        const to_wake = switch (notify) {
-            .one => 1,
-            .all => wakeable,
-        };
+        const to_wake = 1;
 
         // Reserve the amount of waiters to wake by incrementing the signals count.
         // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads.
lib/std/Io.zig
@@ -629,8 +629,8 @@ pub const VTable = struct {
     mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
     mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
 
-    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
-    conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void,
+    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void,
+    conditionWake: *const fn (?*anyopaque, cond: *Condition) void,
 
     createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
     openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
@@ -642,6 +642,11 @@ pub const VTable = struct {
     sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
 };
 
+pub const Cancelable = error{
+    /// Caller has requested the async operation to stop.
+    Canceled,
+};
+
 pub const OpenFlags = fs.File.OpenFlags;
 pub const CreateFlags = fs.File.CreateFlags;
 
@@ -795,43 +800,18 @@ pub const Mutex = if (true) struct {
     }
 };
 
+/// Supports exactly 1 waiter. More than 1 simultaneous wait on the same
+/// condition is illegal.
 pub const Condition = struct {
     state: u64 = 0,
 
-    pub const WaitError = error{
-        Timeout,
-        Canceled,
-    };
-
-    /// How many waiters to wake up.
-    pub const Notify = enum {
-        one,
-        all,
-    };
-
     pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
-        io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
-            error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
-            error.Canceled => return error.Canceled,
-        };
+        return io.vtable.conditionWait(io.userdata, cond, mutex);
     }
 
-    pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void {
-        return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns);
+    pub fn wake(cond: *Condition, io: Io) void {
+        io.vtable.conditionWake(io.userdata, cond);
     }
-
-    pub fn signal(cond: *Condition, io: Io) void {
-        io.vtable.conditionWake(io.userdata, cond, .one);
-    }
-
-    pub fn broadcast(cond: *Condition, io: Io) void {
-        io.vtable.conditionWake(io.userdata, cond, .all);
-    }
-};
-
-pub const Cancelable = error{
-    /// Caller has requested the async operation to stop.
-    Canceled,
 };
 
 pub const TypeErasedQueue = struct {
@@ -883,7 +863,7 @@ pub const TypeErasedQueue = struct {
             remaining = remaining[copy_len..];
             getter.data.remaining = getter.data.remaining[copy_len..];
             if (getter.data.remaining.len == 0) {
-                getter.data.condition.signal(io);
+                getter.data.condition.wake(io);
                 continue;
             }
             q.getters.prepend(getter);
@@ -966,7 +946,7 @@ pub const TypeErasedQueue = struct {
                 putter.data.remaining = putter.data.remaining[copy_len..];
                 remaining = remaining[copy_len..];
                 if (putter.data.remaining.len == 0) {
-                    putter.data.condition.signal(io);
+                    putter.data.condition.wake(io);
                 } else {
                     assert(remaining.len == 0);
                     q.putters.prepend(putter);
@@ -999,7 +979,7 @@ pub const TypeErasedQueue = struct {
             putter.data.remaining = putter.data.remaining[copy_len..];
             q.put_index += copy_len;
             if (putter.data.remaining.len == 0) {
-                putter.data.condition.signal(io);
+                putter.data.condition.wake(io);
                 continue;
             }
             const second_available = q.buffer[0..q.get_index];
@@ -1008,7 +988,7 @@ pub const TypeErasedQueue = struct {
             putter.data.remaining = putter.data.remaining[copy_len..];
             q.put_index = copy_len;
             if (putter.data.remaining.len == 0) {
-                putter.data.condition.signal(io);
+                putter.data.condition.wake(io);
                 continue;
             }
             q.putters.prepend(putter);