Commit 1493c3b5f3

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-28 15:05:52
EventLoop: move context after the async closure
This avoids needing to store more sizes and alignments. Only the result alignment needs to be stored, because `Fiber` is at a fixed zero offset.
1 parent 29355ff
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -4,28 +4,23 @@ const assert = std.debug.assert;
 const Allocator = std.mem.Allocator;
 const Io = std.Io;
 const EventLoop = @This();
+const Alignment = std.mem.Alignment;
 
 gpa: Allocator,
 mutex: std.Thread.Mutex,
 cond: std.Thread.Condition,
 queue: std.DoublyLinkedList(void),
 free: std.DoublyLinkedList(void),
-main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
+main_context: Context,
 exit_awaiter: ?*Fiber,
 idle_count: usize,
 threads: std.ArrayListUnmanaged(Thread),
 
 threadlocal var current_idle_context: *Context = undefined;
-threadlocal var current_fiber_context: *Context = undefined;
+threadlocal var current_context: *Context = undefined;
 
-/// Also used for context.
-const max_result_len = 64;
-/// Also used for context.
-const max_result_align: std.mem.Alignment = .@"16";
-
-const min_stack_size = 4 * 1024 * 1024;
+/// Empirically saw 10KB being used by the self-hosted backend for logging.
 const idle_stack_size = 32 * 1024;
-const stack_align = 16;
 
 const Thread = struct {
     thread: std.Thread,
@@ -33,45 +28,53 @@ const Thread = struct {
 };
 
 const Fiber = struct {
-    _: void align(max_result_align.toByteUnits()) = {},
-
     context: Context,
     awaiter: ?*Fiber,
     queue_node: std.DoublyLinkedList(void).Node,
+    result_align: Alignment,
 
     const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
 
-    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
-        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
-        return base[0..std.mem.alignForward(
+    const max_result_align: Alignment = .@"16";
+    const max_result_size = max_result_align.forward(64);
+    /// This includes any stack realignments that need to happen, and also the
+    /// initial frame return address slot and argument frame, depending on target.
+    const min_stack_size = 4 * 1024 * 1024;
+    const max_context_align: Alignment = .@"16";
+    const max_context_size = max_context_align.forward(1024);
+    const allocation_size = std.mem.alignForward(
+        usize,
+        std.mem.alignForward(
             usize,
-            resultOffset() + max_result_len + min_stack_size,
-            std.heap.page_size_max,
-        )];
-    }
-
-    fn argsOffset() usize {
-        return max_result_align.forward(@sizeOf(Fiber));
-    }
-
-    fn resultOffset() usize {
-        return max_result_align.forward(argsOffset() + max_result_len);
-    }
-
-    fn argsSlice(f: *Fiber) []u8 {
-        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
-        return base[argsOffset()..][0..max_result_len];
+            max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
+            @max(@alignOf(AsyncClosure), max_context_align.toByteUnits()),
+        ) + @sizeOf(AsyncClosure) + max_context_size,
+        std.heap.page_size_max,
+    );
+
+    fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
+        return if (free_node: {
+            el.mutex.lock();
+            defer el.mutex.unlock();
+            break :free_node el.free.pop();
+        }) |free_node|
+            @alignCast(@fieldParentPtr("queue_node", free_node))
+        else
+            @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
     }
 
-    fn resultSlice(f: *Fiber) []u8 {
-        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
-        return base[resultOffset()..][0..max_result_len];
+    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
+        return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
     }
 
-    fn stackEndPointer(f: *Fiber) [*]u8 {
+    fn allocatedEnd(f: *Fiber) [*]u8 {
         const allocated_slice = f.allocatedSlice();
         return allocated_slice[allocated_slice.len..].ptr;
     }
+
+    fn resultPointer(f: *Fiber) [*]u8 {
+        return @ptrFromInt(f.result_align.forward(@intFromPtr(f) + @sizeOf(Fiber)));
+    }
 };
 
 pub fn io(el: *EventLoop) Io {
@@ -88,7 +91,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
     const threads_bytes = ((std.Thread.getCpuCount() catch 1) -| 1) * @sizeOf(Thread);
     const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
     const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
-    const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context), stack_align), idle_stack_end_offset);
+    const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset);
     errdefer gpa.free(allocated_slice);
     el.* = .{
         .gpa = gpa,
@@ -96,13 +99,13 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
         .cond = .{},
         .queue = .{},
         .free = .{},
-        .main_fiber_buffer = undefined,
+        .main_context = undefined,
         .exit_awaiter = null,
         .idle_count = 0,
         .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])),
     };
     const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)]));
-    const idle_stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
+    const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
     main_idle_context.* = .{
         .rsp = @intFromPtr(idle_stack_end - 1),
@@ -111,9 +114,8 @@ pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
     };
     std.log.debug("created main idle {*}", .{main_idle_context});
     current_idle_context = main_idle_context;
-    const current_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
-    std.log.debug("created main fiber {*}", .{current_fiber});
-    current_fiber_context = &current_fiber.context;
+    std.log.debug("created main {*}", .{&el.main_context});
+    current_context = &el.main_context;
 }
 
 pub fn deinit(el: *EventLoop) void {
@@ -125,27 +127,11 @@ pub fn deinit(el: *EventLoop) void {
     }
     const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context));
     const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
-    const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context), stack_align)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
+    const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
     for (el.threads.items) |*thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end]);
 }
 
