Commit 92b8378814

Andrew Kelley <andrew@ziglang.org>
2025-10-23 12:38:34
concurrent and await
1 parent dd945bf
Changed files (1)
lib
std
lib/std/Io/Kqueue.zig
@@ -172,9 +172,6 @@ pub fn init(k: *Kqueue, gpa: Allocator) !void {
                 .sp = @intFromPtr(idle_stack_end),
                 .fp = 0,
                 .pc = @intFromPtr(&mainIdleEntry),
-                .x18 = asm (""
-                    : [x18] "={x18}" (-> u64),
-                ),
             },
             .x86_64 => .{
                 .rsp = @intFromPtr(idle_stack_end - 1),
@@ -548,7 +545,6 @@ const Context = switch (builtin.cpu.arch) {
         sp: u64,
         fp: u64,
         pc: u64,
-        x18: u64,
     },
     .x86_64 => extern struct {
         rsp: u64,
@@ -562,12 +558,12 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
     return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
         .aarch64 => asm volatile (
             \\ ldp x0, x2, [x1]
-            \\ ldp x3, x18, [x2, #16]
+            \\ ldr x3, [x2, #16]
             \\ mov x4, sp
             \\ stp x4, fp, [x0]
             \\ adr x5, 0f
             \\ ldp x4, fp, [x2]
-            \\ stp x5, x18, [x0, #16]
+            \\ str x5, [x0, #16]
             \\ mov sp, x4
             \\ br x3
             \\0:
@@ -761,12 +757,18 @@ fn fiberEntry() callconv(.naked) void {
             :
             : [AsyncClosure_call] "X" (&AsyncClosure.call),
         ),
+        .aarch64 => asm volatile (
+            \\ mov x0, sp
+            \\ b %[AsyncClosure_call]
+            :
+            : [AsyncClosure_call] "X" (&AsyncClosure.call),
+        ),
         else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
     }
 }
 
 const AsyncClosure = struct {
-    event_loop: *Kqueue,
+    kqueue: *Kqueue,
     fiber: *Fiber,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
     result_align: Alignment,
@@ -777,7 +779,7 @@ const AsyncClosure = struct {
     }
 
     fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
-        message.handle(closure.event_loop);
+        message.handle(closure.kqueue);
         const fiber = closure.fiber;
         std.log.debug("{*} performing async", .{fiber});
         closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
@@ -787,7 +789,7 @@ const AsyncClosure = struct {
             if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
             break :r a;
         };
-        closure.event_loop.yield(ready_awaiter, .nothing);
+        closure.kqueue.yield(ready_awaiter, .nothing);
         unreachable; // switched to dead fiber
     }
 
@@ -881,19 +883,52 @@ fn async(
 fn concurrent(
     userdata: ?*anyopaque,
     result_len: usize,
-    result_alignment: std.mem.Alignment,
+    result_alignment: Alignment,
     context: []const u8,
-    context_alignment: std.mem.Alignment,
+    context_alignment: Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) error{OutOfMemory}!*Io.AnyFuture {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
-    _ = k;
-    _ = result_len;
-    _ = result_alignment;
-    _ = context;
-    _ = context_alignment;
-    _ = start;
-    @panic("TODO");
+    assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
+    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
+    assert(result_len <= Fiber.max_result_size); // TODO
+    assert(context.len <= Fiber.max_context_size); // TODO
+
+    const fiber = try Fiber.allocate(k);
+    std.log.debug("allocated {*}", .{fiber});
+
+    const closure: *AsyncClosure = .fromFiber(fiber);
+    fiber.* = .{
+        .required_align = {},
+        .context = switch (builtin.cpu.arch) {
+            .x86_64 => .{
+                .rsp = @intFromPtr(closure) - @sizeOf(usize),
+                .rbp = 0,
+                .rip = @intFromPtr(&fiberEntry),
+            },
+            .aarch64 => .{
+                .sp = @intFromPtr(closure),
+                .fp = 0,
+                .pc = @intFromPtr(&fiberEntry),
+            },
+            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
+        },
+        .awaiter = null,
+        .queue_next = null,
+        .cancel_thread = null,
+        .awaiting_completions = .initEmpty(),
+    };
+    closure.* = .{
+        .kqueue = k,
+        .fiber = fiber,
+        .start = start,
+        .result_align = result_alignment,
+        .already_awaited = false,
+    };
+    @memcpy(closure.contextPointer(), context);
+
+    k.schedule(.current(), .{ .head = fiber, .tail = fiber });
+    return @ptrCast(fiber);
 }
 
 fn await(
@@ -903,11 +938,11 @@ fn await(
     result_alignment: std.mem.Alignment,
 ) void {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
-    _ = k;
-    _ = any_future;
-    _ = result;
-    _ = result_alignment;
-    @panic("TODO");
+    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
+    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
+        k.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
+    @memcpy(result, future_fiber.resultBytes(result_alignment));
+    k.recycle(future_fiber);
 }
 
 fn cancel(