Commit d958077203

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-29 15:56:45
EventLoop: fix futex usage
How silly of me to forget that the kernel doesn't implement its own API. The scheduling is not great, but at least doesn't deadlock or hammer.
1 parent db0dd3a
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -11,7 +11,7 @@ gpa: Allocator,
 mutex: std.Thread.Mutex,
 queue: std.DoublyLinkedList(void),
 /// Atomic copy of queue.len
-queue_len: usize,
+queue_len: u32,
 free: std.DoublyLinkedList(void),
 main_fiber: Fiber,
 idle_count: usize,
@@ -20,8 +20,8 @@ exiting: bool,
 
 threadlocal var thread_index: u32 = undefined;
 
-/// Empirically saw 10KB being used by the self-hosted backend for logging.
-const idle_stack_size = 64 * 1024;
+/// Empirically saw >128KB being used by the self-hosted backend to panic.
+const idle_stack_size = 256 * 1024;
 
 const io_uring_entries = 64;
 
@@ -143,6 +143,7 @@ pub fn deinit(el: *EventLoop) void {
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
     for (el.threads.items[1..]) |*thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
+    el.* = undefined;
 }
 
 fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
@@ -151,8 +152,9 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
         const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
             el.mutex.lock();
             defer el.mutex.unlock();
+            const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
             const ready_node = el.queue.pop();
-            @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+            _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
             break :ready_node ready_node;
         }) |ready_node|
             @alignCast(@fieldParentPtr("queue_node", ready_node))
@@ -172,20 +174,16 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
 }
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
+    std.log.debug("scheduling {*}", .{fiber});
     if (idle_count: {
         el.mutex.lock();
         defer el.mutex.unlock();
+        const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
         el.queue.append(&fiber.queue_node);
-        @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+        _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
         break :idle_count el.idle_count;
     } > 0) {
-        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), 1, switch (@bitSizeOf(usize)) {
-            8 => std.os.linux.FUTEX2.SIZE_U8,
-            16 => std.os.linux.FUTEX2.SIZE_U16,
-            32 => std.os.linux.FUTEX2.SIZE_U32,
-            64 => std.os.linux.FUTEX2.SIZE_U64,
-            else => @compileError("unsupported @sizeOf(usize)"),
-        } | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
+        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), 1, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
         return;
     }
     if (el.threads.items.len == el.threads.capacity) return;
@@ -226,8 +224,8 @@ fn threadEntry(el: *EventLoop, index: usize) void {
     el.idle();
 }
 
-const UserData = enum(u64) {
-    queue_len_futex_wait,
+const CompletionKey = enum(u64) {
+    queue_len_futex_wait = 1,
     _,
 };
 
@@ -235,33 +233,44 @@ fn idle(el: *EventLoop) void {
     const thread: *Thread = &el.threads.items[thread_index];
     const iou = &thread.io_uring;
     var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
-    var futex_is_scheduled: bool = false;
+    var queue_len_futex_is_scheduled: bool = false;
 
     while (true) {
         el.yield(null, .nothing);
         if (@atomicLoad(bool, &el.exiting, .acquire)) return;
-        if (!futex_is_scheduled) {
+        if (!queue_len_futex_is_scheduled) {
             const sqe = getSqe(&thread.io_uring);
-            sqe.prep_rw(.FUTEX_WAIT, switch (@bitSizeOf(usize)) {
-                8 => std.os.linux.FUTEX2.SIZE_U8,
-                16 => std.os.linux.FUTEX2.SIZE_U16,
-                32 => std.os.linux.FUTEX2.SIZE_U32,
-                64 => std.os.linux.FUTEX2.SIZE_U64,
-                else => @compileError("unsupported @sizeOf(usize)"),
-            } | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
-            sqe.addr3 = std.math.maxInt(u64);
-            sqe.user_data = @intFromEnum(UserData.queue_len_futex_wait);
-            futex_is_scheduled = true;
+            sqe.prep_rw(.FUTEX_WAIT, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
+            sqe.addr3 = std.math.maxInt(u32);
+            sqe.user_data = @intFromEnum(CompletionKey.queue_len_futex_wait);
+            queue_len_futex_is_scheduled = true;
         }
         _ = iou.submit_and_wait(1) catch |err| switch (err) {
-            error.SignalInterrupt => 0,
+            error.SignalInterrupt => std.log.debug("submit_and_wait: SignalInterrupt", .{}),
             else => @panic(@errorName(err)),
         };
         for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
-            error.SignalInterrupt => 0,
+            error.SignalInterrupt => cqes_len: {
+                std.log.debug("copy_cqes: SignalInterrupt", .{});
+                break :cqes_len 0;
+            },
             else => @panic(@errorName(err)),
-        }]) |cqe| switch (@as(UserData, @enumFromInt(cqe.user_data))) {
-            .queue_len_futex_wait => futex_is_scheduled = false,
+        }]) |cqe| switch (@as(CompletionKey, @enumFromInt(cqe.user_data))) {
+            .queue_len_futex_wait => {
+                switch (errno(cqe.res)) {
+                    .SUCCESS, .AGAIN => {},
+                    .INVAL => unreachable,
+                    else => |err| {
+                        std.posix.unexpectedErrno(err) catch {};
+                        @panic("unexpected");
+                    },
+                }
+                std.log.debug("{*} woken up with queue size of {d}", .{
+                    &thread.idle_context,
+                    @atomicLoad(u32, &el.queue_len, .unordered),
+                });
+                queue_len_futex_is_scheduled = false;
+            },
             _ => {
                 const fiber: *Fiber = @ptrFromInt(cqe.user_data);
                 const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
@@ -296,14 +305,8 @@ const SwitchMessage = struct {
             },
             .exit => {
                 @atomicStore(bool, &el.exiting, true, .unordered);
-                @atomicStore(usize, &el.queue_len, std.math.maxInt(usize), .release);
-                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), std.math.maxInt(i32), switch (@bitSizeOf(usize)) {
-                    8 => std.os.linux.FUTEX2.SIZE_U8,
-                    16 => std.os.linux.FUTEX2.SIZE_U16,
-                    32 => std.os.linux.FUTEX2.SIZE_U32,
-                    64 => std.os.linux.FUTEX2.SIZE_U64,
-                    else => @compileError("unsupported @sizeOf(usize)"),
-                } | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
+                @atomicStore(u32, &el.queue_len, std.math.maxInt(u32), .release);
+                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), std.math.maxInt(i32), std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
             },
         }
     }