-fn allocateFiber(el: *EventLoop) error{OutOfMemory}!*Fiber {
-    const free_node = free_node: {
-        el.mutex.lock();
-        defer el.mutex.unlock();
-        break :free_node el.free.pop();
-    } orelse {
-        const n = std.mem.alignForward(
-            usize,
-            Fiber.resultOffset() + max_result_len + min_stack_size,
-            std.heap.page_size_max,
-        );
-        return @alignCast(@ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), n)));
-    };
-    return @alignCast(@fieldParentPtr("queue_node", free_node));
-}
-
 fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
     const ready_context: *Context = ready_context: {
         const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
@@ -159,7 +145,7 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v
         break :ready_context &ready_fiber.context;
     };
     const message: SwitchMessage = .{
-        .prev_context = current_fiber_context,
+        .prev_context = current_context,
         .ready_context = ready_context,
         .register_awaiter = register_awaiter,
     };
@@ -189,14 +175,13 @@ fn schedule(el: *EventLoop, fiber: *Fiber) void {
 
 fn recycle(el: *EventLoop, fiber: *Fiber) void {
     std.log.debug("recyling {*}", .{fiber});
-    fiber.awaiter = undefined;
-    @memset(fiber.resultSlice(), undefined);
+    @memset(fiber.allocatedSlice(), undefined);
     el.mutex.lock();
     defer el.mutex.unlock();
     el.free.append(&fiber.queue_node);
 }
 
-fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn {
+fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
     el.yield(el.idle(), null);
     unreachable; // switched to dead fiber
@@ -205,7 +190,7 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.c) noreturn
 fn threadEntry(el: *EventLoop, thread: *Thread) void {
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     current_idle_context = &thread.idle_context;
-    current_fiber_context = &thread.idle_context;
+    current_context = &thread.idle_context;
     _ = el.idle();
 }
 
@@ -230,7 +215,7 @@ const SwitchMessage = extern struct {
     register_awaiter: ?*?*Fiber,
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        current_fiber_context = message.ready_context;
+        current_context = message.ready_context;
         if (message.register_awaiter) |awaiter| {
             const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context));
             if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
@@ -238,10 +223,13 @@ const SwitchMessage = extern struct {
     }
 };
 
-const Context = extern struct {
-    rsp: usize,
-    rbp: usize,
-    rip: usize,
+const Context = switch (builtin.cpu.arch) {
+    .x86_64 => extern struct {
+        rsp: u64,
+        rbp: u64,
+        rip: u64,
+    },
+    else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
 };
 
 inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
@@ -299,42 +287,45 @@ fn fiberEntry() callconv(.naked) void {
 pub fn @"async"(
     userdata: ?*anyopaque,
     result: []u8,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*std.Io.AnyFuture {
-    assert(result_alignment.compare(.lte, max_result_align)); // TODO
-    assert(context_alignment.compare(.lte, max_result_align)); // TODO
-    assert(result.len <= max_result_len); // TODO
-    assert(context.len <= max_result_len); // TODO
+    assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
+    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
+    assert(result.len <= Fiber.max_result_size); // TODO
+    assert(context.len <= Fiber.max_context_size); // TODO
 
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
-    const fiber = event_loop.allocateFiber() catch {
+    const fiber = Fiber.allocate(event_loop) catch {
         start(context.ptr, result.ptr);
         return null;
     };
-    fiber.awaiter = null;
-    fiber.queue_node = .{ .data = {} };
-    @memcpy(fiber.argsSlice()[0..context.len], context);
     std.log.debug("allocated {*}", .{fiber});
 
-    const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
-        usize,
-        @intFromPtr(fiber.stackEndPointer() - @sizeOf(AsyncClosure)),
-        @max(@alignOf(AsyncClosure), stack_align),
-    ));
+    const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
+        @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
+    ) - @sizeOf(AsyncClosure));
+    fiber.* = .{
+        .context = switch (builtin.cpu.arch) {
+            .x86_64 => .{
+                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rbp = 0,
+                .rip = @intFromPtr(&fiberEntry),
+            },
+            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+        },
+        .awaiter = null,
+        .queue_node = undefined,
+        .result_align = result_alignment,
+    };
     closure.* = .{
         .event_loop = event_loop,
         .fiber = fiber,
         .start = start,
     };
-    const stack_end: [*]align(stack_align) usize = @alignCast(@ptrCast(closure));
-    fiber.context = .{
-        .rsp = @intFromPtr(stack_end - 1),
-        .rbp = 0,
-        .rip = @intFromPtr(&fiberEntry),
-    };
+    @memcpy(closure.contextPointer(), context);
 
     event_loop.schedule(fiber);
     return @ptrCast(fiber);
@@ -345,10 +336,14 @@ const AsyncClosure = struct {
     fiber: *Fiber,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 
-    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn {
+    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
+    }
+
+    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
         message.handle(closure.event_loop);
         std.log.debug("{*} performing async", .{closure.fiber});
-        closure.start(closure.fiber.argsSlice().ptr, closure.fiber.resultSlice().ptr);
+        closure.start(closure.contextPointer(), closure.fiber.resultPointer());
         const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
         closure.event_loop.yield(awaiter, null);
         unreachable; // switched to dead fiber
@@ -358,8 +353,7 @@ const AsyncClosure = struct {
 pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    const result_src = future_fiber.resultSlice()[0..result.len];
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter);
-    @memcpy(result, result_src);
+    @memcpy(result, future_fiber.resultPointer());
     event_loop.recycle(future_fiber);
 }