Commit 829c00a77f

Andrew Kelley <andrew@ziglang.org>
2020-12-21 22:21:51
kprotty ThreadPool and WaitGroup patch
1 parent 4eb4d26
ci/drone/linux_script
@@ -17,8 +17,7 @@ git config core.abbrev 9
 
 mkdir build
 cd build
-# TODO figure out why Drone CI is deadlocking and stop passing -DZIG_SINGLE_THREADED=ON
-cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja -DZIG_SINGLE_THREADED=ON
+cmake .. -DCMAKE_BUILD_TYPE=Release "-DCMAKE_INSTALL_PREFIX=$DISTDIR" -DZIG_STATIC=ON -DCMAKE_PREFIX_PATH=/deps/local -GNinja
 
 samu install
 ./zig build test -Dskip-release -Dskip-non-native
src/Event.zig
@@ -1,43 +0,0 @@
-// SPDX-License-Identifier: MIT
-// Copyright (c) 2015-2020 Zig Contributors
-// This file is part of [zig](https://ziglang.org/), which is MIT licensed.
-// The MIT license requires this copyright notice to be included in all copies
-// and substantial portions of the software.
-const std = @import("std");
-const Event = @This();
-
-lock: std.Mutex = .{},
-event: std.ResetEvent = undefined,
-state: enum { empty, waiting, notified } = .empty,
-
-pub fn wait(self: *Event) void {
-    const held = self.lock.acquire();
-
-    switch (self.state) {
-        .empty => {
-            self.state = .waiting;
-            self.event = @TypeOf(self.event).init();
-            held.release();
-            self.event.wait();
-            self.event.deinit();
-        },
-        .waiting => unreachable,
-        .notified => held.release(),
-    }
-}
-
-pub fn set(self: *Event) void {
-    const held = self.lock.acquire();
-
-    switch (self.state) {
-        .empty => {
-            self.state = .notified;
-            held.release();
-        },
-        .waiting => {
-            held.release();
-            self.event.set();
-        },
-        .notified => unreachable,
-    }
-}
src/ThreadPool.zig
@@ -9,12 +9,12 @@ const ThreadPool = @This();
 lock: std.Mutex = .{},
 is_running: bool = true,
 allocator: *std.mem.Allocator,
-running: usize = 0,
+spawned: usize = 0,
 threads: []*std.Thread,
 run_queue: RunQueue = .{},
 idle_queue: IdleQueue = .{},
 
