master
//! Stores and manages the queue of link tasks. Each task is either a `PrelinkTask` or a `ZcuTask`.
//!
//! There must be at most one link thread (the thread processing these tasks) active at a time. If
//! `!comp.separateCodegenThreadOk()`, then ZCU tasks will be run on the main thread, bypassing this
//! queue entirely.
//!
//! All prelink tasks must be processed before any ZCU tasks are processed. After all prelink tasks
//! are run, but before any ZCU tasks are run, `prelink` must be called on the `link.File`.
//!
//! There will sometimes be a `ZcuTask` in the queue which is not yet ready because it depends on
//! MIR which has not yet been generated by any codegen thread. In this case, we must pause
//! processing of linker tasks until the MIR is ready. It would be incorrect to run any other link
//! tasks first, since this would make builds unreproducible.
14
/// Protects every field below that is documented as guarded by `mutex`.
mutex: std.Thread.Mutex,
/// Safety check that at most one `flushTaskQueue` invocation is running at any moment.
flush_safety: std.debug.SafetyLock,

/// Nonzero for as long as more prelink tasks may still be queued. When it reaches 0, all
/// prelink tasks have been queued and ZCU tasks are allowed to run. Guarded by `mutex`.
prelink_wait_count: u32,

/// Prelink tasks that were enqueued but have not yet been taken over by the worker thread.
/// Allocated into `gpa`, guarded by `mutex`.
queued_prelink: std.ArrayList(PrelinkTask),
/// Prelink tasks the worker thread has taken out of `queued_prelink` and is processing.
/// Allocated into `gpa`, accessed only by the worker thread.
wip_prelink: std.ArrayList(PrelinkTask),

/// The ZCU-task counterpart of `queued_prelink`.
/// Allocated into `gpa`, guarded by `mutex`.
queued_zcu: std.ArrayList(ZcuTask),
/// The ZCU-task counterpart of `wip_prelink`.
/// Allocated into `gpa`, accessed only by the worker thread.
wip_zcu: std.ArrayList(ZcuTask),

/// Index of the next task in `wip_zcu` to run. Processing of ZCU tasks can stop part-way
/// through `wip_zcu` when a task's MIR is not populated yet; tasks before this index have
/// been run, tasks from this index onwards are still pending.
wip_zcu_idx: usize,

/// Total `air_bytes` of all currently-queued `ZcuTask.link_func` tasks. MIR size is roughly
/// proportional to AIR size, so bounding this value bounds the AIR and MIR queued for codegen
/// and link, which prevents excessive memory usage when analysis produces AIR faster than
/// codegen/link can consume it. The bound is `max_air_bytes_in_flight`.
/// Guarded by `mutex`.
air_bytes_in_flight: u32,
/// Nonzero when a call to `enqueueZcu` is blocked trying to add a `link_func` task of this many
/// AIR bytes; it cannot proceed until `air_bytes_in_flight` has dropped far enough to fit them.
/// Guarded by `mutex`.
air_bytes_waiting: u32,
/// `enqueueZcu` waits on this condition (using `mutex`) after setting `air_bytes_waiting`.
/// It should be signaled once `air_bytes_waiting` more bytes can be queued.
air_bytes_cond: std.Thread.Condition,

