Commit 3eb7be5cf6

Jacob Young <jacobly0@users.noreply.github.com>
2025-04-01 09:45:31
EventLoop: implement detached fibers
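
Detached fibers spawned through go now register themselves on a
mutex-protected doubly linked list owned by the event loop, so deinit
can find and cancel any that are still running. A detached fiber that
finishes on its own takes the same lock and unlinks itself before
asking to be recycled; when deinit wins the race instead, it pops the
node and points it at itself, telling the fiber that the canceler now
owns cleanup. The recycle pending task accordingly no longer carries a
fiber payload: it always recycles the fiber being switched away from.

This also unifies the two naked entry trampolines: async and go seed
the top of the new fiber's stack with the address of their closure's
call function, so a single fiberEntry can load the closure pointer and
jump through that slot.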
1 parent 0f105a8
Changed files (1)
lib/std/Io/EventLoop.zig
@@ -9,9 +9,12 @@ const IoUring = std.os.linux.IoUring;
 
 /// Must be a thread-safe allocator.
 gpa: Allocator,
-mutex: std.Thread.Mutex,
 main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
 threads: Thread.List,
+detached: struct {
+    mutex: std.Io.Mutex,
+    list: std.DoublyLinkedList(void),
+},
 
 /// Empirically saw >128KB being used by the self-hosted backend to panic.
 const idle_stack_size = 256 * 1024;
@@ -167,13 +170,16 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
     errdefer gpa.free(allocated_slice);
     el.* = .{
         .gpa = gpa,
-        .mutex = .{},
         .main_fiber_buffer = undefined,
         .threads = .{
             .allocated = @ptrCast(allocated_slice[0..threads_size]),
             .reserved = 1,
             .active = 1,
         },
+        .detached = .{
+            .mutex = .init,
+            .list = .{},
+        },
     };
     const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
     main_fiber.* = .{
@@ -207,6 +213,23 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
 }
 
 pub fn deinit(el: *EventLoop) void {
+    while (true) cancel(el, detached_future: {
+        el.detached.mutex.lock(el.io()) catch |err| switch (err) {
+            error.Canceled => unreachable, // main fiber cannot be canceled
+        };
+        defer el.detached.mutex.unlock(el.io());
+        const detached: *DetachedClosure = @fieldParentPtr(
+            "detached_queue_node",
+            el.detached.list.pop() orelse break,
+        );
+        // notify the detached fiber that it is no longer allowed to recycle itself
+        detached.detached_queue_node = .{
+            .prev = &detached.detached_queue_node,
+            .next = &detached.detached_queue_node,
+            .data = {},
+        };
+        break :detached_future @ptrCast(detached.fiber);
+    }, &.{}, .@"1");
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
     for (el.threads.allocated[0..active_threads]) |*thread| {
         const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
@@ -460,7 +483,7 @@ const SwitchMessage = struct {
     const PendingTask = union(enum) {
         nothing,
         reschedule,
-        recycle: *Fiber,
+        recycle,
         register_awaiter: *?*Fiber,
         mutex_lock: struct {
             prev_state: Io.Mutex.State,
@@ -483,8 +506,10 @@ const SwitchMessage = struct {
                 assert(prev_fiber.queue_next == null);
                 el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
             },
-            .recycle => |fiber| {
-                el.recycle(fiber);
+            .recycle => {
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
+                assert(prev_fiber.queue_next == null);
+                el.recycle(prev_fiber);
             },
             .register_awaiter => |awaiter| {
                 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
@@ -609,21 +634,7 @@ fn fiberEntry() callconv(.naked) void {
     switch (builtin.cpu.arch) {
         .x86_64 => asm volatile (
             \\ leaq 8(%%rsp), %%rdi
-            \\ jmp %[AsyncClosure_call:P]
-            :
-            : [AsyncClosure_call] "X" (&AsyncClosure.call),
-        ),
-        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
-    }
-}
-
-fn fiberEntryDetached() callconv(.naked) void {
-    switch (builtin.cpu.arch) {
-        .x86_64 => asm volatile (
-            \\ leaq 8(%%rsp), %%rdi
-            \\ jmp %[DetachedClosure_call:P]
-            :
-            : [DetachedClosure_call] "X" (&DetachedClosure.call),
+            \\ jmpq *(%%rsp)
         ),
         else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
     }
@@ -649,29 +660,6 @@ const AsyncClosure = struct {
     }
 };
 
-const DetachedClosure = struct {
-    event_loop: *EventLoop,
-    fiber: *Fiber,
-    start: *const fn (context: *const anyopaque) void,
-
-    fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
-        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
-    }
-
-    fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
-        message.handle(closure.event_loop);
-        std.log.debug("{*} performing async detached", .{closure.fiber});
-        closure.start(closure.contextPointer());
-        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
-        if (awaiter) |a| {
-            closure.event_loop.yield(a, .nothing);
-        } else {
-            closure.event_loop.yield(null, .{ .recycle = closure.fiber });
-        }
-        unreachable; // switched to dead fiber
-    }
-};
-
 fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
@@ -695,11 +683,13 @@ fn @"async"(
     const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
         @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
     ) - @sizeOf(AsyncClosure));
+    const stack_end: [*]usize = @alignCast(@ptrCast(closure));
+    (stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)};
     fiber.* = .{
         .required_align = {},
         .context = switch (builtin.cpu.arch) {
             .x86_64 => .{
-                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rsp = @intFromPtr(stack_end - 1),
                 .rbp = 0,
                 .rip = @intFromPtr(&fiberEntry),
             },
@@ -722,6 +712,37 @@ fn @"async"(
     return @ptrCast(fiber);
 }
 
+const DetachedClosure = struct {
+    event_loop: *EventLoop,
+    fiber: *Fiber,
+    start: *const fn (context: *const anyopaque) void,
+    detached_queue_node: std.DoublyLinkedList(void).Node,
+
+    fn contextPointer(closure: *DetachedClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(DetachedClosure));
+    }
+
+    fn call(closure: *DetachedClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(DetachedClosure))) noreturn {
+        message.handle(closure.event_loop);
+        std.log.debug("{*} performing async detached", .{closure.fiber});
+        closure.start(closure.contextPointer());
+        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
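+        // Decide who reclaims this fiber: if deinit already unlinked our
+        // queue node (it now points at itself), the canceler owns cleanup;
+        // otherwise unlink it and ask to be recycled.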
+        closure.event_loop.yield(awaiter, pending_task: {
+            closure.event_loop.detached.mutex.lock(closure.event_loop.io()) catch |err| switch (err) {
+                error.Canceled => break :pending_task .nothing,
+            };
+            defer closure.event_loop.detached.mutex.unlock(closure.event_loop.io());
+            if (closure.detached_queue_node.next == &closure.detached_queue_node) break :pending_task .nothing;
+            closure.event_loop.detached.list.remove(&closure.detached_queue_node);
+            break :pending_task .recycle;
+        });
+        unreachable; // switched to dead fiber
+    }
+};
+
 fn go(
     userdata: ?*anyopaque,
     context: []const u8,
@@ -742,13 +763,15 @@
     const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
         @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
     ) - @sizeOf(DetachedClosure));
+    const stack_end: [*]usize = @alignCast(@ptrCast(closure));
+    (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)};
     fiber.* = .{
         .required_align = {},
         .context = switch (builtin.cpu.arch) {
             .x86_64 => .{
-                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rsp = @intFromPtr(stack_end - 1),
                 .rbp = 0,
-                .rip = @intFromPtr(&fiberEntryDetached),
+                .rip = @intFromPtr(&fiberEntry),
             },
             else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
         },
@@ -761,7 +784,22 @@
         .event_loop = event_loop,
         .fiber = fiber,
         .start = start,
+        .detached_queue_node = .{ .data = {} },
     };
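+    // Publish the closure on the detached list so deinit can find it. If the
+    // event loop is already shutting down (lock fails with Canceled), recycle
+    // the fiber and run the task synchronously on the current one instead.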
+    {
+        event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) {
+            error.Canceled => {
+                event_loop.recycle(fiber);
+                start(context.ptr);
+                return;
+            },
+        };
+        defer event_loop.detached.mutex.unlock(event_loop.io());
+        event_loop.detached.list.append(&closure.detached_queue_node);
+    }
     @memcpy(closure.contextPointer(), context);
 
     event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
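
The shutdown handshake distills to a self-linked-node trick: deinit pops a
closure's queue node and points it at itself, and the fiber treats a
self-linked node as "the canceler owns cleanup". A minimal standalone sketch
of that handshake, reusing the generic std.DoublyLinkedList(void) API from
the commit (claimForCancel and mayRecycle are illustrative names, not part
of the change, and the el.detached.mutex locking that guards both sides in
the real code is elided):

    const std = @import("std");

    const List = std.DoublyLinkedList(void);

    const Closure = struct {
        node: List.Node,
    };

    /// Shutdown side: pop one closure and self-link its node, signaling
    /// that the canceler now owns cleanup.
    fn claimForCancel(list: *List) ?*Closure {
        const node = list.pop() orelse return null;
        const closure: *Closure = @fieldParentPtr("node", node);
        closure.node = .{ .prev = &closure.node, .next = &closure.node, .data = {} };
        return closure;
    }

    /// Fiber side: a self-linked node means "do not recycle"; otherwise
    /// unlink and report that recycling is allowed.
    fn mayRecycle(list: *List, closure: *Closure) bool {
        if (closure.node.next == &closure.node) return false;
        list.remove(&closure.node);
        return true;
    }

    test "canceler wins the race" {
        var list: List = .{};
        var c: Closure = .{ .node = .{ .data = {} } };
        list.append(&c.node);
        try std.testing.expect(claimForCancel(&list) == &c);
        try std.testing.expect(!mayRecycle(&list, &c));
    }

    test "fiber finishes first" {
        var list: List = .{};
        var c: Closure = .{ .node = .{ .data = {} } };
        list.append(&c.node);
        try std.testing.expect(mayRecycle(&list, &c));
        try std.testing.expect(claimForCancel(&list) == null);
    }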