-const IdleQueue = std.SinglyLinkedList(std.AutoResetEvent);
+const IdleQueue = std.SinglyLinkedList(std.ResetEvent);
 const RunQueue = std.SinglyLinkedList(Runnable);
 const Runnable = struct {
     runFn: fn (*Runnable) void,
@@ -30,49 +30,37 @@ pub fn init(self: *ThreadPool, allocator: *std.mem.Allocator) !void {
 
     errdefer self.deinit();
 
-    var num_threads = std.Thread.cpuCount() catch 1;
-    if (num_threads > 0)
-        self.threads = try allocator.alloc(*std.Thread, num_threads);
+    var num_threads = std.math.max(1, std.Thread.cpuCount() catch 1);
+    self.threads = try allocator.alloc(*std.Thread, num_threads);
 
     while (num_threads > 0) : (num_threads -= 1) {
         const thread = try std.Thread.spawn(self, runWorker);
-        self.threads[self.running] = thread;
-        self.running += 1;
+        self.threads[self.spawned] = thread;
+        self.spawned += 1;
     }
 }
 
 pub fn deinit(self: *ThreadPool) void {
-    self.shutdown();
-
-    std.debug.assert(!self.is_running);
-    for (self.threads[0..self.running]) |thread|
-        thread.wait();
-
-    defer self.threads = &[_]*std.Thread{};
-    if (self.running > 0)
-        self.allocator.free(self.threads);
-}
-
-pub fn shutdown(self: *ThreadPool) void {
-    const held = self.lock.acquire();
-
-    if (!self.is_running)
-        return held.release();
+    {
+        const held = self.lock.acquire();
+        defer held.release();
 
-    var idle_queue = self.idle_queue;
-    self.idle_queue = .{};
-    self.is_running = false;
-    held.release();
+        self.is_running = false;
+        while (self.idle_queue.popFirst()) |idle_node|
+            idle_node.data.set();
+    }
 
-    while (idle_queue.popFirst()) |idle_node|
-        idle_node.data.set();
+    defer self.allocator.free(self.threads);
+    for (self.threads[0..self.spawned]) |thread|
+        thread.wait();
 }
 
 pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
     if (std.builtin.single_threaded) {
-        @call(.{}, func, args);
+        const result = @call(.{}, func, args);
         return;
     }
+
     const Args = @TypeOf(args);
     const Closure = struct {
         arguments: Args,
@@ -83,24 +71,26 @@ pub fn spawn(self: *ThreadPool, comptime func: anytype, args: anytype) !void {
             const run_node = @fieldParentPtr(RunQueue.Node, "data", runnable);
             const closure = @fieldParentPtr(@This(), "run_node", run_node);
             const result = @call(.{}, func, closure.arguments);
+
+            const held = closure.pool.lock.acquire();
+            defer held.release();
             closure.pool.allocator.destroy(closure);
         }
     };
 
+    const held = self.lock.acquire();
+    defer held.release();
+
     const closure = try self.allocator.create(Closure);
     closure.* = .{
         .arguments = args,
         .pool = self,
     };
 
-    const held = self.lock.acquire();
     self.run_queue.prepend(&closure.run_node);
 
-    const idle_node = self.idle_queue.popFirst();
-    held.release();
-
-    if (idle_node) |node|
-        node.data.set();
+    if (self.idle_queue.popFirst()) |idle_node|
+        idle_node.data.set();
 }
 
 fn runWorker(self: *ThreadPool) void {
@@ -113,14 +103,18 @@ fn runWorker(self: *ThreadPool) void {
             continue;
         }
 
-        if (!self.is_running) {
+        if (self.is_running) {
+            var idle_node = IdleQueue.Node{ .data = std.ResetEvent.init() };
+            defer idle_node.data.deinit();
+
+            self.idle_queue.prepend(&idle_node);
             held.release();
-            return;
+
+            idle_node.data.wait();
+            continue;
         }
 
-        var idle_node = IdleQueue.Node{ .data = .{} };
-        self.idle_queue.prepend(&idle_node);
         held.release();
-        idle_node.data.wait();
+        return;
     }
 }
src/WaitGroup.zig
@@ -5,11 +5,10 @@
 // and substantial portions of the software.
 const std = @import("std");
 const WaitGroup = @This();
-const Event = @import("Event.zig");
 
 lock: std.Mutex = .{},
 counter: usize = 0,
-event: ?*Event = null,
+event: ?*std.ResetEvent = null,
 
 pub fn start(self: *WaitGroup) void {
     const held = self.lock.acquire();
@@ -19,28 +18,33 @@ pub fn start(self: *WaitGroup) void {
 }
 
 pub fn stop(self: *WaitGroup) void {
-    var event: ?*Event = null;
-    defer if (event) |waiter|
-        waiter.set();
-
     const held = self.lock.acquire();
     defer held.release();
 
     self.counter -= 1;
-    if (self.counter == 0)
-        std.mem.swap(?*Event, &self.event, &event);
+
+    if (self.counter == 0) {
+        if (self.event) |event| {
+            self.event = null;
+            event.set();
+        }
+    }
 }
 
 pub fn wait(self: *WaitGroup) void {
-    var event = Event{};
-    var has_event = false;
-    defer if (has_event)
-        event.wait();
-
     const held = self.lock.acquire();
-    defer held.release();
 
-    has_event = self.counter != 0;
-    if (has_event)
-        self.event = &event;
+    if (self.counter == 0) {
+        held.release();
+        return;
+    }
+
+    var event = std.ResetEvent.init();
+    defer event.deinit();
+
+    std.debug.assert(self.event == null);
+    self.event = &event;
+
+    held.release();
+    event.wait();
 }
CMakeLists.txt
@@ -512,7 +512,6 @@ set(ZIG_STAGE2_SOURCES
     "${CMAKE_SOURCE_DIR}/src/Cache.zig"
     "${CMAKE_SOURCE_DIR}/src/Compilation.zig"
     "${CMAKE_SOURCE_DIR}/src/DepTokenizer.zig"
-    "${CMAKE_SOURCE_DIR}/src/Event.zig"
     "${CMAKE_SOURCE_DIR}/src/Module.zig"
     "${CMAKE_SOURCE_DIR}/src/Package.zig"
     "${CMAKE_SOURCE_DIR}/src/RangeSet.zig"