/// Guarded by `mutex`.
state: union(enum) {
    /// The link thread is running, or has been queued to run.
    running,
    /// The link thread is neither running nor queued because it ran out of immediately
    /// available tasks. It should be spawned again when more tasks are enqueued. While
    /// `prelink_wait_count` is nonzero, the tasks being waited for are prelink tasks.
    finished,
    /// The link thread is neither running nor queued because it is waiting for the MIR of this
    /// function to be populated. When codegen completes it must call `mirReady`, which
    /// restarts the link thread.
    wait_for_mir: InternPool.Index,
},
69
/// Upper bound on `air_bytes_in_flight`.
///
/// MIR is typically about 20 times the size of the AIR it was generated from, and about 50 times
/// in the worst observed case. Using the 50x figure and a budget of roughly 500 MiB for AIR/MIR
/// combined gives a limit of about 10 MiB of in-flight AIR.
const max_air_bytes_in_flight = 10 * 1024 * 1024;
74
/// Initial `Queue` value: no tasks queued, no prelink tasks expected, and no worker thread running.
/// `queued_prelink` may be appended to directly until `start` is called.
pub const empty: Queue = .{
    .mutex = .{},
    .flush_safety = .{},
    // Deliberately left undefined here; `start` assigns the real initial value.
    .prelink_wait_count = undefined,
    .queued_prelink = .empty,
    .wip_prelink = .empty,
    .queued_zcu = .empty,
    .wip_zcu = .empty,
    .wip_zcu_idx = 0,
    .air_bytes_in_flight = 0,
    .air_bytes_waiting = 0,
    .air_bytes_cond = .{},
    .state = .finished,
};
/// Frees all memory owned by the queue.
///
/// `comp` provides the allocator (`comp.gpa`) that the task lists were allocated with, and the
/// `Zcu` (`comp.zcu.?`) needed to deinit any `ZcuTask`s which are still pending. `comp.zcu` is
/// only unwrapped if at least one such task exists.
pub fn deinit(q: *Queue, comp: *Compilation) void {
    const gpa = comp.gpa;

    // Entries of `wip_zcu` before `wip_zcu_idx` were already run and deinitialized by the
    // worker thread, so only the remainder still needs cleaning up.
    const unprocessed = q.wip_zcu.items[q.wip_zcu_idx..];
    for (q.queued_zcu.items) |task| task.deinit(comp.zcu.?);
    for (unprocessed) |task| task.deinit(comp.zcu.?);

    q.queued_prelink.deinit(gpa);
    q.wip_prelink.deinit(gpa);
    q.queued_zcu.deinit(gpa);
    q.wip_zcu.deinit(gpa);
}
101
/// Begins processing of the queue. Must be called exactly once; afterwards the caller must no
/// longer touch `queued_prelink` directly. Spawns the link thread if any prelink tasks were
/// already queued.
///
/// Asserts that the queue is in the `finished` state and that no ZCU tasks are queued.
pub fn start(q: *Queue, comp: *Compilation) void {
    assert(q.state == .finished);
    assert(q.queued_zcu.items.len == 0);

    // This cannot be initialized to 1 in `empty`: it would fall to 0 on successive incremental
    // updates, whereas every update needs to begin from 1.
    q.prelink_wait_count = 1;

    // With nothing queued yet there is no work for the link thread to do.
    if (q.queued_prelink.items.len == 0) return;

    q.state = .running;
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
115
/// Registers one more pending prelink item by incrementing `prelink_wait_count` under `mutex`.
/// Each call must be matched by a later call to `finishPrelinkItem`.
pub fn startPrelinkItem(q: *Queue) void {
    q.mutex.lock();
    defer q.mutex.unlock();
    // A count of zero would mean the prelink phase has already been completed.
    assert(q.prelink_wait_count != 0);
    q.prelink_wait_count += 1;
}
/// Marks one prelink item as finished. Must be called exactly once more than
/// `startPrelinkItem`: the extra call signals that no further `startPrelinkItem` calls will
/// happen, so that once every pending item has finished, the queue can move on to ZCU tasks.
///
/// When the count reaches zero while the link thread is idle, the link thread is respawned.
pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void {
    const restart_linker = restart: {
        q.mutex.lock();
        defer q.mutex.unlock();

        q.prelink_wait_count -= 1;
        if (q.prelink_wait_count != 0) break :restart false;

        // That was the last outstanding prelink item; check whether the link thread needs a kick.
        switch (q.state) {
            .wait_for_mir => unreachable, // we've not started zcu tasks yet
            .running => break :restart false,
            .finished => {},
        }
        assert(q.queued_prelink.items.len == 0);

        // The link thread has to run even when no ZCU tasks are queued, because it is the one
        // responsible for calling `link.File.prelink()`.
        q.state = .running;
        break :restart true;
    };
    if (!restart_linker) return;
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
145
/// Notifies the queue that the `ZcuTask.LinkFunc.SharedMir` for `func_index` has been populated.
/// Called by codegen workers. If the link thread had stopped to wait for exactly this MIR, it is
/// respawned; in any other state this is a no-op.
pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir: *ZcuTask.LinkFunc.SharedMir) void {
    // `mir` must not be inspected before the state check below: asserting up front that it is
    // not pending would race with a worker thread potentially freeing it.
    const was_waiting = waiting: {
        q.mutex.lock();
        defer q.mutex.unlock();
        switch (q.state) {
            .finished, .running => break :waiting false,
            .wait_for_mir => |awaited| if (awaited != func_index) break :waiting false,
        }
        // The link thread stopped for this very function, so it is our job to restart it.
        q.state = .running;
        break :waiting true;
    };
    if (!was_waiting) return;

    assert(mir.status.load(.acquire) != .pending);
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
164
/// Enqueues all prelink tasks in `tasks` and respawns the link thread if it was idle.
///
/// Asserts that the tasks were expected, i.e. that `prelink_wait_count` is not yet 0. Also
/// asserts that `tasks.len` is not 0.
///
/// Returns `error.OutOfMemory` if `queued_prelink` could not grow; in that case nothing is
/// queued and the queue state is left untouched.
pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void {
    // Enforce the documented precondition. An empty slice would otherwise respawn the link
    // thread without giving it any new task to run.
    assert(tasks.len != 0);
    {
        q.mutex.lock();
        defer q.mutex.unlock();
        assert(q.prelink_wait_count > 0);
        try q.queued_prelink.appendSlice(comp.gpa, tasks);
        switch (q.state) {
            .wait_for_mir => unreachable, // we've not started zcu tasks yet
            .running => return, // the running link thread will pick the tasks up itself
            .finished => {},
        }
        // Restart the linker thread, because it was waiting for a task
        q.state = .running;
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
183
/// Enqueues a single ZCU task, respawning the link thread if it was idle and the task can be
/// run right away.
///
/// Asserts `comp.separateCodegenThreadOk()`.
///
/// For a `link_func` task this may block on `air_bytes_cond` until `air_bytes_in_flight` has
/// dropped far enough that the task's `air_bytes` fit under `max_air_bytes_in_flight`.
///
/// Returns `error.OutOfMemory` if `queued_zcu` could not grow; in that case the task is not
/// queued and `air_bytes_in_flight` is left unchanged.
pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void {
    assert(comp.separateCodegenThreadOk());
    {
        q.mutex.lock();
        defer q.mutex.unlock();
        // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall.
        if (task == .link_func) {
            const air_bytes = task.link_func.air_bytes;
            // Saturating: a task larger than the whole budget may only start once nothing
            // else is in flight.
            const max_in_flight = max_air_bytes_in_flight -| air_bytes;
            while (q.air_bytes_in_flight > max_in_flight) {
                q.air_bytes_waiting = air_bytes;
                q.air_bytes_cond.wait(&q.mutex);
                q.air_bytes_waiting = 0;
            }
        }
        try q.queued_zcu.append(comp.gpa, task);
        // Account for the task only once it is definitely queued. If the bytes were added
        // before the fallible append, an allocation failure would leave the counter inflated
        // with nothing to ever decrement it, and later enqueues could block forever. `mutex` is
        // held from the end of the wait above until here, so nobody can observe the gap.
        if (task == .link_func) q.air_bytes_in_flight += task.link_func.air_bytes;
        switch (q.state) {
            .running, .wait_for_mir => return,
            // While prelink tasks are still expected, ZCU tasks cannot be run yet.
            .finished => if (q.prelink_wait_count > 0) return,
        }
        // Restart the linker thread, unless it would immediately be blocked
        if (task == .link_func and task.link_func.mir.status.load(.acquire) == .pending) {
            q.state = .{ .wait_for_mir = task.link_func.func };
            return;
        }
        q.state = .running;
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
213
/// Runs a single linker idle task via `link.doIdleTask` and returns its result. Any error is
/// treated as "no more idle work" (`false`); `error.OutOfMemory` is additionally recorded in
/// `comp.link_diags`.
fn runIdleTask(comp: *Compilation, tid: usize) bool {
    return link.doIdleTask(comp, tid) catch |err| switch (err) {
        error.OutOfMemory => oom: {
            comp.link_diags.setAllocFailure();
            break :oom false;
        },
        error.LinkFailure => false,
    };
}

/// Entry point of the link thread. Runs all queued prelink tasks, then calls `prelink` on the
/// `link.File` if that has not happened yet, then runs queued ZCU tasks. Whenever no task is
/// available, linker idle tasks are run instead; once those are exhausted as well, the function
/// records in `q.state` why it stopped (`finished` or `wait_for_mir`) and returns.
///
/// Expects `q.state` to be `running` on entry.
fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
    q.flush_safety.lock(); // every `return` site should unlock this before unlocking `q.mutex`
    if (std.debug.runtime_safety) {
        q.mutex.lock();
        defer q.mutex.unlock();
        assert(q.state == .running);
    }

    var have_idle_tasks = true;

    // Phase 1: prelink tasks.
    prelink: while (true) {
        assert(q.wip_prelink.items.len == 0);
        swap_queues: while (true) {
            {
                q.mutex.lock();
                defer q.mutex.unlock();
                // `wip_prelink` is empty here, so this takes every queued task and hands the
                // (empty) buffer back for producers to refill.
                std.mem.swap(std.ArrayList(PrelinkTask), &q.queued_prelink, &q.wip_prelink);
                if (q.wip_prelink.items.len > 0) break :swap_queues;
                if (q.prelink_wait_count == 0) break :prelink; // prelink is done
                if (!have_idle_tasks) {
                    // We're expecting more prelink tasks so can't move on to ZCU tasks.
                    q.state = .finished;
                    q.flush_safety.unlock();
                    return;
                }
            }
            // Nothing queued right now; do idle work before checking the queue again.
            have_idle_tasks = runIdleTask(comp, tid);
        }
        for (q.wip_prelink.items) |prelink_task| link.doPrelinkTask(comp, prelink_task);
        have_idle_tasks = true;
        q.wip_prelink.clearRetainingCapacity();
    }

    // Phase 2: all prelink tasks have been run, so run prelink itself if necessary.
    if (comp.bin_file) |lf| {
        if (!lf.post_prelink) {
            if (lf.prelink()) |_| {
                lf.post_prelink = true;
            } else |err| switch (err) {
                error.OutOfMemory => comp.link_diags.setAllocFailure(),
                error.LinkFailure => {},
            }
        }
    }

    // Phase 3: ZCU tasks.
    while (true) {
        if (q.wip_zcu.items.len == q.wip_zcu_idx) swap_queues: {
            // Every task in `wip_zcu` has been run; fetch the next batch.
            q.wip_zcu.clearRetainingCapacity();
            q.wip_zcu_idx = 0;
            while (true) {
                {
                    q.mutex.lock();
                    defer q.mutex.unlock();
                    std.mem.swap(std.ArrayList(ZcuTask), &q.queued_zcu, &q.wip_zcu);
                    if (q.wip_zcu.items.len > 0) break :swap_queues;
                    if (!have_idle_tasks) {
                        // We've exhausted all available tasks.
                        q.state = .finished;
                        q.flush_safety.unlock();
                        return;
                    }
                }
                have_idle_tasks = runIdleTask(comp, tid);
            }
        }

        const task = q.wip_zcu.items[q.wip_zcu_idx];

        // A `link_func` task cannot run until its MIR has been populated.
        if (task == .link_func) pending: {
            const status_ptr = &task.link_func.mir.status;
            while (true) {
                // First check without the mutex to optimize for the common case where MIR is ready.
                if (status_ptr.load(.acquire) != .pending) break :pending;
                // Fill the waiting time with idle work for as long as there is any.
                if (!have_idle_tasks) break;
                have_idle_tasks = runIdleTask(comp, tid);
                if (!have_idle_tasks) break;
            }
            // Out of idle work. Re-check under the mutex, so that the decision to stop and the
            // `wait_for_mir` state update happen in one critical section.
            q.mutex.lock();
            defer q.mutex.unlock();
            if (status_ptr.load(.acquire) != .pending) break :pending;
            // We will stop for now, and get restarted once this MIR is ready.
            q.state = .{ .wait_for_mir = task.link_func.func };
            q.flush_safety.unlock();
            return;
        }

        link.doZcuTask(comp, tid, task);
        task.deinit(comp.zcu.?);

        if (task == .link_func) {
            // This MIR has been fully processed, so its AIR bytes are no longer in flight.
            q.mutex.lock();
            defer q.mutex.unlock();
            q.air_bytes_in_flight -= task.link_func.air_bytes;
            // Wake a blocked `enqueueZcu` only once its task actually fits.
            if (q.air_bytes_waiting != 0) {
                const fits_below = max_air_bytes_in_flight -| q.air_bytes_waiting;
                if (q.air_bytes_in_flight <= fits_below) q.air_bytes_cond.signal();
            }
        }

        q.wip_zcu_idx += 1;
        have_idle_tasks = true;
    }
}
335
336const std = @import("std");
337const assert = std.debug.assert;
338const Allocator = std.mem.Allocator;
339const Compilation = @import("../Compilation.zig");
340const InternPool = @import("../InternPool.zig");
341const link = @import("../link.zig");
342const PrelinkTask = link.PrelinkTask;
343const ZcuTask = link.ZcuTask;
344const Queue = @This();