Commit 5bb5aaf932

mlugg <mlugg@mlugg.co.uk>
2025-06-12 10:56:37
compiler: don't queue too much AIR/MIR
Without this cap, unlucky scheduling and/or details of what pipeline stages perform best on the host machine could cause many gigabytes of MIR to be stuck in the queue. At a certain point, pause the main thread until some of the functions in flight have been processed.
1 parent f9a670d
Changed files (3)
src/link/Queue.zig
@@ -39,6 +39,21 @@ wip_zcu: std.ArrayListUnmanaged(ZcuTask),
 /// index into `wip_zcu` which we have reached.
 wip_zcu_idx: usize,
 
+/// The sum of all `air_bytes` for all currently-queued `ZcuTask.link_func` tasks. Because
+/// MIR bytes are approximately proportional to AIR bytes, this acts to limit the amount of
+/// AIR and MIR which is queued for codegen and link respectively, to prevent excessive
+/// memory usage if analysis produces AIR faster than it can be processed by codegen/link.
+/// The cap is `max_air_bytes_in_flight`.
+/// Guarded by `mutex`.
+air_bytes_in_flight: u32,
+/// If nonzero, then a call to `enqueueZcu` is blocked waiting to add a `link_func` task, but
+/// cannot until `air_bytes_in_flight` is no greater than this value.
+/// Guarded by `mutex`.
+air_bytes_waiting: u32,
+/// After setting `air_bytes_waiting`, `enqueueZcu` will wait on this condition (with `mutex`).
+/// When `air_bytes_waiting` many bytes can be queued, this condition should be signaled.
+air_bytes_cond: std.Thread.Condition,
+
 /// Guarded by `mutex`.
 state: union(enum) {
     /// The link thread is currently running or queued to run.
@@ -52,6 +67,11 @@ state: union(enum) {
     wait_for_mir: *ZcuTask.LinkFunc.SharedMir,
 },
 
+/// In the worst observed case, MIR is around 50 times as large as AIR. More typically, the ratio is
+/// around 20. Going by that 50x multiplier, and assuming we want to consume no more than 500 MiB of
+/// memory on AIR/MIR, we see a limit of around 10 MiB of AIR in-flight.
+const max_air_bytes_in_flight = 10 * 1024 * 1024;
+
 /// The initial `Queue` state, containing no tasks, expecting no prelink tasks, and with no running worker thread.
 /// The `pending_prelink_tasks` and `queued_prelink` fields may be modified as needed before calling `start`.
 pub const empty: Queue = .{
@@ -64,6 +84,9 @@ pub const empty: Queue = .{
     .wip_zcu = .empty,
     .wip_zcu_idx = 0,
     .state = .finished,
+    .air_bytes_in_flight = 0,
+    .air_bytes_waiting = 0,
+    .air_bytes_cond = .{},
 };
 /// `lf` is needed to correctly deinit any pending `ZcuTask`s.
 pub fn deinit(q: *Queue, comp: *Compilation) void {
@@ -131,6 +154,16 @@ pub fn enqueueZcu(q: *Queue, comp: *Compilation, task: ZcuTask) Allocator.Error!
     {
         q.mutex.lock();
         defer q.mutex.unlock();
+        // If this is a `link_func` task, we might need to wait for `air_bytes_in_flight` to fall.
+        if (task == .link_func) {
+            const max_in_flight = max_air_bytes_in_flight -| task.link_func.air_bytes;
+            while (q.air_bytes_in_flight > max_in_flight) {
+                q.air_bytes_waiting = task.link_func.air_bytes;
+                q.air_bytes_cond.wait(&q.mutex);
+                q.air_bytes_waiting = 0;
+            }
+            q.air_bytes_in_flight += task.link_func.air_bytes;
+        }
         try q.queued_zcu.append(comp.gpa, task);
         switch (q.state) {
             .running, .wait_for_mir => return,
@@ -221,6 +254,17 @@ fn flushTaskQueue(tid: usize, q: *Queue, comp: *Compilation) void {
         }
         link.doZcuTask(comp, tid, task);
         task.deinit(comp.zcu.?);
+        if (task == .link_func) {
+            // Decrease `air_bytes_in_flight`, since we've finished processing this MIR.
+            q.mutex.lock();
+            defer q.mutex.unlock();
+            q.air_bytes_in_flight -= task.link_func.air_bytes;
+            if (q.air_bytes_waiting != 0 and
+                q.air_bytes_in_flight <= max_air_bytes_in_flight -| q.air_bytes_waiting)
+            {
+                q.air_bytes_cond.signal();
+            }
+        }
         q.wip_zcu_idx += 1;
     }
 }
src/Compilation.zig
@@ -4607,12 +4607,17 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void {
             };
             assert(zcu.pending_codegen_jobs.rmw(.Add, 1, .monotonic) > 0); // the "Code Generation" node hasn't been ended
             zcu.codegen_prog_node.increaseEstimatedTotalItems(1);
+            // This value is used as a heuristic to avoid queueing too much AIR/MIR at once (hence
+            // using a lot of memory). If this would cause too many AIR bytes to be in-flight, we
+            // will block on the `dispatchZcuLinkTask` call below.
+            const air_bytes: u32 = @intCast(air.instructions.len * 5 + air.extra.items.len * 4);
             if (comp.separateCodegenThreadOk()) {
                 // `workerZcuCodegen` takes ownership of `air`.
                 comp.thread_pool.spawnWgId(&comp.link_task_wait_group, workerZcuCodegen, .{ comp, func.func, air, shared_mir });
                 comp.dispatchZcuLinkTask(tid, .{ .link_func = .{
                     .func = func.func,
                     .mir = shared_mir,
+                    .air_bytes = air_bytes,
                 } });
             } else {
                 {
@@ -4624,6 +4629,7 @@ fn processOneJob(tid: usize, comp: *Compilation, job: Job) JobError!void {
                 comp.dispatchZcuLinkTask(tid, .{ .link_func = .{
                     .func = func.func,
                     .mir = shared_mir,
+                    .air_bytes = air_bytes,
                 } });
                 air.deinit(gpa);
             }
src/link.zig
@@ -1267,6 +1267,11 @@ pub const ZcuTask = union(enum) {
         /// the codegen job to ensure that the linker receives functions in a deterministic order,
         /// allowing reproducible builds.
         mir: *SharedMir,
+        /// This is not actually used by `doZcuTask`. Instead, `Queue` uses this value as a heuristic
+        /// to avoid queueing too much AIR/MIR for codegen/link at a time. Essentially, we cap the
+        /// total number of AIR bytes which are being processed at once, preventing unbounded memory
+        /// usage when AIR is produced faster than it is processed.
+        air_bytes: u32,
 
         pub const SharedMir = struct {
             /// This is initially `.pending`. When `value` is populated, the codegen thread will set