Commit 4c7c0c4178

Andrew Kelley <andrew@ziglang.org>
2025-03-28 07:59:35
update threaded fibers impl to actually storing args
sorry, something still not working correctly
1 parent 31ed2d6
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -18,7 +18,11 @@ threads: std.ArrayListUnmanaged(Thread),
 threadlocal var current_idle_context: *Context = undefined;
 threadlocal var current_fiber_context: *Context = undefined;
 
+/// Also used for context.
 const max_result_len = 64;
+/// Also used for context.
+const max_result_align: std.mem.Alignment = .@"16";
+
 const min_stack_size = 4 * 1024 * 1024;
 const idle_stack_size = 32 * 1024;
 const stack_align = 16;
@@ -29,6 +33,8 @@ const Thread = struct {
 };
 
 const Fiber = struct {
+    _: void align(max_result_align.toByteUnits()) = {},
+
     context: Context,
     awaiter: ?*Fiber,
     queue_node: std.DoublyLinkedList(void).Node,
@@ -39,14 +45,27 @@ const Fiber = struct {
         const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
         return base[0..std.mem.alignForward(
             usize,
-            @sizeOf(Fiber) + max_result_len + min_stack_size,
+            resultOffset() + max_result_len + min_stack_size,
             std.heap.page_size_max,
         )];
     }
 
+    fn argsOffset() usize {
+        return max_result_align.forward(@sizeOf(Fiber));
+    }
+
+    fn resultOffset() usize {
+        return max_result_align.forward(argsOffset() + max_result_len);
+    }
+
+    fn argsSlice(f: *Fiber) []u8 {
+        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
+        return base[argsOffset()..][0..max_result_len];
+    }
+
     fn resultSlice(f: *Fiber) []u8 {
         const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
-        return base[@sizeOf(Fiber)..][0..max_result_len];
+        return base[resultOffset()..][0..max_result_len];
     }
 
     fn stackEndPointer(f: *Fiber) [*]u8 {
@@ -102,7 +121,7 @@ pub fn deinit(el: *EventLoop) void {
     assert(el.queue.len == 0); // pending async
     el.yield(null, &el.exit_awaiter);
     while (el.free.pop()) |free_node| {
-        const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node);
+        const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
         el.gpa.free(free_fiber.allocatedSlice());
     }
     const idle_context_offset = std.mem.alignForward(usize, el.threads.items.len * @sizeOf(Thread), @alignOf(Context));
@@ -112,8 +131,7 @@ pub fn deinit(el: *EventLoop) void {
     el.gpa.free(allocated_ptr[0..idle_stack_end]);
 }
 
-fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
-    assert(result_len <= max_result_len);
+fn allocateFiber(el: *EventLoop) error{OutOfMemory}!*Fiber {
     const free_node = free_node: {
         el.mutex.lock();
         defer el.mutex.unlock();
@@ -121,12 +139,12 @@ fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
     } orelse {
         const n = std.mem.alignForward(
             usize,
-            @sizeOf(Fiber) + max_result_len + min_stack_size,
+            Fiber.resultOffset() + max_result_len + min_stack_size,
             std.heap.page_size_max,
         );
         return @alignCast(@ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), n)));
     };
-    return @fieldParentPtr("queue_node", free_node);
+    return @alignCast(@fieldParentPtr("queue_node", free_node));
 }
 
 fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) void {
@@ -136,7 +154,7 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v
             defer el.mutex.unlock();
             break :ready_node el.queue.pop();
         }) |ready_node|
-            @fieldParentPtr("queue_node", ready_node)
+            @alignCast(@fieldParentPtr("queue_node", ready_node))
         else
             break :ready_context current_idle_context;
         break :ready_context &ready_fiber.context;
@@ -213,7 +231,7 @@ const SwitchMessage = extern struct {
     register_awaiter: ?*?*Fiber,
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        const prev_fiber: *Fiber = @fieldParentPtr("context", message.prev_context);
+        const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context));
         current_fiber_context = message.ready_context;
         if (message.register_awaiter) |awaiter| if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
     }
@@ -279,17 +297,25 @@ fn fiberEntry() callconv(.naked) void {
 
 pub fn @"async"(
     userdata: ?*anyopaque,
-    eager_result: []u8,
-    context: ?*anyopaque,
-    start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
+    result: []u8,
+    result_alignment: std.mem.Alignment,
+    context: []const u8,
+    context_alignment: std.mem.Alignment,
+    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*std.Io.AnyFuture {
+    assert(result_alignment.compare(.lte, max_result_align)); // TODO
+    assert(context_alignment.compare(.lte, max_result_align)); // TODO
+    assert(result.len <= max_result_len); // TODO
+    assert(context.len <= max_result_len); // TODO
+
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
-    const fiber = event_loop.allocateFiber(eager_result.len) catch {
-        start(context, eager_result.ptr);
+    const fiber = event_loop.allocateFiber() catch {
+        start(context.ptr, result.ptr);
         return null;
     };
     fiber.awaiter = null;
     fiber.queue_node = .{ .data = {} };
+    @memcpy(fiber.argsSlice()[0..context.len], context);
     std.log.debug("allocated {*}", .{fiber});
 
     const closure: *AsyncClosure = @ptrFromInt(std.mem.alignBackward(
@@ -299,7 +325,6 @@ pub fn @"async"(
     ));
     closure.* = .{
         .event_loop = event_loop,
-        .context = context,
         .fiber = fiber,
         .start = start,
     };
@@ -316,14 +341,13 @@ pub fn @"async"(
 
 const AsyncClosure = struct {
     event_loop: *EventLoop,
-    context: ?*anyopaque,
     fiber: *Fiber,
-    start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
+    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 
     fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn {
         message.handle(closure.event_loop);
         std.log.debug("{*} performing async", .{closure.fiber});
-        closure.start(closure.context, closure.fiber.resultSlice().ptr);
+        closure.start(closure.fiber.argsSlice().ptr, closure.fiber.resultSlice().ptr);
         const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
         closure.event_loop.yield(awaiter, null);
         unreachable; // switched to dead fiber