Commit 6a64c9b7c8

Andrew Kelley <andrew@ziglang.org>
2025-10-23 14:24:41
std.Io.Kqueue: add missing Thread deinit logic
1 parent cc11dd1
Changed files (1)
lib
std
lib/std/Io/Kqueue.zig
@@ -36,6 +36,14 @@ const Thread = struct {
     kq_fd: posix.fd_t,
     idle_search_index: u32,
     steal_ready_search_index: u32,
+    /// For ensuring multiple fibers waiting on the same file descriptor and
+    /// filter use the same kevent.
+    wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),
+
+    const WaitQueueKey = struct {
+        ident: usize,
+        filter: i32,
+    };
 
     const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
 
@@ -54,6 +62,13 @@ const Thread = struct {
         reserved: u32,
         active: u32,
     };
+
+    fn deinit(thread: *Thread, gpa: Allocator) void {
+        posix.close(thread.kq_fd);
+        assert(thread.wait_queues.count() == 0);
+        thread.wait_queues.deinit(gpa);
+        thread.* = undefined;
+    }
 };
 
 const Fiber = struct {
@@ -138,8 +153,14 @@ fn recycle(k: *Kqueue, fiber: *Fiber) void {
     k.gpa.free(fiber.allocatedSlice());
 }
 
-pub fn init(k: *Kqueue, gpa: Allocator) !void {
-    const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
+pub const InitOptions = struct {
+    n_threads: ?usize = null,
+};
+
+pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
+    assert(options.n_threads != 0);
+    const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
+    const threads_size = n_threads * @sizeOf(Thread);
     const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
     const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
     errdefer gpa.free(allocated_slice);
@@ -186,6 +207,7 @@ pub fn init(k: *Kqueue, gpa: Allocator) !void {
         .kq_fd = try posix.kqueue(),
         .idle_search_index = 1,
         .steal_ready_search_index = 1,
+        .wait_queues = .empty,
     };
     errdefer std.posix.close(main_thread.kq_fd);
     std.log.debug("created main idle {*}", .{&main_thread.idle_context});
@@ -199,10 +221,13 @@ pub fn deinit(k: *Kqueue) void {
         assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
     }
     k.yield(null, .exit);
+    const main_thread = &k.threads.allocated[0];
+    const gpa = k.gpa;
+    main_thread.deinit(gpa);
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
     const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
     for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
-    k.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
+    gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     k.* = undefined;
 }
 
@@ -317,6 +342,7 @@ fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
             },
             .idle_search_index = 0,
             .steal_ready_search_index = 0,
+            .wait_queues = .empty,
         };
         new_thread.thread = std.Thread.spawn(.{
             .stack_size = idle_stack_size,
@@ -355,6 +381,7 @@ fn threadEntry(k: *Kqueue, index: u32) void {
     Thread.self = thread;
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     k.idle(thread);
+    thread.deinit(k.gpa);
 }
 
 const Completion = struct {