Commit fe6f1efde4

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-27 06:49:01
EventLoop: prepare for threading
1 parent 4d56267
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -5,6 +5,7 @@ const Io = std.Io;
 const EventLoop = @This();
 
 gpa: Allocator,
+mutex: std.Thread.Mutex,
 queue: std.DoublyLinkedList(void),
 free: std.DoublyLinkedList(void),
 main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
@@ -39,6 +40,7 @@ const Fiber = struct {
 pub fn init(el: *EventLoop, gpa: Allocator) void {
     el.* = .{
         .gpa = gpa,
+        .mutex = .{},
         .queue = .{},
         .free = .{},
         .main_fiber_buffer = undefined,
@@ -48,7 +50,11 @@ pub fn init(el: *EventLoop, gpa: Allocator) void {
 
 fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
     assert(result_len <= max_result_len);
-    const free_node = el.free.pop() orelse {
+    const free_node = free_node: {
+        el.mutex.lock();
+        defer el.mutex.unlock();
+        break :free_node el.free.pop();
+    } orelse {
         const n = std.mem.alignForward(
             usize,
             @sizeOf(Fiber) + max_result_len + min_stack_size,
@@ -59,36 +65,48 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
     return @fieldParentPtr("queue_node", free_node);
 }
 
-fn yield(el: *EventLoop, optional_fiber: ?*Fiber) void {
-    if (optional_fiber) |fiber| {
-        const old = &current_fiber.regs;
-        current_fiber = fiber;
-        contextSwitch(old, &fiber.regs);
-        return;
-    }
-    if (el.queue.pop()) |node| {
-        const fiber: *Fiber = @fieldParentPtr("queue_node", node);
-        const old = &current_fiber.regs;
-        current_fiber = fiber;
-        contextSwitch(old, &fiber.regs);
-        return;
-    }
-    @panic("everything is done");
+fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
+    const message: SwitchMessage = .{
+        .ready_fiber = optional_fiber orelse if (ready_node: {
+            el.mutex.lock();
+            defer el.mutex.unlock();
+            break :ready_node el.queue.pop();
+        }) |ready_node|
+            @fieldParentPtr("queue_node", ready_node)
+        else if (register_awaiter) |_|
+            @panic("no other fiber to switch to in order to be able to register this fiber as an awaiter") // time to switch to an idle fiber?
+        else
+            return, // nothing to do
+        .register_awaiter = register_awaiter,
+    };
+    std.log.debug("switching from {*} to {*}", .{ current_fiber, message.ready_fiber });
+    SwitchMessage.handle(@ptrFromInt(contextSwitch(&current_fiber.regs, &message.ready_fiber.regs, @intFromPtr(&message))), el);
 }
 
-/// Equivalent to calling `yield` and then giving the fiber back to the event loop.
-fn exit(el: *EventLoop, optional_fiber: ?*Fiber) noreturn {
-    yield(el, optional_fiber);
-    @panic("TODO recycle the fiber");
-}
+const SwitchMessage = struct {
+    ready_fiber: *Fiber,
+    register_awaiter: ?*?*Fiber,
+
+    fn handle(message: *const SwitchMessage, el: *EventLoop) void {
+        const prev_fiber = current_fiber;
+        current_fiber = message.ready_fiber;
+        if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
+    }
+};
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
+    el.mutex.lock();
+    defer el.mutex.unlock();
     el.queue.append(&fiber.queue_node);
 }
 
-fn myFiber(el: *EventLoop) *Fiber {
-    _ = el;
-    return current_fiber;
+fn recycle(el: *EventLoop, fiber: *Fiber) void {
+    std.log.debug("recyling {*}", .{fiber});
+    fiber.awaiter = undefined;
+    @memset(fiber.resultPointer()[0..max_result_len], undefined);
+    el.mutex.lock();
+    defer el.mutex.unlock();
+    el.free.append(&fiber.queue_node);
 }
 
 const Regs = extern struct {
@@ -101,7 +119,7 @@ const Regs = extern struct {
     rbp: usize,
 };
 
-const contextSwitch: *const fn (old: *Regs, new: *Regs) callconv(.c) void = @ptrCast(&contextSwitch_naked);
+const contextSwitch: *const fn (old: *Regs, new: *Regs, message: usize) callconv(.c) usize = @ptrCast(&contextSwitch_naked);
 
 noinline fn contextSwitch_naked() callconv(.naked) void {
     asm volatile (
@@ -121,6 +139,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void {
         \\movq 0x28(%%rsi), %%rbx
         \\movq 0x30(%%rsi), %%rbp
         \\
+        \\movq %%rdx, %%rax
         \\ret
     );
 }
@@ -128,6 +147,7 @@ noinline fn contextSwitch_naked() callconv(.naked) void {
 fn popRet() callconv(.naked) void {
     asm volatile (
         \\pop %%rdi
+        \\movq %%rax, %%rsi
         \\ret
     );
 }
@@ -145,6 +165,7 @@ pub fn @"async"(
     };
     fiber.awaiter = null;
     fiber.queue_node = .{ .data = {} };
+    std.log.debug("allocated {*}", .{fiber});
 
     const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
         usize,
@@ -157,14 +178,16 @@ pub fn @"async"(
         .fiber = fiber,
         .start = start,
     };
-    const stack_end_ptr: [*]align(16) usize = @alignCast(@ptrCast(closure));
-    (stack_end_ptr - 1)[0] = 0;
-    (stack_end_ptr - 2)[0] = @intFromPtr(&AsyncClosure.call);
-    (stack_end_ptr - 3)[0] = @intFromPtr(closure);
-    (stack_end_ptr - 4)[0] = @intFromPtr(&popRet);
-
+    const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure));
+    const stack_top = (stack_end - 4)[0..4];
+    stack_top.* = .{
+        @intFromPtr(&popRet),
+        @intFromPtr(closure),
+        @intFromPtr(&AsyncClosure.call),
+        0,
+    };
     fiber.regs = .{
-        .rsp = @intFromPtr(stack_end_ptr - 4),
+        .rsp = @intFromPtr(stack_top),
         .r15 = 0,
         .r14 = 0,
         .r13 = 0,
@@ -181,30 +204,24 @@ const AsyncClosure = struct {
     _: void align(16) = {},
     event_loop: *EventLoop,
     context: ?*anyopaque,
-    fiber: *EventLoop.Fiber,
+    fiber: *Fiber,
     start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
 
-    fn call(closure: *AsyncClosure) callconv(.c) void {
-        std.log.debug("wrap called in async", .{});
+    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn {
+        message.handle(closure.event_loop);
+        std.log.debug("{*} performing async", .{closure.fiber});
         closure.start(closure.context, closure.fiber.resultPointer());
-        const awaiter = @atomicRmw(?*EventLoop.Fiber, &closure.fiber.awaiter, .Xchg, EventLoop.Fiber.finished, .seq_cst);
-        closure.event_loop.exit(awaiter);
+        const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+        closure.event_loop.yield(awaiter, null);
+        unreachable; // switched to dead fiber
     }
 };
 
 pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
-    const future_fiber: *EventLoop.Fiber = @alignCast(@ptrCast(any_future));
+    const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
     const result_src = future_fiber.resultPointer()[0..result.len];
-    const my_fiber = event_loop.myFiber();
-
-    const prev = @atomicRmw(?*EventLoop.Fiber, &future_fiber.awaiter, .Xchg, my_fiber, .seq_cst);
-    if (prev == EventLoop.Fiber.finished) {
-        @memcpy(result, result_src);
-        return;
-    }
-    event_loop.yield(prev);
-    // Resumed when the value is available.
-    std.log.debug("yield returned in await", .{});
+    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter);
     @memcpy(result, result_src);
+    event_loop.recycle(future_fiber);
 }