Commit df46f5af69

Andrew Kelley <andrew@ziglang.org>
2024-05-24 02:01:06
std.Progress: include subtrees from child processes
1 parent f071164
Changed files (2)
lib
lib/std/process/Child.zig
@@ -98,7 +98,10 @@ resource_usage_statistics: ResourceUsageStatistics = .{},
 /// write end of the pipe will be specified in the `ZIG_PROGRESS`
 /// environment variable inside the child process. The progress reported by
 /// the child will be attached to this progress node in the parent process.
-parent_progress_node: std.Progress.Node = .{ .index = .none },
+///
+/// The child's progress tree will be grafted into the parent's progress tree,
+/// by substituting this node with the child's root node.
+progress_node: std.Progress.Node = .{ .index = .none },
 
 pub const ResourceUsageStatistics = struct {
     rusage: @TypeOf(rusage_init) = rusage_init,
@@ -581,11 +584,11 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void {
     }
 
     const prog_pipe: [2]posix.fd_t = p: {
-        if (self.parent_progress_node.index == .none) {
+        if (self.progress_node.index == .none) {
             break :p .{ -1, -1 };
         } else {
             // No CLOEXEC because the child needs access to this file descriptor.
-            break :p try posix.pipe2(.{});
+            break :p try posix.pipe2(.{ .NONBLOCK = true });
         }
     };
     errdefer destroyPipe(prog_pipe);
@@ -685,18 +688,18 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void {
 
     // we are the parent
     const pid: i32 = @intCast(pid_result);
-    if (self.stdin_behavior == StdIo.Pipe) {
-        self.stdin = File{ .handle = stdin_pipe[1] };
+    if (self.stdin_behavior == .Pipe) {
+        self.stdin = .{ .handle = stdin_pipe[1] };
     } else {
         self.stdin = null;
     }
-    if (self.stdout_behavior == StdIo.Pipe) {
-        self.stdout = File{ .handle = stdout_pipe[0] };
+    if (self.stdout_behavior == .Pipe) {
+        self.stdout = .{ .handle = stdout_pipe[0] };
     } else {
         self.stdout = null;
     }
-    if (self.stderr_behavior == StdIo.Pipe) {
-        self.stderr = File{ .handle = stderr_pipe[0] };
+    if (self.stderr_behavior == .Pipe) {
+        self.stderr = .{ .handle = stderr_pipe[0] };
     } else {
         self.stderr = null;
     }
@@ -705,15 +708,17 @@ fn spawnPosix(self: *ChildProcess) SpawnError!void {
     self.err_pipe = err_pipe;
     self.term = null;
 
-    if (self.stdin_behavior == StdIo.Pipe) {
+    if (self.stdin_behavior == .Pipe) {
         posix.close(stdin_pipe[0]);
     }
-    if (self.stdout_behavior == StdIo.Pipe) {
+    if (self.stdout_behavior == .Pipe) {
         posix.close(stdout_pipe[1]);
     }
-    if (self.stderr_behavior == StdIo.Pipe) {
+    if (self.stderr_behavior == .Pipe) {
         posix.close(stderr_pipe[1]);
     }
+
+    self.progress_node.setIpcFd(prog_pipe[0]);
 }
 
 fn spawnWindows(self: *ChildProcess) SpawnError!void {
lib/std/Progress.zig
@@ -83,6 +83,22 @@ pub const Node = struct {
         /// Little endian.
         estimated_total_count: u32,
         name: [max_name_len]u8,
+
+        fn getIpcFd(s: Storage) ?posix.fd_t {
+            if (s.estimated_total_count != std.math.maxInt(u32))
+                return null;
+
+            return @bitCast(s.completed_count);
+        }
+
+        fn setIpcFd(s: *Storage, fd: posix.fd_t) void {
+            s.estimated_total_count = std.math.maxInt(u32);
+            s.completed_count = @bitCast(fd);
+        }
+
+        comptime {
+            assert((@sizeOf(Storage) % 4) == 0);
+        }
     };
 
     const Parent = enum(u16) {
@@ -201,6 +217,13 @@ pub const Node = struct {
         }
     }
 
+    /// Posix-only. Used by `std.process.Child`.
+    pub fn setIpcFd(node: Node, fd: posix.fd_t) void {
+        const index = node.index.unwrap() orelse return;
+        assert(fd != -1);
+        storageByIndex(index).setIpcFd(fd);
+    }
+
     fn storageByIndex(index: Node.Index) *Node.Storage {
         return &global_progress.node_storage[@intFromEnum(index)];
     }
@@ -475,14 +498,8 @@ fn serialize() Serialized {
         }
     }
 
-    // Now we can analyze our copy of the graph without atomics, reconstructing
-    // children lists which do not exist in the canonical data. These are
-    // needed for tree traversal below.
-    const serialized_node_parents = serialized_node_parents_buffer[0..serialized_len];
-    const serialized_node_storage = serialized_node_storage_buffer[0..serialized_len];
-
     // Remap parents to point inside serialized arrays.
-    for (serialized_node_parents) |*parent| {
+    for (serialized_node_parents_buffer[0..serialized_len]) |*parent| {
         parent.* = switch (parent.*) {
             .unused => unreachable,
             .none => .none,
@@ -490,15 +507,99 @@ fn serialize() Serialized {
         };
     }
 
+    // Find nodes which correspond to child processes.
+    var pipe_buf: [4096]u8 align(4) = undefined;
+
+    for (
+        serialized_node_parents_buffer[0..serialized_len],
+        serialized_node_storage_buffer[0..serialized_len],
+        0..,
+    ) |main_parent, *main_storage, main_index| {
+        if (main_parent == .unused) continue;
+        const fd = main_storage.getIpcFd() orelse continue;
+        var bytes_read: usize = 0;
+        while (true) {
+            bytes_read += posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) {
+                error.WouldBlock => break,
+                else => |e| {
+                    std.log.warn("failed to read child progress data: {s}", .{@errorName(e)});
+                    main_storage.completed_count = 0;
+                    main_storage.estimated_total_count = 0;
+                    continue;
+                },
+            };
+        }
+        // Ignore all but the last message on the pipe.
+        var input: []align(2) u8 = pipe_buf[0..bytes_read];
+        if (input.len == 0) {
+            main_storage.completed_count = 0;
+            main_storage.estimated_total_count = 0;
+            continue;
+        }
+
+        const storage, const parents = while (true) {
+            if (input.len < 4) {
+                std.log.warn("short read: {d} out of 4 header bytes", .{input.len});
+                main_storage.completed_count = 0;
+                main_storage.estimated_total_count = 0;
+                continue;
+            }
+            const subtree_len = std.mem.readInt(u32, input[0..4], .little);
+            const expected_bytes = 4 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent));
+            if (input.len < expected_bytes) {
+                std.log.warn("short read: {d} out of {d} ({d} nodes)", .{ input.len, expected_bytes, subtree_len });
+                main_storage.completed_count = 0;
+                main_storage.estimated_total_count = 0;
+                continue;
+            }
+            if (input.len > expected_bytes) {
+                input = @alignCast(input[expected_bytes..]);
+                continue;
+            }
+            const storage_bytes = input[4..][0 .. subtree_len * @sizeOf(Node.Storage)];
+            const parents_bytes = input[4 + storage_bytes.len ..][0 .. subtree_len * @sizeOf(Node.Parent)];
+            break .{
+                std.mem.bytesAsSlice(Node.Storage, storage_bytes),
+                std.mem.bytesAsSlice(Node.Parent, parents_bytes),
+            };
+        };
+
+        // Mount the root here.
+        main_storage.* = storage[0];
+
+        // Copy the rest of the tree to the end.
+        @memcpy(serialized_node_storage_buffer[serialized_len..][0 .. storage.len - 1], storage[1..]);
+
+        // Patch up parent pointers taking into account how the subtree is mounted.
+        serialized_node_parents_buffer[serialized_len] = .none;
+
+        for (serialized_node_parents_buffer[serialized_len..][0 .. parents.len - 1], parents[1..]) |*dest, p| {
+            dest.* = switch (p) {
+                // Fix bad data so the rest of the code does not see `unused`.
+                .none, .unused => .none,
+                // Root node is being mounted here.
+                @as(Node.Parent, @enumFromInt(0)) => @enumFromInt(main_index),
+                // Other nodes mounted at the end.
+                _ => |off| @enumFromInt(serialized_len + @intFromEnum(off) - 1),
+            };
+        }
+
+        serialized_len += storage.len - 1;
+    }
+
     return .{
-        .parents = serialized_node_parents,
-        .storage = serialized_node_storage,
+        .parents = serialized_node_parents_buffer[0..serialized_len],
+        .storage = serialized_node_storage_buffer[0..serialized_len],
     };
 }
 
 fn computeRedraw() []u8 {
     const serialized = serialize();
 
+    // Now we can analyze our copy of the graph without atomics, reconstructing
+    // children lists which do not exist in the canonical data. These are
+    // needed for tree traversal below.
+
     var children_buffer: [default_node_storage_buffer_len]Children = undefined;
     const children = children_buffer[0..serialized.parents.len];
 
@@ -624,7 +725,8 @@ fn write(buf: []const u8) void {
 
 fn writeIpc(fd: posix.fd_t, serialized: Serialized) void {
     assert(serialized.parents.len == serialized.storage.len);
-    const header = std.mem.asBytes(&serialized.parents.len);
+    const serialized_len: u32 = @intCast(serialized.parents.len);
+    const header = std.mem.asBytes(&serialized_len);
     const storage = std.mem.sliceAsBytes(serialized.storage);
     const parents = std.mem.sliceAsBytes(serialized.parents);
 
@@ -637,10 +739,17 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) void {
     // TODO: if big endian, byteswap
     // this is needed because the parent or child process might be running in qemu
 
-    const file: std.fs.File = .{ .handle = fd };
-    file.writevAll(&vecs) catch |err| {
-        std.log.warn("failed to send progress to parent process: {s}", .{@errorName(err)});
-    };
+    // If this write would block we do not want to keep trying, but we need to
+    // know if a partial message was written.
+    if (posix.writev(fd, &vecs)) |written| {
+        const total = header.len + storage.len + parents.len;
+        if (written < total) {
+            std.log.warn("short write: {d} out of {d}", .{ written, total });
+        }
+    } else |err| switch (err) {
+        error.WouldBlock => {},
+        else => |e| std.log.warn("failed to send progress to parent process: {s}", .{@errorName(e)}),
+    }
 }
 
 fn maybeUpdateSize(resize_flag: bool) void {