Commit 266bcfbf2f

Andrew Kelley <andrew@ziglang.org>
2025-03-31 23:36:20
EventLoop: implement detached async
data races on deinit, though
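
Editor's note: the `go` function added to EventLoop.zig below is the type-erased hook for spawning a fiber that is never awaited: the context bytes are copied into the fiber, the fiber is pushed onto the spawning thread's detached_queue, and on completion it either resumes a deinit awaiter or recycles itself. A minimal caller-side sketch follows, assuming `go` is exposed through std.Io's VTable the same way as `async`/`await` (that vtable field is not shown in this diff); `Ctx`, `startFn`, and `spawnDetached` are hypothetical names, not part of the commit:

const std = @import("std");

// Hypothetical payload and entry point; the public wrapper around the
// `go` hook is not shown in this diff.
const Ctx = struct { value: u32 };

fn startFn(context: *const anyopaque) void {
    const ctx: *const Ctx = @alignCast(@ptrCast(context));
    std.log.debug("detached task got {d}", .{ctx.value});
}

fn spawnDetached(io: std.Io) void {
    const ctx: Ctx = .{ .value = 42 };
    // `go` copies the context bytes into the fiber's allocation, so `ctx`
    // does not need to outlive this call; no future is returned and nothing
    // has to call `await`.
    // Assumption: `go` is a field of std.Io.VTable, mirroring `async`.
    io.vtable.go(io.userdata, std.mem.asBytes(&ctx), std.mem.Alignment.of(Ctx), &startFn);
}

Unlike `async`, no *AnyFuture is returned; EventLoop.deinit is what ends up waiting for any detached fibers that are still running.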
1 parent f84aca3
Changed files (2)
lib/std/Io/EventLoop.zig
@@ -27,6 +27,7 @@ const Thread = struct {
     current_context: *Context,
     ready_queue: ?*Fiber,
     free_queue: ?*Fiber,
+    detached_queue: ?*Fiber,
     io_uring: IoUring,
     idle_search_index: u32,
     steal_ready_search_index: u32,
@@ -208,6 +209,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
         .current_context = &main_fiber.context,
         .ready_queue = null,
         .free_queue = null,
+        .detached_queue = null,
         .io_uring = try IoUring.init(io_uring_entries, 0),
         .idle_search_index = 1,
         .steal_ready_search_index = 1,
@@ -218,7 +220,16 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
 }
 
 pub fn deinit(el: *EventLoop) void {
+    // Wait for detached fibers.
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+    for (el.threads.allocated[0..active_threads]) |*thread| {
+        while (thread.detached_queue) |detached_fiber| {
+            if (@atomicLoad(?*Fiber, &detached_fiber.awaiter, .acquire) != Fiber.finished)
+                el.yield(null, .{ .register_awaiter = &detached_fiber.awaiter });
+            detached_fiber.recycle();
+        }
+    }
+
     for (el.threads.allocated[0..active_threads]) |*thread| {
         const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
         assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
@@ -336,6 +347,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
             .current_context = &new_thread.idle_context,
             .ready_queue = ready_queue.head,
             .free_queue = null,
+            .detached_queue = null,
             .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
                 @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
                 // no more access to `thread` after giving up reservation
@@ -470,6 +482,7 @@ const SwitchMessage = struct {
     const PendingTask = union(enum) {
         nothing,
         reschedule,
+        recycle: *Fiber,
         register_awaiter: *?*Fiber,
         lock_mutex: struct {
             prev_state: Io.Mutex.State,
@@ -488,6 +501,9 @@ const SwitchMessage = struct {
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
+            .recycle => |fiber| {
+                fiber.recycle();
+            },
             .register_awaiter => |awaiter| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 assert(prev_fiber.queue_next == null);
@@ -612,6 +628,18 @@ fn fiberEntry() callconv(.naked) void {
     }
 }
 
+fn fiberEntryDetached() callconv(.naked) void {
+    switch (builtin.cpu.arch) {
+        .x86_64 => asm volatile (
+            \\ leaq 8(%%rsp), %%rdi
+            \\ jmp %[DetachedClosure_call:P]
+            :
+            : [DetachedClosure_call] "X" (&DetachedClosure.call),
+        ),
+        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+    }
+}
+
 const AsyncClosure = struct {
     event_loop: *EventLoop,
     fiber: *Fiber,
@@ -632,6 +660,31 @@ const AsyncClosure = struct {
     }
 };
 
+const DetachedClosure = struct {
+    event_loop: *EventLoop,
+    fiber: *Fiber,
+    start: *const fn (context: *const anyopaque) void,
+
+    fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
+    }
+
+    fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
+        message.handle(closure.event_loop);
+        std.log.debug("{*} performing async detached", .{closure.fiber});
+        closure.start(closure.contextPointer());
+        const current_thread: *Thread = .current();
+        current_thread.detached_queue = closure.fiber.queue_next;
+        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+        if (awaiter) |a| {
+            closure.event_loop.yield(a, .nothing);
+        } else {
+            closure.event_loop.yield(null, .{ .recycle = closure.fiber });
+        }
+        unreachable; // switched to dead fiber
+    }
+};
+
 fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
@@ -682,6 +735,53 @@ fn @"async"(
     return @ptrCast(fiber);
 }
 
+fn go(
+    userdata: ?*anyopaque,
+    context: []const u8,
+    context_alignment: std.mem.Alignment,
+    start: *const fn (context: *const anyopaque) void,
+) void {
+    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
+    assert(context.len <= Fiber.max_context_size); // TODO
+
+    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
+    const fiber = Fiber.allocate(event_loop) catch {
+        start(context.ptr);
+        return;
+    };
+    std.log.debug("allocated {*}", .{fiber});
+
+    const current_thread: *Thread = .current();
+    const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
+        @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
+    ) - @sizeOf(DetachedClosure));
+    fiber.* = .{
+        .required_align = {},
+        .context = switch (builtin.cpu.arch) {
+            .x86_64 => .{
+                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rbp = 0,
+                .rip = @intFromPtr(&fiberEntryDetached),
+            },
+            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+        },
+        .awaiter = null,
+        .queue_next = current_thread.detached_queue,
+        .cancel_thread = null,
+        .awaiting_completions = .initEmpty(),
+    };
+    current_thread.detached_queue = fiber;
+    closure.* = .{
+        .event_loop = event_loop,
+        .fiber = fiber,
+        .start = start,
+    };
+    @memcpy(closure.contextPointer(), context);
+
+    event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
+}
+
+
 fn @"await"(
     userdata: ?*anyopaque,
     any_future: *std.Io.AnyFuture,
@@ -690,24 +790,12 @@ fn @"await"(
 ) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
+    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
+        event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultBytes(result_alignment));
     future_fiber.recycle();
 }
 
-fn go(
-    userdata: ?*anyopaque,
-    context: []const u8,
-    context_alignment: std.mem.Alignment,
-    start: *const fn (context: *const anyopaque) void,
-) void {
-    _ = userdata;
-    _ = context;
-    _ = context_alignment;
-    _ = start;
-    @panic("TODO");
-}
-
 fn cancel(
     userdata: ?*anyopaque,
     any_future: *std.Io.AnyFuture,
lib/std/Io.zig
@@ -626,7 +626,7 @@ pub const VTable = struct {
     /// Thread-safe.
     cancelRequested: *const fn (?*anyopaque) bool,
 
-    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) error{Canceled}!void,
+    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
     mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
 
     conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
@@ -645,11 +645,11 @@ pub const VTable = struct {
 pub const OpenFlags = fs.File.OpenFlags;
 pub const CreateFlags = fs.File.CreateFlags;
 
-pub const FileOpenError = fs.File.OpenError || error{Canceled};
-pub const FileReadError = fs.File.ReadError || error{Canceled};
-pub const FilePReadError = fs.File.PReadError || error{Canceled};
-pub const FileWriteError = fs.File.WriteError || error{Canceled};
-pub const FilePWriteError = fs.File.PWriteError || error{Canceled};
+pub const FileOpenError = fs.File.OpenError || Cancelable;
+pub const FileReadError = fs.File.ReadError || Cancelable;
+pub const FilePReadError = fs.File.PReadError || Cancelable;
+pub const FileWriteError = fs.File.WriteError || Cancelable;
+pub const FilePWriteError = fs.File.PWriteError || Cancelable;
 
 pub const Timestamp = enum(i96) {
     _,
@@ -666,7 +666,7 @@ pub const Deadline = union(enum) {
     nanoseconds: i96,
     timestamp: Timestamp,
 };
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled};
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || Cancelable;
 pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
 
 pub const AnyFuture = opaque {};
@@ -734,7 +734,7 @@ pub const Mutex = if (true) struct {
         return prev_state.isUnlocked();
     }
 
-    pub fn lock(mutex: *Mutex, io: std.Io) error{Canceled}!void {
+    pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void {
         const prev_state: State = @enumFromInt(@atomicRmw(
             usize,
             @as(*usize, @ptrCast(&mutex.state)),
@@ -783,7 +783,7 @@ pub const Mutex = if (true) struct {
     }
 
     /// Avoids the vtable for uncontended locks.
-    pub fn lock(m: *Mutex, io: Io) error{Canceled}!void {
+    pub fn lock(m: *Mutex, io: Io) Cancelable!void {
         if (!m.tryLock()) {
             @branchHint(.unlikely);
             try io.vtable.mutexLock(io.userdata, {}, m);
@@ -809,10 +809,10 @@ pub const Condition = struct {
         all,
     };
 
-    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void {
+    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
         io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
             error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
-            error.Canceled => return, // handled as spurious wakeup
+            error.Canceled => return error.Canceled,
         };
     }
 
@@ -829,6 +829,11 @@ pub const Condition = struct {
     }
 };
 
+pub const Cancelable = error{
+    /// Caller has requested the async operation to stop.
+    Canceled,
+};
+
 pub const TypeErasedQueue = struct {
     mutex: Mutex,
 
@@ -852,7 +857,7 @@ pub const TypeErasedQueue = struct {
 
     pub fn init(buffer: []u8) TypeErasedQueue {
         return .{
-            .mutex = .{},
+            .mutex = .init,
             .buffer = buffer,
             .put_index = 0,
             .get_index = 0,
@@ -861,10 +866,10 @@ pub const TypeErasedQueue = struct {
         };
     }
 
-    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
         assert(elements.len >= min);
 
-        q.mutex.lock(io);
+        try q.mutex.lock(io);
         defer q.mutex.unlock(io);
 
         // Getters have first priority on the data, and only when the getters
@@ -911,15 +916,15 @@ pub const TypeErasedQueue = struct {
                 .data = .{ .remaining = remaining, .condition = .{} },
             };
             q.putters.append(&node);
-            node.data.condition.wait(io, &q.mutex);
+            try node.data.condition.wait(io, &q.mutex);
             remaining = node.data.remaining;
         }
     }
 
-    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
+    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
         assert(buffer.len >= min);
 
-        q.mutex.lock(io);
+        try q.mutex.lock(io);
         defer q.mutex.unlock(io);
 
         // The ring buffer gets first priority, then data should come from any
@@ -976,7 +981,7 @@ pub const TypeErasedQueue = struct {
                 .data = .{ .remaining = remaining, .condition = .{} },
             };
             q.getters.append(&node);
-            node.data.condition.wait(io, &q.mutex);
+            try node.data.condition.wait(io, &q.mutex);
             remaining = node.data.remaining;
         }
     }
@@ -1030,8 +1035,8 @@ pub fn Queue(Elem: type) type {
         /// Returns how many elements have been added to the queue.
         ///
         /// Asserts that `elements.len >= min`.
-        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
-            return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
+            return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
         /// Receives elements from the beginning of the queue. The function
@@ -1041,17 +1046,17 @@ pub fn Queue(Elem: type) type {
         /// Returns how many elements of `buffer` have been populated.
         ///
         /// Asserts that `buffer.len >= min`.
-        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
-            return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
+            return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
         }
 
-        pub fn putOne(q: *@This(), io: Io, item: Elem) void {
-            assert(q.put(io, &.{item}, 1) == 1);
+        pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
+            assert(try q.put(io, &.{item}, 1) == 1);
         }
 
-        pub fn getOne(q: *@This(), io: Io) Elem {
+        pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
             var buf: [1]Elem = undefined;
-            assert(q.get(io, &buf, 1) == 1);
+            assert(try q.get(io, &buf, 1) == 1);
             return buf[0];
         }
     };
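
Editor's note: the Cancelable plumbing above turns the Queue API from infallible into Cancelable-returning, so existing call sites need a `try` or an explicit catch. A caller-side sketch under those new signatures; `producer` and `consumer` are hypothetical functions, not part of this commit:

const std = @import("std");

fn producer(io: std.Io, q: *std.Io.Queue(u32)) std.Io.Cancelable!void {
    // Previously returned void; cancellation now surfaces as error.Canceled
    // and has to be propagated with `try`.
    try q.putOne(io, 1);
}

fn consumer(io: std.Io, q: *std.Io.Queue(u32)) void {
    // Or handle it directly, treating cancellation as "stop working".
    const item = q.getOne(io) catch |err| switch (err) {
        error.Canceled => return,
    };
    std.log.debug("got {d}", .{item});
}

The error set originates in Condition.wait, which now propagates error.Canceled instead of swallowing it as a spurious wakeup, and flows up through TypeErasedQueue.put/get into the typed Queue wrappers.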