Commit 0d1cd0d482

Andrew Kelley <andrew@ziglang.org>
2020-12-16 02:30:29
use kprotty's ThreadPool implementation (v5)
1 parent 01d3385
src/Compilation.zig
@@ -26,6 +26,8 @@ const Module = @import("Module.zig");
 const Cache = @import("Cache.zig");
 const stage1 = @import("stage1.zig");
 const translate_c = @import("translate_c.zig");
+const ThreadPool = @import("ThreadPool.zig");
+const WaitGroup = @import("WaitGroup.zig");
 
 /// General-purpose allocator. Used for both temporary and long-term storage.
 gpa: *Allocator,
@@ -79,6 +81,7 @@ zig_lib_directory: Directory,
 local_cache_directory: Directory,
 global_cache_directory: Directory,
 libc_include_dir_list: []const []const u8,
+thread_pool: *ThreadPool,
 
 /// Populated when we build the libc++ static library. A Job to build this is placed in the queue
 /// and resolved before calling linker.flush().
@@ -335,6 +338,7 @@ pub const InitOptions = struct {
     root_name: []const u8,
     root_pkg: ?*Package,
     output_mode: std.builtin.OutputMode,
+    thread_pool: *ThreadPool,
     dynamic_linker: ?[]const u8 = null,
     /// `null` means to not emit a binary file.
     emit_bin: ?EmitLoc,
@@ -985,6 +989,7 @@ pub fn create(gpa: *Allocator, options: InitOptions) !*Compilation {
             .self_exe_path = options.self_exe_path,
             .libc_include_dir_list = libc_dirs.libc_include_dir_list,
             .sanitize_c = sanitize_c,
+            .thread_pool = options.thread_pool,
             .clang_passthrough_mode = options.clang_passthrough_mode,
             .clang_preprocessor_mode = options.clang_preprocessor_mode,
             .verbose_cc = options.verbose_cc,
@@ -1380,24 +1385,14 @@ pub fn performAllTheWork(self: *Compilation) error{ TimerUnsupported, OutOfMemor
     var c_comp_progress_node = main_progress_node.start("Compile C Objects", self.c_source_files.len);
     defer c_comp_progress_node.end();
 
+    var wg = WaitGroup{};
+    defer wg.wait();
+
     while (self.c_object_work_queue.readItem()) |c_object| {
-        self.updateCObject(c_object, &c_comp_progress_node) catch |err| switch (err) {
-            error.AnalysisFail => continue,
-            else => {
-                {
-                    var lock = self.mutex.acquire();
-                    defer lock.release();
-                    try self.failed_c_objects.ensureCapacity(self.gpa, self.failed_c_objects.items().len + 1);
-                    self.failed_c_objects.putAssumeCapacityNoClobber(c_object, try ErrorMsg.create(
-                        self.gpa,
-                        0,
-                        "unable to build C object: {s}",
-                        .{@errorName(err)},
-                    ));
-                }
-                c_object.status = .{ .failure = {} };
-            },
-        };
+        wg.start();
+        try self.thread_pool.spawn(workerUpdateCObject, .{
+            self, c_object, &c_comp_progress_node, &wg,
+        });
     }
 
     while (self.work_queue.readItem()) |work_item| switch (work_item) {
@@ -1721,6 +1716,37 @@ pub fn cImport(comp: *Compilation, c_src: []const u8) !CImportResult {
     };
 }
 
+/// Thread-pool entry point wrapping `updateCObject`: compiles one C object
+/// and records any failure in `failed_c_objects`, always signalling `wg`
+/// when done so `performAllTheWork` can finish its WaitGroup.
+fn workerUpdateCObject(
+    comp: *Compilation,
+    c_object: *CObject,
+    progress_node: *std.Progress.Node,
+    wg: *WaitGroup,
+) void {
+    // Signal completion on every path, including the error paths below.
+    defer wg.stop();
+
+    comp.updateCObject(c_object, progress_node) catch |err| switch (err) {
+        // AnalysisFail: the failure was already recorded inside updateCObject.
+        error.AnalysisFail => return,
+        else => {
+            {
+                // Serialize access to the shared failed_c_objects map:
+                // multiple pool workers may fail concurrently.
+                var lock = comp.mutex.acquire();
+                defer lock.release();
+                comp.failed_c_objects.ensureCapacity(comp.gpa, comp.failed_c_objects.items().len + 1) catch {
+                    fatal("TODO handle this by setting c_object.status = oom failure", .{});
+                };
+                comp.failed_c_objects.putAssumeCapacityNoClobber(c_object, ErrorMsg.create(
+                    comp.gpa,
+                    0,
+                    "unable to build C object: {s}",
+                    .{@errorName(err)},
+                ) catch {
+                    fatal("TODO handle this by setting c_object.status = oom failure", .{});
+                });
+            }
+            c_object.status = .{ .failure = {} };
+        },
+    };
+}
+
 fn updateCObject(comp: *Compilation, c_object: *CObject, c_comp_progress_node: *std.Progress.Node) !void {
     if (!build_options.have_llvm) {
         return comp.failCObj(c_object, "clang not available: compiler built without LLVM extensions", .{});
@@ -2800,6 +2826,7 @@ fn buildOutputFromZig(
         .root_name = root_name,
         .root_pkg = &root_pkg,
         .output_mode = fixed_output_mode,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = optimize_mode,
@@ -3173,6 +3200,7 @@ pub fn build_crt_file(
         .root_name = root_name,
         .root_pkg = null,
         .output_mode = output_mode,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = comp.bin_file.options.optimize_mode,
src/glibc.zig
@@ -936,6 +936,7 @@ fn buildSharedLib(
         .root_pkg = null,
         .output_mode = .Lib,
         .link_mode = .Dynamic,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = comp.bin_file.options.optimize_mode,
src/libcxx.zig
@@ -162,6 +162,7 @@ pub fn buildLibCXX(comp: *Compilation) !void {
         .root_name = root_name,
         .root_pkg = null,
         .output_mode = output_mode,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = comp.bin_file.options.optimize_mode,
@@ -280,6 +281,7 @@ pub fn buildLibCXXABI(comp: *Compilation) !void {
         .root_name = root_name,
         .root_pkg = null,
         .output_mode = output_mode,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = comp.bin_file.options.optimize_mode,
src/libunwind.zig
@@ -95,6 +95,7 @@ pub fn buildStaticLib(comp: *Compilation) !void {
         .root_name = root_name,
         .root_pkg = null,
         .output_mode = output_mode,
+        .thread_pool = comp.thread_pool,
         .libc_installation = comp.bin_file.options.libc_installation,
         .emit_bin = emit_bin,
         .optimize_mode = comp.bin_file.options.optimize_mode,
src/main.zig
@@ -19,6 +19,7 @@ const LibCInstallation = @import("libc_installation.zig").LibCInstallation;
 const translate_c = @import("translate_c.zig");
 const Cache = @import("Cache.zig");
 const target_util = @import("target.zig");
+const ThreadPool = @import("ThreadPool.zig");
 
 pub fn fatal(comptime format: []const u8, args: anytype) noreturn {
     std.log.emerg(format, args);
@@ -1632,6 +1633,10 @@ fn buildOutputType(
         };
     defer zig_lib_directory.handle.close();
 
+    var thread_pool: ThreadPool = undefined;
+    try thread_pool.init(gpa);
+    defer thread_pool.deinit();
+
     var libc_installation: ?LibCInstallation = null;
     defer if (libc_installation) |*l| l.deinit(gpa);
 
@@ -1747,6 +1752,7 @@ fn buildOutputType(
         .single_threaded = single_threaded,
         .function_sections = function_sections,
         .self_exe_path = self_exe_path,
+        .thread_pool = &thread_pool,
         .clang_passthrough_mode = arg_mode != .build,
         .clang_preprocessor_mode = clang_preprocessor_mode,
         .version = optional_version,
@@ -2412,6 +2418,9 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v
             .directory = null, // Use the local zig-cache.
             .basename = exe_basename,
         };
+        var thread_pool: ThreadPool = undefined;
+        try thread_pool.init(gpa);
+        defer thread_pool.deinit();
         const comp = Compilation.create(gpa, .{
             .zig_lib_directory = zig_lib_directory,
             .local_cache_directory = local_cache_directory,
@@ -2427,6 +2436,7 @@ pub fn cmdBuild(gpa: *Allocator, arena: *Allocator, args: []const []const u8) !v
             .emit_h = null,
             .optimize_mode = .Debug,
             .self_exe_path = self_exe_path,
+            .thread_pool = &thread_pool,
         }) catch |err| {
             fatal("unable to create compilation: {}", .{@errorName(err)});
         };
src/musl.zig
@@ -200,6 +200,7 @@ pub fn buildCRTFile(comp: *Compilation, crt_file: CRTFile) !void {
                 .root_pkg = null,
                 .output_mode = .Lib,
                 .link_mode = .Dynamic,
+                .thread_pool = comp.thread_pool,
                 .libc_installation = comp.bin_file.options.libc_installation,
                 .emit_bin = Compilation.EmitLoc{ .directory = null, .basename = "libc.so" },
                 .optimize_mode = comp.bin_file.options.optimize_mode,
src/test.zig
@@ -10,6 +10,7 @@ const enable_qemu: bool = build_options.enable_qemu;
 const enable_wine: bool = build_options.enable_wine;
 const enable_wasmtime: bool = build_options.enable_wasmtime;
 const glibc_multi_install_dir: ?[]const u8 = build_options.glibc_multi_install_dir;
+const ThreadPool = @import("ThreadPool.zig");
 
 const cheader = @embedFile("link/cbe.h");
 
@@ -467,6 +468,10 @@ pub const TestContext = struct {
         defer zig_lib_directory.handle.close();
         defer std.testing.allocator.free(zig_lib_directory.path.?);
 
+        var thread_pool: ThreadPool = undefined;
+        try thread_pool.init(std.testing.allocator);
+        defer thread_pool.deinit();
+
         for (self.cases.items) |case| {
             if (build_options.skip_non_native and case.target.getCpuArch() != std.Target.current.cpu.arch)
                 continue;
@@ -480,7 +485,13 @@ pub const TestContext = struct {
             progress.initial_delay_ns = 0;
             progress.refresh_rate_ns = 0;
 
-            try self.runOneCase(std.testing.allocator, &prg_node, case, zig_lib_directory);
+            try self.runOneCase(
+                std.testing.allocator,
+                &prg_node,
+                case,
+                zig_lib_directory,
+                &thread_pool,
+            );
         }
     }
 
@@ -490,6 +501,7 @@ pub const TestContext = struct {
         root_node: *std.Progress.Node,
         case: Case,
         zig_lib_directory: Compilation.Directory,
+        thread_pool: *ThreadPool,
     ) !void {
         const target_info = try std.zig.system.NativeTargetInfo.detect(allocator, case.target);
         const target = target_info.target;
@@ -539,6 +551,7 @@ pub const TestContext = struct {
             .local_cache_directory = zig_cache_directory,
             .global_cache_directory = zig_cache_directory,
             .zig_lib_directory = zig_lib_directory,
+            .thread_pool = thread_pool,
             .root_name = "test_case",
             .target = target,
             // TODO: support tests for object file building, and library builds
src/ThreadPool.zig
@@ -0,0 +1,116 @@
+//! ThreadPool: fixed-size pool of worker threads executing heap-allocated
+//! closures. Tasks are taken LIFO from `run_queue`; idle workers park on
+//! stack-allocated AutoResetEvent nodes linked into `idle_queue`. All
+//! shared state is guarded by `lock`.
+const std = @import("std");
+const ThreadPool = @This();
+
+lock: std.Mutex = .{},
+is_running: bool = true,
+allocator: *std.mem.Allocator,
+/// Count of successfully spawned workers (a prefix of `threads`).
+running: usize = 0,
+threads: []*std.Thread,
+run_queue: RunQueue = .{},
+idle_queue: IdleQueue = .{},
+
+const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent);
+const RunQueue = std.SinglyLinkedList(Runnable);
+const Runnable = struct {
+    runFn: fn (*Runnable) void,
+};
+
+/// Spawn one worker per logical CPU (1 if detection fails). On any
+/// failure, partially-constructed state is torn down via `deinit`.
+pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
+    self.* = .{
+        .allocator = allocator,
+        .threads = &[_]*std.Thread{},
+    };
+
+    errdefer self.deinit();
+
+    var num_threads = std.Thread.cpuCount() catch 1;
+    if (num_threads > 0)
+        self.threads = try allocator.alloc(*std.Thread, num_threads);
+
+    while (num_threads > 0) : (num_threads -= 1) {
+        const thread = try std.Thread.spawn(self, runWorker);
+        self.threads[self.running] = thread;
+        self.running += 1;
+    }
+}
+
+/// Shut down, join every spawned worker, and release the thread array.
+/// Safe on a partially-initialized pool (used by init's errdefer).
+pub fn deinit(self: *ThreadPool) void {
+    self.shutdown();
+
+    std.debug.assert(!self.is_running);
+    for (self.threads[0..self.running]) |thread|
+        thread.wait();
+
+    defer self.threads = &[_]*std.Thread{};
+    // Free iff the array was actually allocated. Keying this off
+    // `running > 0` would leak the allocation in the case where init
+    // allocated `threads` but failed before the first spawn succeeded.
+    if (self.threads.len > 0)
+        self.allocator.free(self.threads);
+}
+
+/// Mark the pool as stopped and wake all parked workers so they can
+/// observe `is_running == false` and exit. Idempotent.
+pub fn shutdown(self: *ThreadPool) void {
+    const held = self.lock.acquire();
+
+    if (!self.is_running)
+        return held.release();
+
+    // Detach the idle list under the lock, then signal outside it so we
+    // never call set() while holding the mutex.
+    var idle_queue = self.idle_queue;
+    self.idle_queue = .{};
+    self.is_running = false;
+    held.release();
+
+    while (idle_queue.popFirst()) |idle_node|
+        idle_node.data.set();
+}
+
+/// Queue `func(args...)` for execution on a worker thread. The closure
+/// (including a copy of `args`) is heap-allocated and freed by the
+/// worker after the call returns.
+pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
+    const Args = @TypeOf(args);
+    const Closure = struct {
+        arguments: Args,
+        pool: *ThreadPool,
+        run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
+
+        fn runFn(runnable: *Runnable) void {
+            const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
+            const closure = @fieldParentPtr(@This(), "run_node", run_node);
+            const result = @call(.{}, func, closure.arguments);
+            closure.pool.allocator.destroy(closure);
+        }
+    };
+
+    const closure = try self.allocator.create(Closure);
+    errdefer self.allocator.destroy(closure);
+    closure.* = .{
+        .arguments = args,
+        .pool = self,
+    };
+
+    const held = self.lock.acquire();
+    self.run_queue.prepend(&closure.run_node);
+
+    // Wake at most one parked worker to pick up the new task.
+    const idle_node = self.idle_queue.popFirst();
+    held.release();
+
+    if (idle_node) |node|
+        node.data.set();
+}
+
+/// Worker loop: drain the run queue; when empty, either exit (if shut
+/// down) or park on a stack-local event until woken by spawn/shutdown.
+fn runWorker(self: *ThreadPool) void {
+    while (true) {
+        const held = self.lock.acquire();
+
+        if (self.run_queue.popFirst()) |run_node| {
+            // Run the task outside the lock so workers execute in parallel.
+            held.release();
+            (run_node.data.runFn)(&run_node.data);
+            continue;
+        }
+
+        if (!self.is_running) {
+            held.release();
+            return;
+        }
+
+        // Park: the node (and its event) lives on this worker's stack;
+        // spawn/shutdown unlink it before signalling, so it never
+        // outlives this iteration.
+        var idle_node = IdleQueue.Node{ .data = .{} };
+        self.idle_queue.prepend(&idle_node);
+        held.release();
+        idle_node.data.wait();
+    }
+}
src/WaitGroup.zig
@@ -0,0 +1,22 @@
+//! WaitGroup: counts in-flight tasks; `wait` blocks until the count
+//! reaches zero. Designed for a single waiter: exactly one thread may
+//! call `wait` at a time.
+const std = @import("std");
+const WaitGroup = @This();
+
+counter: usize = 0,
+// Set by the waiter; the final `stop` exchanges it to null and signals it.
+event: ?*std.AutoResetEvent = null,
+
+/// Register one unit of pending work. Call before handing the task off.
+pub fn start(self: *WaitGroup) void {
+    _ = @atomicRmw(usize, &self.counter, .Add, 1, .SeqCst);
+}
+
+/// Mark one unit of work complete. When the counter reaches zero, the
+/// Xchg claims the event pointer (ensuring it is signalled at most once)
+/// and wakes the waiter if one is parked.
+pub fn stop(self: *WaitGroup) void {
+    if (@atomicRmw(usize, &self.counter, .Sub, 1, .SeqCst) == 1)
+        if (@atomicRmw(?*std.AutoResetEvent, &self.event, .Xchg, null, .SeqCst)) |event|
+            event.set();
+}
+
+/// Block until the counter reaches zero. The event lives on this stack
+/// frame; the publishing store must happen before the counter check so
+/// the final `stop` either sees the event or we see counter == 0.
+/// NOTE(review): on the no-wait path (counter already zero after the
+/// store), `self.event` can be left pointing at this soon-dead stack
+/// frame — hazardous if the WaitGroup is reused after `wait` returns.
+/// Confirm callers use one wait cycle per WaitGroup instance.
+pub fn wait(self: *WaitGroup) void {
+    var event = std.AutoResetEvent{};
+    @atomicStore(?*std.AutoResetEvent, &self.event, &event, .SeqCst);
+    if (@atomicLoad(usize, &self.counter, .SeqCst) != 0)
+        event.wait();
+}