Commit 8eaebf5939

Loris Cro <kappaloris@gmail.com>
2025-11-04 21:11:40
Io.Threaded PoC reimplementation
This is a reimplementation of Io.Threaded that fixes the issues highlighted in the recent Zulip discussion. It's poorly tested but it does successfully run to completion the litmust test example that I offered in the discussion. This implementation has the following key design decisions: - `t.cpu_count` is used as the threadpool size. - `t.concurrency_limit` is used as the maximum number of "burst, one-shot" threads that can be spawned by `io.concurrent` past `t.cpu_count`. - `t.available_thread_count` is the number of threads in the pool that is not currently busy with work (the bookkeeping happens in the worker function). - `t.one_shot_thread_count` is the number of active threads that were spawned by `io.concurrent` past `t.cpu_count`. In this implementation: - `io.async` first tries to decrement `t.available_thread_count`. If there are no threads available, it tries to spawn a new one if possible, otherwise it runs the task immediately. - `io.concurrent` first tries to use a thread in the pool same as `io.async`, but on failure (no available threads and pool size limit reached) it tries to spawn a new one-shot thread. One shot threads run a different main function that just executes one task, decrements the number of active one shot threads, and then exits. A relevant future improvement is to have one-shot threads stay on for a few seconds (and potentially pick up a new task) to amortize spawning costs.
1 parent d54fbc0
Changed files (1)
lib
lib/std/Io/Threaded.zig
@@ -24,8 +24,10 @@ run_queue: std.SinglyLinkedList = .{},
 join_requested: bool = false,
 threads: std.ArrayList(std.Thread),
 stack_size: usize,
-cpu_count: std.Thread.CpuCountError!usize,
-concurrent_count: usize,
+cpu_count: usize, // 0 means no limit
+concurrency_limit: usize, // 0 means no limit
+available_thread_count: usize = 0,
+one_shot_thread_count: usize = 0,
 
 wsa: if (is_windows) Wsa else struct {} = .{},
 
@@ -70,8 +72,6 @@ const Closure = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
     cancel_tid: CancelId,
-    /// Whether this task bumps minimum number of threads in the pool.
-    is_concurrent: bool,
 
     const Start = *const fn (*Closure) void;
 
@@ -103,20 +103,20 @@ pub fn init(
     /// here.
     gpa: Allocator,
 ) Threaded {
+    assert(!builtin.single_threaded); // use 'init_single_threaded' instead
+
     var t: Threaded = .{
         .allocator = gpa,
         .threads = .empty,
         .stack_size = std.Thread.SpawnConfig.default_stack_size,
-        .cpu_count = std.Thread.getCpuCount(),
-        .concurrent_count = 0,
+        .cpu_count = std.Thread.getCpuCount() catch 0,
+        .concurrency_limit = 0,
         .old_sig_io = undefined,
         .old_sig_pipe = undefined,
         .have_signal_handler = false,
     };
 
-    if (t.cpu_count) |n| {
-        t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
-    } else |_| {}
+    t.threads.ensureTotalCapacity(gpa, t.cpu_count) catch {};
 
     if (posix.Sigaction != void) {
         // This causes sending `posix.SIG.IO` to thread to interrupt blocking
@@ -145,7 +145,7 @@ pub const init_single_threaded: Threaded = .{
     .threads = .empty,
     .stack_size = std.Thread.SpawnConfig.default_stack_size,
     .cpu_count = 1,
-    .concurrent_count = 0,
+    .concurrency_limit = 0,
     .old_sig_io = undefined,
     .old_sig_pipe = undefined,
     .have_signal_handler = false,
@@ -184,18 +184,22 @@ fn worker(t: *Threaded) void {
         while (t.run_queue.popFirst()) |closure_node| {
             t.mutex.unlock();
             const closure: *Closure = @fieldParentPtr("node", closure_node);
-            const is_concurrent = closure.is_concurrent;
             closure.start(closure);
             t.mutex.lock();
-            if (is_concurrent) {
-                t.concurrent_count -= 1;
-            }
+            t.available_thread_count += 1;
         }
         if (t.join_requested) break;
         t.cond.wait(&t.mutex);
     }
 }
 
+fn oneShotWorker(t: *Threaded, closure: *Closure) void {
+    closure.start(closure);
+    t.mutex.lock();
+    defer t.mutex.unlock();
+    t.one_shot_thread_count -= 1;
+}
+
 pub fn io(t: *Threaded) Io {
     return .{
         .userdata = t,
@@ -432,7 +436,6 @@ const AsyncClosure = struct {
 
     fn init(
         gpa: Allocator,
-        mode: enum { async, concurrent },
         result_len: usize,
         result_alignment: std.mem.Alignment,
         context: []const u8,
@@ -454,10 +457,6 @@ const AsyncClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = switch (mode) {
-                    .async => false,
-                    .concurrent => true,
-                },
             },
             .func = func,
             .context_alignment = context_alignment,
@@ -490,55 +489,51 @@ fn async(
     context_alignment: std.mem.Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
-    if (builtin.single_threaded) {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    if (t.cpu_count == 1) {
         start(context.ptr, result.ptr);
         return null;
     }
-
-    const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch {
-        return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
-            start(context.ptr, result.ptr);
-            return null;
-        };
-    };
-
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .async, result.len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
         start(context.ptr, result.ptr);
         return null;
     };
 
     t.mutex.lock();
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        start(context.ptr, result.ptr);
-        return null;
-    };
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        }
 
-    t.run_queue.prepend(&ac.closure.node);
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+            t.mutex.unlock();
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
+        };
 
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            if (t.threads.items.len == 0) {
-                assert(t.run_queue.popFirst() == &ac.closure.node);
-                t.mutex.unlock();
-                ac.deinit(gpa);
-                start(context.ptr, result.ptr);
-                return null;
-            }
-            // Rely on other workers to do it.
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
             t.mutex.unlock();
-            t.cond.signal();
-            return @ptrCast(ac);
+            ac.deinit(gpa);
+            start(context.ptr, result.ptr);
+            return null;
         };
+
         t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&ac.closure.node);
     t.mutex.unlock();
     t.cond.signal();
     return @ptrCast(ac);
