Commit a242292644
Author: Andrew Kelley <andrew@ziglang.org>
Date: 2025-11-24 16:37:05

build runner: update from std.Thread.Pool to std.Io

Parent: 32dc46a
Changed files (6):
lib/compiler/build_runner.zig
@@ -107,7 +107,6 @@ pub fn main() !void {
 
     var targets = std.array_list.Managed([]const u8).init(arena);
     var debug_log_scopes = std.array_list.Managed([]const u8).init(arena);
-    var thread_pool_options: std.Thread.Pool.Options = .{ .allocator = arena };
 
     var install_prefix: ?[]const u8 = null;
     var dir_list = std.Build.DirList{};
@@ -413,19 +412,11 @@ pub fn main() !void {
                 };
             } else if (mem.eql(u8, arg, "-fno-reference-trace")) {
                 builder.reference_trace = null;
-            } else if (mem.startsWith(u8, arg, "-j")) {
-                const num = arg["-j".len..];
-                const n_jobs = std.fmt.parseUnsigned(u32, num, 10) catch |err| {
-                    std.debug.print("unable to parse jobs count '{s}': {s}", .{
-                        num, @errorName(err),
-                    });
-                    process.exit(1);
-                };
-                if (n_jobs < 1) {
-                    std.debug.print("number of jobs must be at least 1\n", .{});
-                    process.exit(1);
-                }
-                thread_pool_options.n_jobs = n_jobs;
+            } else if (mem.cutPrefix(u8, arg, "-j")) |text| {
+                const n = std.fmt.parseUnsigned(u32, text, 10) catch |err|
+                    fatal("unable to parse jobs count '{s}': {t}", .{ text, err });
+                if (n < 1) fatal("number of jobs must be at least 1", .{});
+                threaded.setAsyncLimit(.limited(n));
             } else if (mem.eql(u8, arg, "--")) {
                 builder.args = argsRest(args, arg_idx);
                 break;
@@ -516,7 +507,6 @@ pub fn main() !void {
         .error_style = error_style,
         .multiline_errors = multiline_errors,
         .summary = summary orelse if (watch or webui_listen != null) .line else .failures,
-        .thread_pool = undefined,
 
         .ttyconf = ttyconf,
     };
@@ -547,16 +537,12 @@ pub fn main() !void {
         break :w try .init();
     };
 
-    try run.thread_pool.init(thread_pool_options);
-    defer run.thread_pool.deinit();
-
     const now = Io.Clock.Timestamp.now(io, .awake) catch |err| fatal("failed to collect timestamp: {t}", .{err});
 
     run.web_server = if (webui_listen) |listen_address| ws: {
         if (builtin.single_threaded) unreachable; // `fatal` above
         break :ws .init(.{
             .gpa = gpa,
-            .thread_pool = &run.thread_pool,
             .ttyconf = ttyconf,
             .graph = &graph,
             .all_steps = run.step_stack.keys(),
@@ -675,7 +661,6 @@ const Run = struct {
     memory_blocked_steps: std.ArrayList(*Step),
     /// Allocated into `gpa`.
     step_stack: std.AutoArrayHashMapUnmanaged(*Step, void),
-    thread_pool: std.Thread.Pool,
     /// Similar to the `tty.Config` returned by `std.debug.lockStderrWriter`,
     /// but also respects the '--color' flag.
     ttyconf: tty.Config,
@@ -754,14 +739,13 @@ fn runStepNames(
     const gpa = run.gpa;
     const io = b.graph.io;
     const step_stack = &run.step_stack;
-    const thread_pool = &run.thread_pool;
 
     {
         const step_prog = parent_prog_node.start("steps", step_stack.count());
         defer step_prog.end();
 
-        var wait_group: std.Thread.WaitGroup = .{};
-        defer wait_group.wait();
+        var group: Io.Group = .init;
+        defer group.wait(io);
 
         // Here we spawn the initial set of tasks with a nice heuristic -
         // dependency order. Each worker when it finishes a step will then
@@ -771,9 +755,7 @@ fn runStepNames(
             const step = steps_slice[steps_slice.len - i - 1];
             if (step.state == .skipped_oom) continue;
 
-            thread_pool.spawnWg(&wait_group, workerMakeOneStep, .{
-                &wait_group, b, step, step_prog, run,
-            });
+            group.async(io, workerMakeOneStep, .{ &group, b, step, step_prog, run });
         }
     }
 
@@ -855,7 +837,6 @@ fn runStepNames(
         var f = std.Build.Fuzz.init(
             gpa,
             io,
-            thread_pool,
             run.ttyconf,
             step_stack.keys(),
             parent_prog_node,
@@ -1318,14 +1299,12 @@ fn constructGraphAndCheckForDependencyLoop(
 }
 
 fn workerMakeOneStep(
-    wg: *std.Thread.WaitGroup,
+    group: *Io.Group,
     b: *std.Build,
     s: *Step,
     prog_node: std.Progress.Node,
     run: *Run,
 ) void {
-    const thread_pool = &run.thread_pool;
-
     // First, check the conditions for running this step. If they are not met,
     // then we return without doing the step, relying on another worker to
     // queue this step up again when dependencies are met.
@@ -1381,7 +1360,6 @@ fn workerMakeOneStep(
 
     const make_result = s.make(.{
         .progress_node = sub_prog_node,
-        .thread_pool = thread_pool,
         .watch = run.watch,
         .web_server = if (run.web_server) |*ws| ws else null,
         .ttyconf = run.ttyconf,
@@ -1400,6 +1378,8 @@ fn workerMakeOneStep(
         printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {};
     }
 
+    const io = b.graph.io;
+
     handle_result: {
         if (make_result) |_| {
             @atomicStore(Step.State, &s.state, .success, .seq_cst);
@@ -1419,9 +1399,7 @@ fn workerMakeOneStep(
 
         // Successful completion of a step, so we queue up its dependants as well.
         for (s.dependants.items) |dep| {
-            thread_pool.spawnWg(wg, workerMakeOneStep, .{
-                wg, b, dep, prog_node, run,
-            });
+            group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run });
         }
     }
 
@@ -1444,9 +1422,7 @@ fn workerMakeOneStep(
             if (dep.max_rss <= remaining) {
                 remaining -= dep.max_rss;
 
-                thread_pool.spawnWg(wg, workerMakeOneStep, .{
-                    wg, b, dep, prog_node, run,
-                });
+                group.async(io, workerMakeOneStep, .{ group, b, dep, prog_node, run });
             } else {
                 run.memory_blocked_steps.items[i] = dep;
                 i += 1;
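
Note on the recurring rewrite in this file: each `std.Thread.Pool` plus `std.Thread.WaitGroup` pair collapses into a single `Io.Group`, with the `Io` instance supplying the concurrency. A minimal sketch of the new pattern, not from this commit; it assumes `Threaded.init` takes an allocator and `io()` yields the `Io` interface as in current dev std, and the `worker`/`depth` names are illustrative:

    const std = @import("std");
    const Io = std.Io;

    fn worker(group: *Io.Group, io: Io, depth: usize) void {
        if (depth == 0) return;
        // A task may queue follow-up work into the same group, just as
        // workerMakeOneStep re-queues a finished step's dependants above.
        group.async(io, worker, .{ group, io, depth - 1 });
    }

    pub fn main() void {
        var threaded: std.Io.Threaded = .init(std.heap.smp_allocator);
        defer threaded.deinit();
        const io = threaded.io();

        var group: Io.Group = .init;
        defer group.wait(io); // replaces WaitGroup.wait(); no pool deinit needed

        group.async(io, worker, .{ &group, io, 3 });
    }
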
lib/std/Build/Step/Run.zig
@@ -1151,7 +1151,6 @@ pub fn rerunInFuzzMode(
     const tmp_dir_path = "tmp" ++ fs.path.sep_str ++ std.fmt.hex(rand_int);
     try runCommand(run, argv_list.items, has_side_effects, tmp_dir_path, .{
         .progress_node = prog_node,
-        .thread_pool = undefined, // not used by `runCommand`
         .watch = undefined, // not used by `runCommand`
         .web_server = null, // only needed for time reports
         .ttyconf = fuzz.ttyconf,
lib/std/Build/Fuzz.zig
@@ -22,10 +22,9 @@ mode: Mode,
 /// Allocated into `gpa`.
 run_steps: []const *Step.Run,
 
-wait_group: std.Thread.WaitGroup,
+group: Io.Group,
 root_prog_node: std.Progress.Node,
 prog_node: std.Progress.Node,
-thread_pool: *std.Thread.Pool,
 
 /// Protects `coverage_files`.
 coverage_mutex: std.Thread.Mutex,
@@ -78,7 +77,6 @@ const CoverageMap = struct {
 pub fn init(
     gpa: Allocator,
     io: Io,
-    thread_pool: *std.Thread.Pool,
     ttyconf: tty.Config,
     all_steps: []const *Build.Step,
     root_prog_node: std.Progress.Node,
@@ -89,20 +87,22 @@ pub fn init(
         defer steps.deinit(gpa);
         const rebuild_node = root_prog_node.start("Rebuilding Unit Tests", 0);
         defer rebuild_node.end();
-        var rebuild_wg: std.Thread.WaitGroup = .{};
-        defer rebuild_wg.wait();
+        var rebuild_group: Io.Group = .init;
+        defer rebuild_group.cancel(io);
 
         for (all_steps) |step| {
             const run = step.cast(Step.Run) orelse continue;
             if (run.producer == null) continue;
             if (run.fuzz_tests.items.len == 0) continue;
             try steps.append(gpa, run);
-            thread_pool.spawnWg(&rebuild_wg, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node });
+            rebuild_group.async(io, rebuildTestsWorkerRun, .{ run, gpa, ttyconf, rebuild_node });
         }
 
         if (steps.items.len == 0) fatal("no fuzz tests found", .{});
         rebuild_node.setEstimatedTotalItems(steps.items.len);
-        break :steps try gpa.dupe(*Step.Run, steps.items);
+        const run_steps = try gpa.dupe(*Step.Run, steps.items);
+        rebuild_group.wait(io);
+        break :steps run_steps;
     };
     errdefer gpa.free(run_steps);
 
@@ -118,8 +118,7 @@ pub fn init(
         .ttyconf = ttyconf,
         .mode = mode,
         .run_steps = run_steps,
-        .wait_group = .{},
-        .thread_pool = thread_pool,
+        .group = .init,
         .root_prog_node = root_prog_node,
         .prog_node = .none,
         .coverage_files = .empty,
@@ -131,29 +130,26 @@ pub fn init(
 }
 
 pub fn start(fuzz: *Fuzz) void {
+    const io = fuzz.io;
     fuzz.prog_node = fuzz.root_prog_node.start("Fuzzing", fuzz.run_steps.len);
 
     if (fuzz.mode == .forever) {
         // For polling messages and sending updates to subscribers.
-        fuzz.wait_group.start();
-        _ = std.Thread.spawn(.{}, coverageRun, .{fuzz}) catch |err| {
-            fuzz.wait_group.finish();
-            fatal("unable to spawn coverage thread: {s}", .{@errorName(err)});
-        };
+        fuzz.group.concurrent(io, coverageRun, .{fuzz}) catch |err|
+            fatal("unable to spawn coverage task: {t}", .{err});
     }
 
     for (fuzz.run_steps) |run| {
         for (run.fuzz_tests.items) |unit_test_index| {
             assert(run.rebuilt_executable != null);
-            fuzz.thread_pool.spawnWg(&fuzz.wait_group, fuzzWorkerRun, .{
-                fuzz, run, unit_test_index,
-            });
+            fuzz.group.async(io, fuzzWorkerRun, .{ fuzz, run, unit_test_index });
         }
     }
 }
 
 pub fn deinit(fuzz: *Fuzz) void {
-    if (!fuzz.wait_group.isDone()) @panic("TODO: terminate the fuzzer processes");
+    const io = fuzz.io;
+    fuzz.group.cancel(io);
     fuzz.prog_node.end();
     fuzz.gpa.free(fuzz.run_steps);
 }
@@ -335,8 +331,6 @@ pub fn sendUpdate(
 }
 
 fn coverageRun(fuzz: *Fuzz) void {
-    defer fuzz.wait_group.finish();
-
     fuzz.queue_mutex.lock();
     defer fuzz.queue_mutex.unlock();
 
@@ -511,8 +505,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
     assert(fuzz.mode == .limit);
     const io = fuzz.io;
 
-    fuzz.wait_group.wait();
-    fuzz.wait_group.reset();
+    fuzz.group.wait(io);
+    fuzz.group = .init;
 
     std.debug.print("======= FUZZING REPORT =======\n", .{});
     for (fuzz.msg_queue.items) |msg| {
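
Two lifecycle details in this file: `group.wait(io)` followed by reassigning `.init` replaces `WaitGroup.wait()` plus `reset()`, and `deinit` now cancels in-flight tasks rather than panicking on unfinished work. The coverage poller uses `group.concurrent`, which can fail, because it must run in its own execution context instead of possibly inline like `async`. A minimal sketch of that lifecycle; `fuzzOne` is a hypothetical stand-in, and it assumes `cancel` on a group with no outstanding tasks is a no-op, as the `deinit` path above implies:

    const std = @import("std");
    const Io = std.Io;

    fn fuzzOne(i: usize) void {
        std.debug.print("fuzz task {d}\n", .{i});
    }

    fn runBounded(io: Io) void {
        var group: Io.Group = .init;
        for (0..3) |i| group.async(io, fuzzOne, .{i});

        group.wait(io); // block until every spawned task returns
        group = .init; // make the group reusable; replaces WaitGroup.reset()

        // Teardown: request cancellation of anything still running and
        // wait for it, rather than @panic on unfinished work.
        group.cancel(io);
    }
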
lib/std/Build/Step.zig
@@ -110,7 +110,6 @@ pub const TestResults = struct {
 
 pub const MakeOptions = struct {
     progress_node: std.Progress.Node,
-    thread_pool: *std.Thread.Pool,
     watch: bool,
     web_server: switch (builtin.target.cpu.arch) {
         else => ?*Build.WebServer,
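
With `thread_pool` gone from `MakeOptions`, a custom step that wants parallelism reaches the `Io` through its owning build graph instead, the same way the build runner now uses `b.graph.io`. A sketch of such a `make` function; the body is illustrative, only the `step.owner.graph.io` access and the `MakeOptions` shape come from this commit:

    fn make(step: *std.Build.Step, options: std.Build.Step.MakeOptions) anyerror!void {
        _ = options; // progress_node, watch, etc. still arrive here
        const io = step.owner.graph.io; // was: options.thread_pool

        var group: std.Io.Group = .init;
        defer group.wait(io);
        // spawn per-file sub-tasks with group.async(io, ...) as needed
    }
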
lib/std/Build/WebServer.zig
@@ -1,5 +1,4 @@
 gpa: Allocator,
-thread_pool: *std.Thread.Pool,
 graph: *const Build.Graph,
 all_steps: []const *Build.Step,
 listen_address: net.IpAddress,
@@ -53,7 +52,6 @@ pub fn notifyUpdate(ws: *WebServer) void {
 
 pub const Options = struct {
     gpa: Allocator,
-    thread_pool: *std.Thread.Pool,
     ttyconf: Io.tty.Config,
     graph: *const std.Build.Graph,
     all_steps: []const *Build.Step,
@@ -100,7 +98,6 @@ pub fn init(opts: Options) WebServer {
 
     return .{
         .gpa = opts.gpa,
-        .thread_pool = opts.thread_pool,
         .ttyconf = opts.ttyconf,
         .graph = opts.graph,
         .all_steps = all_steps,
@@ -235,7 +232,6 @@ pub fn finishBuild(ws: *WebServer, opts: struct {
         ws.fuzz = Fuzz.init(
             ws.gpa,
             ws.graph.io,
-            ws.thread_pool,
             ws.ttyconf,
             ws.all_steps,
             ws.root_prog_node,
lib/std/Io/Threaded.zig
@@ -33,6 +33,9 @@ wait_group: std.Thread.WaitGroup = .{},
 /// immediately.
 ///
 /// Defaults to a number equal to logical CPU cores.
+///
+/// Protected by `mutex` once the I/O instance is already in use. See
+/// `setAsyncLimit`.
 async_limit: Io.Limit,
 /// Maximum thread pool size (excluding main thread) for dispatching concurrent
 /// tasks. Until this limit, calls to `Io.concurrent` will increase the thread
@@ -168,6 +171,12 @@ pub const init_single_threaded: Threaded = .{
     .have_signal_handler = false,
 };
 
+pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
+    t.mutex.lock();
+    defer t.mutex.unlock();
+    t.async_limit = new_limit;
+}
+
 pub fn deinit(t: *Threaded) void {
     t.join();
     if (is_windows and t.wsa.status == .initialized) {
@@ -507,7 +516,7 @@ fn async(
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    if (builtin.single_threaded or t.async_limit == .nothing) {
+    if (builtin.single_threaded) {
         start(context.ptr, result.ptr);
         return null;
     }
@@ -684,8 +693,7 @@ fn groupAsync(
     start: *const fn (*Io.Group, context: *const anyopaque) void,
 ) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    if (builtin.single_threaded or t.async_limit == .nothing)
-        return start(group, context.ptr);
+    if (builtin.single_threaded) return start(group, context.ptr);
 
     const gpa = t.allocator;
     const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
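
Tying this back to the build runner's `-j` handling: the jobs count becomes a mutex-protected write via `setAsyncLimit`, so the limit may change while the `Io` instance is live. Consistently, the lock-free fast paths in `async` and `groupAsync` stop reading `async_limit`, since the field is no longer immutable after startup. A sketch of the wiring, under the same `Threaded.init`/`io()` assumptions as above, with the allocator choice illustrative:

    const std = @import("std");

    pub fn main() void {
        var threaded: std.Io.Threaded = .init(std.heap.smp_allocator);
        defer threaded.deinit();
        const io = threaded.io();
        _ = io; // hand this Io to the build graph, steps, etc.

        // What the build runner now does for `-j4`: safe even while
        // tasks are in flight, because the setter locks `mutex`.
        threaded.setAsyncLimit(.limited(4));
    }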