Commit f1dd06b01f

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-28 00:32:26
EventLoop: implement main idle fiber
1 parent 9d0f44f
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -11,19 +11,21 @@ cond: std.Thread.Condition,
 queue: std.DoublyLinkedList(void),
 free: std.DoublyLinkedList(void),
 main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
-exiting: bool,
+exit_awaiter: ?*Fiber,
 idle_count: usize,
 threads: std.ArrayListUnmanaged(Thread),
 
-threadlocal var current_thread: *Thread = undefined;
-threadlocal var current_fiber: *Fiber = undefined;
+threadlocal var current_idle_context: *Context = undefined;
+threadlocal var current_fiber_context: *Context = undefined;
 
 const max_result_len = 64;
 const min_stack_size = 4 * 1024 * 1024;
+const idle_stack_size = 32 * 1024;
+const stack_align = 16;
 
 const Thread = struct {
     thread: std.Thread,
-    idle_fiber: Fiber,
+    idle_context: Context,
 };
 
 const Fiber = struct {
@@ -54,6 +56,11 @@ const Fiber = struct {
 };
 
 pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
+    const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread);
+    const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
+    const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
+    const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context), stack_align), idle_stack_end_offset);
+    errdefer gpa.free(allocated_slice);
     el.* = .{
         .gpa = gpa,
         .mutex = .{},
@@ -61,28 +68,37 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
         .queue = .{},
         .free = .{},
         .main_fiber_buffer = undefined,
-        .exiting = false,
+        .exit_awaiter = null,
         .idle_count = 0,
-        .threads = try .initCapacity(gpa, @max(std.Thread.getCpuCount() catch 1, 1)),
+        .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])),
     };
-    current_thread = el.threads.addOneAssumeCapacity();
-    current_fiber = @ptrCast(&el.main_fiber_buffer);
+    const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)]));
+    const idle_stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
+    (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
+    main_idle_context.* = .{
+        .rsp = @intFromPtr(idle_stack_end - 1),
+        .rbp = 0,
+        .rip = @intFromPtr(&mainIdleEntry),
+    };
+    std.log.debug("created main idle {*}", .{main_idle_context});
+    current_idle_context = main_idle_context;
+    const current_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
+    std.log.debug("created main fiber {*}", .{current_fiber});
+    current_fiber_context = &current_fiber.context;
 }
 
 pub fn deinit(el: *EventLoop) void {
-    {
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        assert(el.queue.len == 0); // pending async
-        el.exiting = true;
-    }
-    el.cond.broadcast();
+    assert(el.queue.len == 0); // pending async
+    el.yield(null, &el.exit_awaiter);
     while (el.free.pop()) |free_node| {
         const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node);
         el.gpa.free(free_fiber.allocatedSlice());
     }
-    for (el.threads.items[1..]) |*thread| thread.thread.join();
-    el.threads.deinit(el.gpa);
+    const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context));
+    const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
+    const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context), stack_align)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
+    for (el.threads.items) |*thread| thread.thread.join();
+    el.gpa.free(allocated_ptr[0..idle_stack_end]);
 }
 
 fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
@@ -103,40 +119,44 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
 }
 
 fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
-    const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        break :ready_node el.queue.pop();
-    }) |ready_node|
-        @fieldParentPtr("queue_node", ready_node)
-    else
-        &current_thread.idle_fiber;
+    const ready_context: *Context = ready_context: {
+        const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
+            el.mutex.lock();
+            defer el.mutex.unlock();
+            break :ready_node el.queue.pop();
+        }) |ready_node|
+            @fieldParentPtr("queue_node", ready_node)
+        else
+            break :ready_context current_idle_context;
+        break :ready_context &ready_fiber.context;
+    };
     const message: SwitchMessage = .{
-        .prev_context = &current_fiber.context,
-        .ready_context = &ready_fiber.context,
+        .prev_context = current_fiber_context,
+        .ready_context = ready_context,
         .register_awaiter = register_awaiter,
     };