@@ -555,38 +550,49 @@ fn concurrent(
     if (builtin.single_threaded) return error.ConcurrencyUnavailable;
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
 
     const gpa = t.allocator;
-    const ac = AsyncClosure.init(gpa, .concurrent, result_len, result_alignment, context, context_alignment, start) catch {
+    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch {
         return error.ConcurrencyUnavailable;
     };
+    errdefer ac.deinit(gpa);
 
     t.mutex.lock();
+    defer t.mutex.unlock();
 
-    t.concurrent_count += 1;
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        ac.deinit(gpa);
-        return error.ConcurrencyUnavailable;
-    };
+    // If there's an avilable thread, use it.
+    if (t.available_thread_count > 0) {
+        t.available_thread_count -= 1;
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
+    }
 
-    t.run_queue.prepend(&ac.closure.node);
+    // If we can spawn a normal worker, spawn it and use it.
+    if (t.cpu_count == 0 or t.threads.items.len < t.cpu_count) {
+        t.threads.ensureUnusedCapacity(gpa, 1) catch return error.ConcurrencyUnavailable;
 
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &ac.closure.node);
-            t.mutex.unlock();
-            ac.deinit(gpa);
+        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
             return error.ConcurrencyUnavailable;
-        };
+
         t.threads.appendAssumeCapacity(thread);
+        t.run_queue.prepend(&ac.closure.node);
+        t.cond.signal();
+        return @ptrCast(ac);
     }
 
-    t.mutex.unlock();
-    t.cond.signal();
+    // If we have a concurrencty limit and we havent' hit it yet,
+    // spawn a new one-shot thread.
+    if (t.concurrency_limit != 0 and t.one_shot_thread_count >= t.concurrency_limit)
+        return error.ConcurrencyUnavailable;
+
+    t.one_shot_thread_count += 1;
+    errdefer t.one_shot_thread_count -= 1;
+
+    const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, oneShotWorker, .{ t, &ac.closure }) catch
+        return error.ConcurrencyUnavailable;
+    thread.detach();
+
     return @ptrCast(ac);
 }
 
@@ -652,7 +658,6 @@ const GroupClosure = struct {
             .closure = .{
                 .cancel_tid = .none,
                 .start = start,
-                .is_concurrent = false,
             },
             .t = t,
             .group = group,
@@ -684,12 +689,9 @@ fn groupAsync(
     if (builtin.single_threaded) return start(group, context.ptr);
 
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    const cpu_count = t.cpu_count catch 1;
-
     const gpa = t.allocator;
-    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch {
+    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
         return start(group, context.ptr);
-    };
 
     t.mutex.lock();
 
@@ -697,26 +699,36 @@ fn groupAsync(
     gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
     group.token = &gc.node;
 
-    const thread_capacity = cpu_count - 1 + t.concurrent_count;
-
-    t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
-        t.mutex.unlock();
-        gc.deinit(gpa);
-        return start(group, context.ptr);
-    };
+    if (t.available_thread_count == 0) {
+        if (t.cpu_count != 0 and t.threads.items.len >= t.cpu_count) {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        }
 
-    t.run_queue.prepend(&gc.closure.node);
+        t.threads.ensureUnusedCapacity(gpa, 1) catch {
+            t.mutex.unlock();
+            gc.deinit(gpa);
+            return start(group, context.ptr);
+        };
 
-    if (t.threads.items.len < thread_capacity) {
-        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
-            assert(t.run_queue.popFirst() == &gc.closure.node);
+        const thread = std.Thread.spawn(
+            .{ .stack_size = t.stack_size },
+            worker,
+            .{t},
+        ) catch {
             t.mutex.unlock();
             gc.deinit(gpa);
             return start(group, context.ptr);
         };
+
         t.threads.appendAssumeCapacity(thread);
+    } else {
+        t.available_thread_count -= 1;
     }
 
+    t.run_queue.prepend(&gc.closure.node);
+
     // This needs to be done before unlocking the mutex to avoid a race with
     // the associated task finishing.
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);