Commit 0f1a6ae943

Andrew Kelley <andrew@ziglang.org>
2025-11-25 12:29:19
fetch: update from std.Thread.Pool to std.Io
1 parent d0ba664
Changed files (2)
src
src/Package/Fetch.zig
@@ -38,15 +38,12 @@ const assert = std.debug.assert;
 const ascii = std.ascii;
 const Allocator = std.mem.Allocator;
 const Cache = std.Build.Cache;
-const ThreadPool = std.Thread.Pool;
-const WaitGroup = std.Thread.WaitGroup;
 const git = @import("Fetch/git.zig");
 const Package = @import("../Package.zig");
 const Manifest = Package.Manifest;
 const ErrorBundle = std.zig.ErrorBundle;
 
 arena: std.heap.ArenaAllocator,
-io: Io,
 location: Location,
 location_tok: std.zig.Ast.TokenIndex,
 hash_tok: std.zig.Ast.OptionalTokenIndex,
@@ -104,7 +101,8 @@ pub const LazyStatus = enum {
 
 /// Contains shared state among all `Fetch` tasks.
 pub const JobQueue = struct {
-    mutex: std.Thread.Mutex = .{},
+    io: Io,
+    mutex: Io.Mutex = .init,
     /// It's an array hash map so that it can be sorted before rendering the
     /// dependencies.zig source file.
     /// Protected by `mutex`.
@@ -115,8 +113,7 @@ pub const JobQueue = struct {
     all_fetches: std.ArrayList(*Fetch) = .empty,
 
     http_client: *std.http.Client,
-    thread_pool: *ThreadPool,
-    wait_group: WaitGroup = .{},
+    group: Io.Group = .init,
     global_cache: Cache.Directory,
     /// If true then, no fetching occurs, and:
     /// * The `global_cache` directory is assumed to be the direct parent
@@ -320,13 +317,14 @@ pub const Location = union(enum) {
 
 pub const RunError = error{
     OutOfMemory,
+    Canceled,
     /// This error code is intended to be handled by inspecting the
     /// `error_bundle` field.
     FetchFailed,
 };
 
 pub fn run(f: *Fetch) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     const eb = &f.error_bundle;
     const arena = f.arena.allocator();
     const gpa = f.arena.child_allocator;
@@ -488,7 +486,7 @@ fn runResource(
     resource: *Resource,
     remote_hash: ?Package.Hash,
 ) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     defer resource.deinit(io);
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
@@ -702,7 +700,8 @@ fn loadManifest(f: *Fetch, pkg_root: Cache.Path) RunError!void {
 }
 
 fn queueJobsForDeps(f: *Fetch) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
+
     assert(f.job_queue.recursive);
 
     // If the package does not have a build.zig.zon file then there are no dependencies.
@@ -722,8 +721,8 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
         const prog_names = try parent_arena.alloc([]const u8, deps.len);
         var new_fetch_index: usize = 0;
 
-        f.job_queue.mutex.lock();
-        defer f.job_queue.mutex.unlock();
+        try f.job_queue.mutex.lock(io);
+        defer f.job_queue.mutex.unlock(io);
 
         try f.job_queue.all_fetches.ensureUnusedCapacity(gpa, new_fetches.len);
         try f.job_queue.table.ensureUnusedCapacity(gpa, @intCast(new_fetches.len));
@@ -792,7 +791,6 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
                 f.job_queue.all_fetches.appendAssumeCapacity(new_fetch);
             }
             new_fetch.* = .{
-                .io = io,
                 .arena = std.heap.ArenaAllocator.init(gpa),
                 .location = location,
                 .location_tok = dep.location_tok,
@@ -830,11 +828,9 @@ fn queueJobsForDeps(f: *Fetch) RunError!void {
         break :nf .{ new_fetches[0..new_fetch_index], prog_names[0..new_fetch_index] };
     };
 
-    // Now it's time to give tasks to the thread pool.
-    const thread_pool = f.job_queue.thread_pool;
-
+    // Now it's time to dispatch tasks.
     for (new_fetches, prog_names) |*new_fetch, prog_name| {
-        thread_pool.spawnWg(&f.job_queue.wait_group, workerRun, .{ new_fetch, prog_name });
+        f.job_queue.group.async(io, workerRun, .{ new_fetch, prog_name });
     }
 }
 
@@ -848,6 +844,7 @@ pub fn workerRun(f: *Fetch, prog_name: []const u8) void {
 
     run(f) catch |err| switch (err) {
         error.OutOfMemory => f.oom_flag = true,
+        error.Canceled => {},
         error.FetchFailed => {
             // Nothing to do because the errors are already reported in `error_bundle`,
             // and a reference is kept to the `Fetch` task inside `all_fetches`.
@@ -992,7 +989,7 @@ const FileType = enum {
 const init_resource_buffer_size = git.Packet.max_data_length;
 
 fn initResource(f: *Fetch, uri: std.Uri, resource: *Resource, reader_buffer: []u8) RunError!void {
-    const io = f.io;
+    const io = f.job_queue.io;
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
 
@@ -1281,12 +1278,16 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) RunError!Unpack
     return res;
 }
 
-fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
+fn unzip(
+    f: *Fetch,
+    out_dir: fs.Dir,
+    reader: *Io.Reader,
+) error{ ReadFailed, OutOfMemory, Canceled, FetchFailed }!UnpackResult {
     // We write the entire contents to a file first because zip files
     // must be processed back to front and they could be too large to
     // load into memory.
 
-    const io = f.io;
+    const io = f.job_queue.io;
     const cache_root = f.job_queue.global_cache;
     const prefix = "tmp/";
     const suffix = ".zip";
@@ -1306,6 +1307,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
             .read = true,
         }) catch |err| switch (err) {
             error.PathAlreadyExists => continue,
+            error.Canceled => return error.Canceled,
             else => |e| return f.fail(
                 f.location_tok,
                 try eb.printString("failed to create temporary zip file: {t}", .{e}),
@@ -1348,7 +1350,7 @@ fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *Io.Reader) error{ ReadFailed, OutO
 }
 
 fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
-    const io = f.io;
+    const io = f.job_queue.io;
     const arena = f.arena.allocator();
     // TODO don't try to get a gpa from an arena. expose this dependency higher up
     // because the backing of arena could be page allocator
@@ -1486,11 +1488,11 @@ const ComputedHash = struct {
 /// hashed* and must not be present on the file system when calling this
 /// function.
 fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!ComputedHash {
+    const io = f.job_queue.io;
     // All the path name strings need to be in memory for sorting.
     const arena = f.arena.allocator();
     const gpa = f.arena.child_allocator;
     const eb = &f.error_bundle;
-    const thread_pool = f.job_queue.thread_pool;
     const root_dir = pkg_path.root_dir.handle;
 
     // Collect all files, recursively, then sort.
@@ -1514,10 +1516,8 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
     {
         // The final hash will be a hash of each file hashed independently. This
         // allows hashing in parallel.
-        var wait_group: WaitGroup = .{};
-        // `computeHash` is called from a worker thread so there must not be
-        // any waiting without working or a deadlock could occur.
-        defer thread_pool.waitAndWork(&wait_group);
+        var group: Io.Group = .init;
+        defer group.wait(io);
 
         while (walker.next() catch |err| {
             try eb.addRootErrorMessage(.{ .msg = try eb.printString(
@@ -1542,7 +1542,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
                     .fs_path = fs_path,
                     .failure = undefined, // to be populated by the worker
                 };
-                thread_pool.spawnWg(&wait_group, workerDeleteFile, .{ root_dir, deleted_file });
+                group.async(io, workerDeleteFile, .{ root_dir, deleted_file });
                 try deleted_files.append(deleted_file);
                 continue;
             }
@@ -1570,7 +1570,7 @@ fn computeHash(f: *Fetch, pkg_path: Cache.Path, filter: Filter) RunError!Compute
                 .failure = undefined, // to be populated by the worker
                 .size = undefined, // to be populated by the worker
             };
-            thread_pool.spawnWg(&wait_group, workerHashFile, .{ root_dir, hashed_file });
+            group.async(io, workerHashFile, .{ root_dir, hashed_file });
             try all_files.append(hashed_file);
         }
     }
@@ -2241,7 +2241,6 @@ fn saveEmbedFile(comptime tarball_name: []const u8, dir: fs.Dir) !void {
 
 // Builds Fetch with required dependencies, clears dependencies on deinit().
 const TestFetchBuilder = struct {
-    thread_pool: ThreadPool,
     http_client: std.http.Client,
     global_cache_directory: Cache.Directory,
     job_queue: Fetch.JobQueue,
@@ -2256,13 +2255,12 @@ const TestFetchBuilder = struct {
     ) !*Fetch {
         const cache_dir = try cache_parent_dir.makeOpenPath("zig-global-cache", .{});
 
-        try self.thread_pool.init(.{ .allocator = allocator });
         self.http_client = .{ .allocator = allocator, .io = io };
         self.global_cache_directory = .{ .handle = cache_dir, .path = null };
 
         self.job_queue = .{
+            .io = io,
             .http_client = &self.http_client,
-            .thread_pool = &self.thread_pool,
             .global_cache = self.global_cache_directory,
             .recursive = false,
             .read_only = false,
@@ -2273,7 +2271,6 @@ const TestFetchBuilder = struct {
 
         self.fetch = .{
             .arena = std.heap.ArenaAllocator.init(allocator),
-            .io = io,
             .location = .{ .path_or_url = path_or_url },
             .location_tok = 0,
             .hash_tok = .none,
@@ -2309,7 +2306,6 @@ const TestFetchBuilder = struct {
         self.fetch.prog_node.end();
         self.global_cache_directory.handle.close();
         self.http_client.deinit();
-        self.thread_pool.deinit();
     }
 
     fn packageDir(self: *TestFetchBuilder) !fs.Dir {
src/main.zig
@@ -5139,8 +5139,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
                 defer fetch_prog_node.end();
 
                 var job_queue: Package.Fetch.JobQueue = .{
+                    .io = io,
                     .http_client = &http_client,
-                    .thread_pool = &thread_pool,
                     .global_cache = dirs.global_cache,
                     .read_only = false,
                     .recursive = true,
@@ -5173,7 +5173,6 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
 
                 var fetch: Package.Fetch = .{
                     .arena = std.heap.ArenaAllocator.init(gpa),
-                    .io = io,
                     .location = .{ .relative_path = phantom_package_root },
                     .location_tok = 0,
                     .hash_tok = .none,
@@ -5207,10 +5206,8 @@ fn cmdBuild(gpa: Allocator, arena: Allocator, io: Io, args: []const []const u8)
                     &fetch,
                 );
 
-                job_queue.thread_pool.spawnWg(&job_queue.wait_group, Package.Fetch.workerRun, .{
-                    &fetch, "root",
-                });
-                job_queue.wait_group.wait();
+                job_queue.group.async(io, Package.Fetch.workerRun, .{ &fetch, "root" });
+                job_queue.group.wait(io);
 
                 try job_queue.consolidateErrors();
 
@@ -6899,8 +6896,8 @@ fn cmdFetch(
     defer global_cache_directory.handle.close();
 
     var job_queue: Package.Fetch.JobQueue = .{
+        .io = io,
         .http_client = &http_client,
-        .thread_pool = &thread_pool,
         .global_cache = global_cache_directory,
         .recursive = false,
         .read_only = false,
@@ -6912,7 +6909,6 @@ fn cmdFetch(
 
     var fetch: Package.Fetch = .{
         .arena = std.heap.ArenaAllocator.init(gpa),
-        .io = io,
         .location = .{ .path_or_url = path_or_url },
         .location_tok = 0,
         .hash_tok = .none,
@@ -6942,7 +6938,7 @@ fn cmdFetch(
     defer fetch.deinit();
 
     fetch.run() catch |err| switch (err) {
-        error.OutOfMemory => fatal("out of memory", .{}),
+        error.OutOfMemory, error.Canceled => |e| return e,
         error.FetchFailed => {}, // error bundle checked below
     };