-    std.log.debug("switching from {*} to {*}", .{
-        @as(*Fiber, @fieldParentPtr("context", message.prev_context)),
-        @as(*Fiber, @fieldParentPtr("context", message.ready_context)),
-    });
+    std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context });
     contextSwitch(&message).handle(el);
 }
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
-    signal: {
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        el.queue.append(&fiber.queue_node);
-        if (el.idle_count > 0) break :signal;
-        if (el.threads.items.len == el.threads.capacity) return;
-        const thread = el.threads.addOneAssumeCapacity();
-        thread.thread = std.Thread.spawn(.{
-            .stack_size = min_stack_size,
-            .allocator = el.gpa,
-        }, threadEntry, .{ el, thread }) catch return;
+    el.mutex.lock();
+    el.queue.append(&fiber.queue_node);
+    if (el.idle_count > 0) {
+        el.mutex.unlock();
+        el.cond.signal();
+        return;
     }
-    el.cond.signal();
+    defer el.mutex.unlock();
+    if (el.threads.items.len == el.threads.capacity) return;
+    const thread = el.threads.addOneAssumeCapacity();
+    thread.thread = std.Thread.spawn(.{
+        .stack_size = idle_stack_size,
+        .allocator = el.gpa,
+    }, threadEntry, .{ el, thread }) catch {
+        el.threads.items.len -= 1;
+        return;
+    };
 }
 
 fn recycle(el: *EventLoop, fiber: *Fiber) void {
@@ -148,14 +168,28 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void {
     el.free.append(&fiber.queue_node);
 }
 
+fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn {
+    message.handle(el);
+    el.yield(el.idle(), null);
+    unreachable; // switched to dead fiber
+}
+
 fn threadEntry(el: *EventLoop, thread: *Thread) void {
-    current_thread = thread;
-    current_fiber = &thread.idle_fiber;
+    std.log.debug("created thread idle {*}", .{&thread.idle_context});
+    current_idle_context = &thread.idle_context;
+    current_fiber_context = &thread.idle_context;
+    _ = el.idle();
+}
+
+fn idle(el: *EventLoop) *Fiber {
     while (true) {
         el.yield(null, null);
+        if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| {
+            el.cond.broadcast();
+            return exit_awaiter;
+        }
         el.mutex.lock();
         defer el.mutex.unlock();
-        if (el.exiting) return;
         el.idle_count += 1;
         defer el.idle_count -= 1;
         el.cond.wait(&el.mutex);
@@ -169,7 +203,7 @@ const SwitchMessage = extern struct {
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
         const prev_fiber: *Fiber = @fieldParentPtr("context", message.prev_context);
-        current_fiber = @fieldParentPtr("context", message.ready_context);
+        current_fiber_context = message.ready_context;
         if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
     }
 };
@@ -208,6 +242,18 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
     };
 }
 
+fn mainIdleEntry() callconv(.naked) void {
+    switch (builtin.cpu.arch) {
+        .x86_64 => asm volatile (
+            \\ movq (%%rsp), %%rdi
+            \\ jmp %[mainIdle:P]
+            :
+            : [mainIdle] "X" (&mainIdle),
+        ),
+        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+    }
+}
+
 fn fiberEntry() callconv(.naked) void {
     switch (builtin.cpu.arch) {
         .x86_64 => asm volatile (
@@ -238,7 +284,7 @@ pub fn @"async"(
     const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
         usize,
         @intFromPtr(fiber.stackEndPointer() - @sizeOf(AsyncClosure)),
-        @alignOf(AsyncClosure),
+        @max(@alignOf(AsyncClosure), stack_align),
     ));
     closure.* = .{
         .event_loop = event_loop,
@@ -246,7 +292,7 @@ pub fn @"async"(
         .fiber = fiber,
         .start = start,
     };
-    const stack_end: [*]align(16) usize = @alignCast(@ptrCast(closure));
+    const stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(closure));
     fiber.context = .{
         .rsp = @intFromPtr(stack_end - 1),
         .rbp = 0,
@@ -258,7 +304,6 @@ pub fn @"async"(
 }
 
 const AsyncClosure = struct {
-    _: void align(16) = {},
     event_loop: *EventLoop,
     context: ?*anyopaque,
     fiber: *Fiber,