master
//! Stores and manages the queue of link tasks. Each task is either a `PrelinkTask` or a `ZcuTask`.
//!
//! There must be at most one link thread (the thread processing these tasks) active at a time. If
//! `!comp.separateCodegenThreadOk()`, then ZCU tasks will be run on the main thread, bypassing this
//! queue entirely.
//!
//! All prelink tasks must be processed before any ZCU tasks are processed. After all prelink tasks
//! are run, but before any ZCU tasks are run, `prelink` must be called on the `link.File`.
//!
//! There will sometimes be a `ZcuTask` in the queue which is not yet ready because it depends on
//! MIR which has not yet been generated by any codegen thread. In this case, we must pause
//! processing of linker tasks until the MIR is ready. It would be incorrect to run any other link
//! tasks first, since this would make builds unreproducible.
 14
 15mutex: std.Thread.Mutex,
 16/// Validates that only one `flushTaskQueue` thread is running at a time.
 17flush_safety: std.debug.SafetyLock,
 18
 19/// This value is positive while there are still prelink tasks yet to be queued. Once they are
 20/// all queued, this value becomes 0, and ZCU tasks can be run. Guarded by `mutex`.
 21prelink_wait_count: u32,
 22
 23/// Prelink tasks which have been enqueued and are not yet owned by the worker thread.
 24/// Allocated into `gpa`, guarded by `mutex`.
 25queued_prelink: std.ArrayList(PrelinkTask),
 26/// The worker thread moves items from `queued_prelink` into this array in order to process them.
 27/// Allocated into `gpa`, accessed only by the worker thread.
 28wip_prelink: std.ArrayList(PrelinkTask),
 29
 30/// Like `queued_prelink`, but for ZCU tasks.
 31/// Allocated into `gpa`, guarded by `mutex`.
 32queued_zcu: std.ArrayList(ZcuTask),
 33/// Like `wip_prelink`, but for ZCU tasks.
 34/// Allocated into `gpa`, accessed only by the worker thread.
 35wip_zcu: std.ArrayList(ZcuTask),
 36
 37/// When processing ZCU link tasks, we might have to block due to unpopulated MIR. When this
 38/// happens, some tasks in `wip_zcu` have been run, and some are still pending. This is the
 39/// index into `wip_zcu` which we have reached.
 40wip_zcu_idx: usize,
 41
 42/// The sum of all `air_bytes` for all currently-queued `ZcuTask.link_func` tasks. Because
 43/// MIR bytes are approximately proportional to AIR bytes, this acts to limit the amount of
 44/// AIR and MIR which is queued for codegen and link respectively, to prevent excessive
 45/// memory usage if analysis produces AIR faster than it can be processed by codegen/link.
 46/// The cap is `max_air_bytes_in_flight`.
 47/// Guarded by `mutex`.
 48air_bytes_in_flight: u32,
 49/// If nonzero, then a call to `enqueueZcu` is blocked waiting to add a `link_func` task, but
 50/// cannot until `air_bytes_in_flight` is no greater than this value.
 51/// Guarded by `mutex`.
 52air_bytes_waiting: u32,
 53/// After setting `air_bytes_waiting`, `enqueueZcu` will wait on this condition (with `mutex`).
 54/// When `air_bytes_waiting` many bytes can be queued, this condition should be signaled.
 55air_bytes_cond: std.Thread.Condition,
 56
 57/// Guarded by `mutex`.
 58state: union(enum) {
 59    /// The link thread is currently running or queued to run.
 60    running,
 61    /// The link thread is not running or queued, because it has exhausted all immediately available
 62    /// tasks. It should be spawned when more tasks are enqueued. If `prelink_wait_count` is not
 63    /// zero, we are specifically waiting for prelink tasks.
 64    finished,
 65    /// The link thread is not running or queued, because it is waiting for this MIR to be populated.
 66    /// Once codegen completes, it must call `mirReady` which will restart the link thread.
 67    wait_for_mir: InternPool.Index,
 68},
 69
 70/// In the worst observed case, MIR is around 50 times as large as AIR. More typically, the ratio is
 71/// around 20. Going by that 50x multiplier, and assuming we want to consume no more than 500 MiB of
 72/// memory on AIR/MIR, we see a limit of around 10 MiB of AIR in-flight.
 73const max_air_bytes_in_flight = 10 * 1024 * 1024;
 74
 75/// The initial `Queue` state, containing no tasks, expecting no prelink tasks, and with no running worker thread.
 76/// The `queued_prelink` field may be appended to before calling `start`.
 77pub const empty: Queue = .{
 78    .mutex = .{},
 79    .flush_safety = .{},
 80    .prelink_wait_count = undefined, // set in `start`
 81    .queued_prelink = .empty,
 82    .wip_prelink = .empty,
 83    .queued_zcu = .empty,
 84    .wip_zcu = .empty,
 85    .wip_zcu_idx = 0,
 86    .state = .finished,
 87    .air_bytes_in_flight = 0,
 88    .air_bytes_waiting = 0,
 89    .air_bytes_cond = .{},
 90};
 91/// `lf` is needed to correctly deinit any pending `ZcuTask`s.
 92pub fn deinit(q: *Queue, comp: *Compilation) void {
 93    const gpa = comp.gpa;
 94    for (q.queued_zcu.items) |t| t.deinit(comp.zcu.?);
 95    for (q.wip_zcu.items[q.wip_zcu_idx..]) |t| t.deinit(comp.zcu.?);
 96    q.queued_prelink.deinit(gpa);
 97    q.wip_prelink.deinit(gpa);
 98    q.queued_zcu.deinit(gpa);
 99    q.wip_zcu.deinit(gpa);
100}
101
/// Must be called exactly once. Afterwards the caller may no longer touch `queued_prelink`
/// directly. If prelink tasks were already appended, the link thread is spawned to run them.
///
/// Asserts that the queue is in the `.finished` state and that no ZCU tasks are queued.
pub fn start(q: *Queue, comp: *Compilation) void {
    assert(q.state == .finished);
    assert(q.queued_zcu.items.len == 0);

    // The count must begin at 1 on every update. It cannot simply be initialized to 1 in
    // `empty`, because it falls to 0 and would stay there on subsequent incremental updates.
    q.prelink_wait_count = 1;

    // Nothing queued yet: the thread will be spawned when the first task arrives.
    if (q.queued_prelink.items.len == 0) return;

    q.state = .running;
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
115
/// Registers one more outstanding prelink item. Each call must be matched by a later call to
/// `finishPrelinkItem`.
///
/// Asserts that prelink has not already been completed (`prelink_wait_count` is nonzero).
pub fn startPrelinkItem(q: *Queue) void {
    q.mutex.lock();
    defer q.mutex.unlock();

    const outstanding = q.prelink_wait_count;
    assert(outstanding > 0); // once the count has hit 0, no further items may be started
    q.prelink_wait_count = outstanding + 1;
}
/// Marks one prelink item as finished. This must be called exactly once more than
/// `startPrelinkItem`: that extra, final call signals that no more items will be started, so
/// that once every pending item has finished the queue can proceed to ZCU tasks.
///
/// When the count reaches 0 and the link thread is idle, the link thread is respawned.
pub fn finishPrelinkItem(q: *Queue, comp: *Compilation) void {
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        q.prelink_wait_count -= 1;
        if (q.prelink_wait_count > 0) return;

        // That was the last outstanding prelink item.
        switch (q.state) {
            // A running thread will observe the zero count by itself.
            .running => return,
            // ZCU tasks cannot have started before the count reached 0.
            .wait_for_mir => unreachable,
            .finished => {
                assert(q.queued_prelink.items.len == 0);
                // The thread is restarted even when no ZCU tasks are queued, because it is
                // what makes sure `link.File.prelink()` gets called.
                q.state = .running;
            },
        }
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
145
/// Codegen workers call this once they have populated a `ZcuTask.LinkFunc.SharedMir`. If the
/// link thread is parked in `.wait_for_mir` on exactly `func_index`, it is restarted; in every
/// other state this is a no-op.
pub fn mirReady(q: *Queue, comp: *Compilation, func_index: InternPool.Index, mir: *ZcuTask.LinkFunc.SharedMir) void {
    // Asserting up front that `mir` is no longer pending would be nice, but it would race with
    // a worker thread that may be freeing it. The check is therefore deferred until we know the
    // link thread was waiting on this MIR.
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        const awaited_func = switch (q.state) {
            .finished, .running => return,
            .wait_for_mir => |func| func,
        };
        if (awaited_func != func_index) return;

        // The link thread was blocked on this very MIR, so it is our job to restart it.
        q.state = .running;
    }
    assert(mir.status.load(.acquire) != .pending);
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
164
/// Appends every task in `tasks` to the prelink queue and respawns the link thread if it had
/// gone idle.
///
/// Asserts that prelink tasks are still expected, i.e. that `prelink_wait_count` is nonzero.
/// Callers are expected to pass a non-empty `tasks`; note that this is not checked here.
///
/// Returns `error.OutOfMemory` if the queue could not grow; no thread is spawned in that case.
pub fn enqueuePrelink(q: *Queue, comp: *Compilation, tasks: []const PrelinkTask) Allocator.Error!void {
    {
        q.mutex.lock();
        defer q.mutex.unlock();

        assert(q.prelink_wait_count > 0);
        try q.queued_prelink.appendSlice(comp.gpa, tasks);

        switch (q.state) {
            // An active thread will pick the new tasks up when it next swaps queues.
            .running => return,
            // ZCU tasks, and hence MIR waits, cannot begin before prelink has completed.
            .wait_for_mir => unreachable,
            // The thread stopped because it had nothing to do; wake it for the new tasks.
            .finished => q.state = .running,
        }
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
183
/// Enqueues a single ZCU task, restarting the link thread if it is idle and could make progress.
///
/// Asserts `comp.separateCodegenThreadOk()`.
///
/// For a `link_func` task, this first blocks on `air_bytes_cond` for as long as
/// `air_bytes_in_flight` exceeds `max_air_bytes_in_flight -| task.link_func.air_bytes`, and then
/// adds the task's `air_bytes` to `air_bytes_in_flight`.
///
/// Returns `error.OutOfMemory` if the queue could not grow. In that case the task has not been
/// queued and `air_bytes_in_flight` is left unchanged.
pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!void {
    assert(comp.separateCodegenThreadOk());
    {
        q.mutex.lock();
        defer q.mutex.unlock();
        // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall.
        if (task == .link_func) {
            const max_in_flight = max_air_bytes_in_flight -| task.link_func.air_bytes;
            while (q.air_bytes_in_flight > max_in_flight) {
                q.air_bytes_waiting = task.link_func.air_bytes;
                q.air_bytes_cond.wait(&q.mutex);
                q.air_bytes_waiting = 0;
            }
        }
        // Reserve the slot *before* accounting for the task's AIR bytes. If the bytes were
        // counted first and the allocation then failed, nothing would ever subtract them again
        // (the task never reaches the link thread), permanently tightening the throttle. The
        // reservation has to happen after the wait above: `wait` releases `mutex`, and the
        // worker thread may swap `queued_zcu` for another list in the meantime.
        try q.queued_zcu.ensureUnusedCapacity(comp.gpa, 1);
        if (task == .link_func) q.air_bytes_in_flight += task.link_func.air_bytes;
        q.queued_zcu.appendAssumeCapacity(task);
        switch (q.state) {
            // The thread is active, or will be restarted by `mirReady`.
            .running, .wait_for_mir => return,
            // While prelink is incomplete, ZCU tasks cannot run; the thread is restarted by
            // `enqueuePrelink` or `finishPrelinkItem` instead.
            .finished => if (q.prelink_wait_count > 0) return,
        }
        // Restart the linker thread, unless it would immediately be blocked
        if (task == .link_func and task.link_func.mir.status.load(.acquire) == .pending) {
            q.state = .{ .wait_for_mir = task.link_func.func };
            return;
        }
        q.state = .running;
    }
    comp.thread_pool.spawnWgId(&comp.link_task_wait_group, flushTaskQueue, .{ q, comp });
}
213
/// Body of the link thread. Runs in three stages:
///
/// 1. Drain prelink tasks until `prelink_wait_count` is 0 and none are queued.
/// 2. Call `prelink` on `comp.bin_file`, unless `post_prelink` is already set.
/// 3. Drain ZCU tasks in order, stopping if the next `link_func` task's MIR is still pending.
///
/// Whenever a stage has no task available, idle tasks are run via `link.doIdleTask` until it
/// reports that there are none left. The function returns only after storing a non-`.running`
/// value (`.finished` or `.wait_for_mir`) into `q.state` under `q.mutex`.
fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
    q.flush_safety.lock(); // every `return` below releases this while still holding `q.mutex`
    if (std.debug.runtime_safety) {
        q.mutex.lock();
        defer q.mutex.unlock();
        assert(q.state == .running);
    }

    // Whether `link.doIdleTask` may still have work for us. Reset to `true` every time a real
    // task has been run.
    var idle_work_left = true;

    // Stage 1: prelink tasks.
    prelink_stage: while (true) {
        assert(q.wip_prelink.items.len == 0);
        fetch_prelink: while (true) {
            {
                q.mutex.lock();
                defer q.mutex.unlock();
                std.mem.swap(std.ArrayList(PrelinkTask), &q.queued_prelink, &q.wip_prelink);
                if (q.wip_prelink.items.len > 0) break :fetch_prelink;
                if (q.prelink_wait_count == 0) break :prelink_stage; // all prelink work is done
                if (!idle_work_left) {
                    // More prelink tasks are still expected, so we cannot advance to ZCU tasks.
                    q.state = .finished;
                    q.flush_safety.unlock();
                    return;
                }
            }
            idle_work_left = runIdleTask(comp, tid);
        }
        for (q.wip_prelink.items) |prelink_task| {
            link.doPrelinkTask(comp, prelink_task);
        }
        idle_work_left = true;
        q.wip_prelink.clearRetainingCapacity();
    }

    // Stage 2: all prelink tasks have run, so perform `prelink` itself if still needed.
    if (comp.bin_file) |lf| prelink_once: {
        if (lf.post_prelink) break :prelink_once;
        if (lf.prelink()) |_| {
            lf.post_prelink = true;
        } else |err| switch (err) {
            error.OutOfMemory => comp.link_diags.setAllocFailure(),
            error.LinkFailure => {},
        }
    }

    // Stage 3: ZCU tasks.
    while (true) {
        if (q.wip_zcu.items.len == q.wip_zcu_idx) refill: {
            // Everything in `wip_zcu` has been run; fetch the next batch.
            q.wip_zcu.clearRetainingCapacity();
            q.wip_zcu_idx = 0;
            while (true) {
                {
                    q.mutex.lock();
                    defer q.mutex.unlock();
                    std.mem.swap(std.ArrayList(ZcuTask), &q.queued_zcu, &q.wip_zcu);
                    if (q.wip_zcu.items.len > 0) break :refill;
                    if (!idle_work_left) {
                        // No queued tasks and no idle tasks: nothing more to do for now.
                        q.state = .finished;
                        q.flush_safety.unlock();
                        return;
                    }
                }
                idle_work_left = runIdleTask(comp, tid);
            }
        }

        const task = q.wip_zcu.items[q.wip_zcu_idx];

        // A `link_func` task cannot be run until its MIR has been populated.
        await_mir: {
            if (task != .link_func) break :await_mir;
            const status_ptr = &task.link_func.mir.status;
            while (true) {
                // Lock-free check first, since the MIR being ready is the common case.
                if (status_ptr.load(.acquire) != .pending) break :await_mir;
                // Fill the wait with idle tasks for as long as there are any.
                if (!idle_work_left) break;
                idle_work_left = runIdleTask(comp, tid);
                if (!idle_work_left) break;
            }
            q.mutex.lock();
            defer q.mutex.unlock();
            // Check again now that `q.mutex` is held, before committing to stopping.
            if (status_ptr.load(.acquire) != .pending) break :await_mir;
            // Stop for now; `mirReady` restarts us once this MIR is populated.
            q.state = .{ .wait_for_mir = task.link_func.func };
            q.flush_safety.unlock();
            return;
        }

        link.doZcuTask(comp, tid, task);
        task.deinit(comp.zcu.?);

        switch (task) {
            .link_func => |link_func| {
                // This MIR has been fully processed, so its AIR bytes are no longer in flight.
                q.mutex.lock();
                defer q.mutex.unlock();
                q.air_bytes_in_flight -= link_func.air_bytes;
                const waiting = q.air_bytes_waiting;
                if (waiting != 0 and q.air_bytes_in_flight <= max_air_bytes_in_flight -| waiting) {
                    q.air_bytes_cond.signal();
                }
            },
            else => {},
        }

        q.wip_zcu_idx += 1;
        idle_work_left = true;
    }
}

/// Runs `link.doIdleTask` once and returns its result. Any error is treated as "no idle tasks
/// left" (`false`); `error.OutOfMemory` is additionally recorded on `comp.link_diags`.
fn runIdleTask(comp: *Compilation, tid: usize) bool {
    return link.doIdleTask(comp, tid) catch |err| switch (err) {
        error.OutOfMemory => oom: {
            comp.link_diags.setAllocFailure();
            break :oom false;
        },
        error.LinkFailure => false,
    };
}
335
336const std = @import("std");
337const assert = std.debug.assert;
338const Allocator = std.mem.Allocator;
339const Compilation = @import("../Compilation.zig");
340const InternPool = @import("../InternPool.zig");
341const link = @import("../link.zig");
342const PrelinkTask = link.PrelinkTask;
343const ZcuTask = link.ZcuTask;
344const Queue = @This();