Commit 9d0f44f08a

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-27 22:19:53
EventLoop: add threads
1 parent 629a204
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -7,15 +7,25 @@ const EventLoop = @This();
 
 gpa: Allocator,
 mutex: std.Thread.Mutex,
+cond: std.Thread.Condition,
 queue: std.DoublyLinkedList(void),
 free: std.DoublyLinkedList(void),
 main_fiber_buffer: [@sizeOf(Fiber) + max_result_len]u8 align(@alignOf(Fiber)),
+exiting: bool,
+idle_count: usize,
+threads: std.ArrayListUnmanaged(Thread),
 
+threadlocal var current_thread: *Thread = undefined;
 threadlocal var current_fiber: *Fiber = undefined;
 
 const max_result_len = 64;
 const min_stack_size = 4 * 1024 * 1024;
 
+const Thread = struct {
+    thread: std.Thread,
+    idle_fiber: Fiber,
+};
+
 const Fiber = struct {
     context: Context,
     awaiter: ?*Fiber,
@@ -23,32 +33,58 @@ const Fiber = struct {
 
     const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
 
-    fn resultPointer(f: *Fiber) [*]u8 {
-        const base: [*]u8 = @ptrCast(f);
-        return base + @sizeOf(Fiber);
-    }
-
-    fn stackEndPointer(f: *Fiber) [*]u8 {
-        const base: [*]u8 = @ptrCast(f);
-        return base + std.mem.alignForward(
+    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
+        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
+        return base[0..std.mem.alignForward(
             usize,
             @sizeOf(Fiber) + max_result_len + min_stack_size,
             std.heap.page_size_max,
-        );
+        )];
+    }
+
+    fn resultSlice(f: *Fiber) []u8 {
+        const base: [*]align(@alignOf(Fiber)) u8 = @ptrCast(f);
+        return base[@sizeOf(Fiber)..][0..max_result_len];
+    }
+
+    fn stackEndPointer(f: *Fiber) [*]u8 {
+        const allocated_slice = f.allocatedSlice();
+        return allocated_slice[allocated_slice.len..].ptr;
     }
 };
 
-pub fn init(el: *EventLoop, gpa: Allocator) void {
+pub fn init(el: *EventLoop, gpa: Allocator) error{OutOfMemory}!void {
     el.* = .{
         .gpa = gpa,
         .mutex = .{},
+        .cond = .{},
         .queue = .{},
         .free = .{},
         .main_fiber_buffer = undefined,
+        .exiting = false,
+        .idle_count = 0,
+        .threads = try .initCapacity(gpa, @max(std.Thread.getCpuCount() catch 1, 1)),
     };
+    current_thread = el.threads.addOneAssumeCapacity();
     current_fiber = @ptrCast(&el.main_fiber_buffer);
 }
 
+pub fn deinit(el: *EventLoop) void {
+    {
+        el.mutex.lock();
+        defer el.mutex.unlock();
+        assert(el.queue.len == 0); // pending async
+        el.exiting = true;
+    }
+    el.cond.broadcast();
+    while (el.free.pop()) |free_node| {
+        const free_fiber: *Fiber = @fieldParentPtr("queue_node", free_node);
+        el.gpa.free(free_fiber.allocatedSlice());
+    }
+    for (el.threads.items[1..]) |*thread| thread.thread.join();
+    el.threads.deinit(el.gpa);
+}
+
 fn allocateFiber(el: *EventLoop, result_len: usize) error{OutOfMemory}!*Fiber {
     assert(result_len <= max_result_len);
     const free_node = free_node: {
@@ -73,10 +109,8 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v
         break :ready_node el.queue.pop();
     }) |ready_node|
         @fieldParentPtr("queue_node", ready_node)
-    else if (register_awaiter) |_| // time to switch to an idle fiber?
-        @panic("no other fiber to switch to in order to be able to register this fiber as an awaiter")
-    else // nothing to do
-        return;
+    else
+        &current_thread.idle_fiber;
     const message: SwitchMessage = .{
         .prev_context = &current_fiber.context,
         .ready_context = &ready_fiber.context,
@@ -90,20 +124,44 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, register_awaiter: ?*?*Fiber) v
 }
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
-    el.mutex.lock();
-    defer el.mutex.unlock();
-    el.queue.append(&fiber.queue_node);
+    signal: {
+        el.mutex.lock();
+        defer el.mutex.unlock();
+        el.queue.append(&fiber.queue_node);
+        if (el.idle_count > 0) break :signal;
+        if (el.threads.items.len == el.threads.capacity) return;
+        const thread = el.threads.addOneAssumeCapacity();
+        thread.thread = std.Thread.spawn(.{
+            .stack_size = min_stack_size,
+            .allocator = el.gpa,
+        }, threadEntry, .{ el, thread }) catch return;
+    }
+    el.cond.signal();
 }
 
 fn recycle(el: *EventLoop, fiber: *Fiber) void {
     std.log.debug("recyling {*}", .{fiber});
     fiber.awaiter = undefined;
-    @memset(fiber.resultPointer()[0..max_result_len], undefined);
+    @memset(fiber.resultSlice(), undefined);
     el.mutex.lock();
     defer el.mutex.unlock();
     el.free.append(&fiber.queue_node);
 }
 
+fn threadEntry(el: *EventLoop, thread: *Thread) void {
+    current_thread = thread;
+    current_fiber = &thread.idle_fiber;
+    while (true) {
+        el.yield(null, null);
+        el.mutex.lock();
+        defer el.mutex.unlock();
+        if (el.exiting) return;
+        el.idle_count += 1;
+        defer el.idle_count -= 1;
+        el.cond.wait(&el.mutex);
+    }
+}
+
 const SwitchMessage = extern struct {
     prev_context: *Context,
     ready_context: *Context,
@@ -209,7 +267,7 @@ const AsyncClosure = struct {
     fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.c) noreturn {
         message.handle(closure.event_loop);
         std.log.debug("{*} performing async", .{closure.fiber});
-        closure.start(closure.context, closure.fiber.resultPointer());
+        closure.start(closure.context, closure.fiber.resultSlice().ptr);
         const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
         closure.event_loop.yield(awaiter, null);
         unreachable; // switched to dead fiber
@@ -219,7 +277,7 @@ const AsyncClosure = struct {
 pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    const result_src = future_fiber.resultPointer()[0..result.len];
+    const result_src = future_fiber.resultSlice()[0..result.len];
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter);
     @memcpy(result, result_src);
     event_loop.recycle(future_fiber);