master
1const builtin = @import("builtin");
2const std = @import("std");
3const assert = std.debug.assert;
4const WaitGroup = @This();
5
/// Flag held in bit 0 of `state`; `waitStateless` adds it when a waiter arrives.
const is_waiting: usize = 0b01;
/// Amount added to / subtracted from `state` per task. The pending-task count
/// is therefore `state / one_pending`, i.e. every bit above the waiter flag.
const one_pending: usize = 0b10;

/// Packed word: waiter flag in bit 0, pending-task count in the remaining bits.
state: std.atomic.Value(usize) = .init(0),
/// Set by the `finish` call that retires the last pending task while the
/// waiter flag is present; waited on by `wait`.
event: std.Thread.ResetEvent = .unset,
11
/// Registers one more pending task on this wait group by forwarding to
/// `startStateless` with the group's own `state` word.
pub fn start(self: *WaitGroup) void {
    startStateless(&self.state);
}
15
/// Adds one pending task to a bare `state` word using monotonic ordering.
///
/// Asserts that the count stored before the increment was strictly below the
/// largest count the word can represent, so the increment cannot wrap.
pub fn startStateless(state: *std.atomic.Value(usize)) void {
    const max_pending = std.math.maxInt(usize) / one_pending;
    const before = state.fetchAdd(one_pending, .monotonic);
    const pending_before = before / one_pending;
    assert(pending_before < max_pending);
}
20
/// Registers `n` pending tasks in a single atomic add (monotonic ordering).
///
/// Asserts that the whole batch fits in the counter: `n` plus the count that
/// was stored before the add must not exceed the largest representable count.
/// `n == 0` is always accepted and leaves the count unchanged.
pub fn startMany(self: *WaitGroup, n: usize) void {
    const max_pending = std.math.maxInt(usize) / one_pending;
    // Bound `n` first so that `one_pending * n` below cannot overflow.
    assert(n <= max_pending);

    const prev_state = self.state.fetchAdd(one_pending * n, .monotonic);
    // Checking only `prev < max` (as is sufficient for a +1 increment) would
    // let a batch wrap the counter unnoticed; compare against the remaining
    // headroom instead. The subtraction cannot underflow because
    // `prev_state / one_pending` is at most `max_pending`.
    const prev_pending = prev_state / one_pending;
    assert(n <= max_pending - prev_pending);
}
25
/// Marks one pending task of this wait group as complete.
///
/// Delegates to `finishStateless`, which subtracts one task from `state`
/// (acq_rel ordering), asserts that at least one task was pending, and sets
/// `event` when the retired task was the last one and a waiter is registered.
/// Delegating keeps this in step with `start`/`wait`, which forward to their
/// stateless counterparts in the same way.
pub fn finish(self: *WaitGroup) void {
    finishStateless(&self.state, &self.event);
}
34
/// Retires one pending task from a bare `state` word (acq_rel ordering).
///
/// Asserts that at least one task was pending. If the value held before the
/// decrement was exactly "one task pending, waiter flag set", this call
/// retired the last task and `event` is set.
pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    const before = state.fetchSub(one_pending, .acq_rel);
    assert(before >= one_pending);

    const last_task_with_waiter = one_pending | is_waiting;
    if (before != last_task_with_waiter) return;
    event.set();
}
40
/// Waits on this wait group by forwarding its `state` and `event` to
/// `waitStateless`.
pub fn wait(wg: *WaitGroup) void {
    waitStateless(&wg.state, &wg.event);
}
44
/// Adds the waiter flag to a bare `state` word (acquire ordering) and, if any
/// task was pending at that moment, waits on `event`.
///
/// Asserts that the waiter flag was not already present.
pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
    const before = state.fetchAdd(is_waiting, .acquire);
    assert((before & is_waiting) == 0);

    const pending = before / one_pending;
    if (pending == 0) return; // nothing outstanding, no need to wait on the event
    event.wait();
}
50
/// Returns the group to its initial condition: stores zero into `state`
/// (monotonic ordering), then resets `event`.
pub fn reset(self: *WaitGroup) void {
    const cleared: usize = 0;
    self.state.store(cleared, .monotonic);
    self.event.reset();
}
55
/// Reports whether no tasks are pending, based on an acquire load of `state`.
///
/// Asserts that the waiter flag is not set in the loaded value.
pub fn isDone(wg: *WaitGroup) bool {
    const snapshot = wg.state.load(.acquire);
    assert((snapshot & is_waiting) == 0);

    // A value below `one_pending` has a pending-task count of zero.
    return snapshot < one_pending;
}
62
/// Returns the pending-task count decoded from a monotonic load of `state`.
pub fn value(wg: *WaitGroup) usize {
    const snapshot = wg.state.load(.monotonic);
    return snapshot / one_pending;
}
66
/// Spawns a new thread for the task and tracks it with `wg`. This is
/// appropriate when the callee delegates all work.
///
/// In single-threaded builds `func` is called inline and `wg` is not touched.
/// Otherwise the task is registered with `start`, run on a freshly spawned
/// thread that is then detached, and retired with `finish` when `func`
/// returns. If the thread cannot be spawned, the task runs on the calling
/// thread instead, still followed by `finish`.
pub fn spawnManager(
    wg: *WaitGroup,
    comptime func: anytype,
    args: anytype,
) void {
    if (builtin.single_threaded) {
        @call(.auto, func, args);
        return;
    }

    const Task = struct {
        /// Thread entry point: invokes `func` and retires the task on exit.
        fn entry(group: *WaitGroup, task_args: @TypeOf(args)) void {
            defer group.finish();
            @call(.auto, func, task_args);
        }
    };

    wg.start();
    const thread = std.Thread.spawn(.{}, Task.entry, .{ wg, args }) catch {
        // Spawning failed: run the task on the current thread.
        Task.entry(wg, args);
        return;
    };
    thread.detach();
}