Commit 138e8b162a

Jonathan Marler <johnnymarler@gmail.com>
2023-02-28 22:10:44
std.child_process: use std.io.poll for collectOutput
1 parent 4f58a80
Changed files (1)
lib/std/child_process.zig
@@ -197,6 +197,19 @@ pub const ChildProcess = struct {
         stderr: []u8,
     };
 
+    /// Moves the bytes currently buffered in `fifo` into a `std.ArrayList(u8)`
+    /// without allocating: the returned list takes over the fifo's backing
+    /// buffer and uses the fifo's allocator.
+    ///
+    /// `fifo` is re-initialized via `PollFifo.init` before returning, so it no
+    /// longer refers to the buffer that was handed to the list. The caller owns
+    /// the returned list and must release it with the same allocator.
+    fn fifoToOwnedArrayList(fifo: *std.io.PollFifo) std.ArrayList(u8) {
+        // The readable region may start past index 0 and, because the fifo is
+        // a ring buffer, may also wrap around the end of `buf`. Slicing
+        // `buf[head .. head + count]` would run out of bounds in the wrapped
+        // case; `realign` rotates the data so it starts at index 0 either way.
+        if (fifo.head != 0) fifo.realign();
+        const result = std.ArrayList(u8){
+            .items = fifo.buf[0..fifo.count],
+            .capacity = fifo.buf.len,
+            .allocator = fifo.allocator,
+        };
+        fifo.* = std.io.PollFifo.init(fifo.allocator);
+        return result;
+    }
+
     /// Collect the output from the process's stdout and stderr. Will return once all output
     /// has been collected. This does not mean that the process has ended. `wait` should still
     /// be called to wait for and clean up the process.
@@ -210,189 +223,28 @@ pub const ChildProcess = struct {
     ) !void {
         debug.assert(child.stdout_behavior == .Pipe);
         debug.assert(child.stderr_behavior == .Pipe);
-        if (builtin.os.tag == .windows) {
-            try collectOutputWindows(child, stdout, stderr, max_output_bytes);
-        } else {
-            try collectOutputPosix(child, stdout, stderr, max_output_bytes);
-        }
-    }
 
-    fn collectOutputPosix(
-        child: ChildProcess,
-        stdout: *std.ArrayList(u8),
-        stderr: *std.ArrayList(u8),
-        max_output_bytes: usize,
-    ) !void {
-        var poll_fds = [_]os.pollfd{
-            .{ .fd = child.stdout.?.handle, .events = os.POLL.IN, .revents = undefined },
-            .{ .fd = child.stderr.?.handle, .events = os.POLL.IN, .revents = undefined },
-        };
+        // we could make this work with multiple allocators but YAGNI
+        if (stdout.allocator.ptr != stderr.allocator.ptr or
+            stdout.allocator.vtable != stderr.allocator.vtable)
+            @panic("ChildProcess.collectOutput only supports 1 allocator");
 
-        var dead_fds: usize = 0;
-        // We ask for ensureTotalCapacity with this much extra space. This has more of an
-        // effect on small reads because once the reads start to get larger the amount
-        // of space an ArrayList will allocate grows exponentially.
-        const bump_amt = 512;
-
-        const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP;
-
-        while (dead_fds < poll_fds.len) {
-            const events = try os.poll(&poll_fds, std.math.maxInt(i32));
-            if (events == 0) continue;
-
-            var remove_stdout = false;
-            var remove_stderr = false;
-            // Try reading whatever is available before checking the error
-            // conditions.
-            // It's still possible to read after a POLL.HUP is received, always
-            // check if there's some data waiting to be read first.
-            if (poll_fds[0].revents & os.POLL.IN != 0) {
-                // stdout is ready.
-                const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes);
-                try stdout.ensureTotalCapacity(new_capacity);
-                const buf = stdout.unusedCapacitySlice();
-                if (buf.len == 0) return error.StdoutStreamTooLong;
-                const nread = try os.read(poll_fds[0].fd, buf);
-                stdout.items.len += nread;
-
-                // Remove the fd when the EOF condition is met.
-                remove_stdout = nread == 0;
-            } else {
-                remove_stdout = poll_fds[0].revents & err_mask != 0;
-            }
-
-            if (poll_fds[1].revents & os.POLL.IN != 0) {
-                // stderr is ready.
-                const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes);
-                try stderr.ensureTotalCapacity(new_capacity);
-                const buf = stderr.unusedCapacitySlice();
-                if (buf.len == 0) return error.StderrStreamTooLong;
-                const nread = try os.read(poll_fds[1].fd, buf);
-                stderr.items.len += nread;
-
-                // Remove the fd when the EOF condition is met.
-                remove_stderr = nread == 0;
-            } else {
-                remove_stderr = poll_fds[1].revents & err_mask != 0;
-            }
-
-            // Exclude the fds that signaled an error.
-            if (remove_stdout) {
-                poll_fds[0].fd = -1;
-                dead_fds += 1;
-            }
-            if (remove_stderr) {
-                poll_fds[1].fd = -1;
-                dead_fds += 1;
-            }
-        }
-    }
-
-    const WindowsAsyncReadResult = enum {
-        pending,
-        closed,
-        full,
-    };
-
-    fn windowsAsyncRead(
-        handle: windows.HANDLE,
-        overlapped: *windows.OVERLAPPED,
-        buf: *std.ArrayList(u8),
-        bump_amt: usize,
-        max_output_bytes: usize,
-    ) !WindowsAsyncReadResult {
-        while (true) {
-            const new_capacity = std.math.min(buf.items.len + bump_amt, max_output_bytes);
-            try buf.ensureTotalCapacity(new_capacity);
-            const next_buf = buf.unusedCapacitySlice();
-            if (next_buf.len == 0) return .full;
-            var read_bytes: u32 = undefined;
-            const read_result = windows.kernel32.ReadFile(handle, next_buf.ptr, math.cast(u32, next_buf.len) orelse maxInt(u32), &read_bytes, overlapped);
-            if (read_result == 0) return switch (windows.kernel32.GetLastError()) {
-                .IO_PENDING => .pending,
-                .BROKEN_PIPE => .closed,
-                else => |err| windows.unexpectedError(err),
-            };
-            buf.items.len += read_bytes;
-        }
-    }
-
-    fn collectOutputWindows(child: ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
-        const bump_amt = 512;
-        const outs = [_]*std.ArrayList(u8){
-            stdout,
-            stderr,
-        };
-        const handles = [_]windows.HANDLE{
-            child.stdout.?.handle,
-            child.stderr.?.handle,
-        };
-
-        var overlapped = [_]windows.OVERLAPPED{
-            mem.zeroes(windows.OVERLAPPED),
-            mem.zeroes(windows.OVERLAPPED),
-        };
-
-        var wait_objects: [2]windows.HANDLE = undefined;
-        var wait_object_count: u2 = 0;
-
-        // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope
-        defer for (wait_objects[0..wait_object_count]) |o| {
-            _ = windows.kernel32.CancelIo(o);
-        };
-
-        // Windows Async IO requires an initial call to ReadFile before waiting on the handle
-        for ([_]u1{ 0, 1 }) |i| {
-            switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
-                .pending => {
-                    wait_objects[wait_object_count] = handles[i];
-                    wait_object_count += 1;
-                },
-                .closed => {}, // don't add to the wait_objects list
-                .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
-            }
+        var poller = std.io.poll(stdout.allocator, enum { stdout, stderr }, .{
+            .stdout = child.stdout.?,
+            .stderr = child.stderr.?,
+        });
+        defer poller.deinit();
+
+        while (!poller.done()) {
+            try poller.poll();
+            if (poller.fifo(.stdout).count > max_output_bytes)
+                return error.StdoutStreamTooLong;
+            if (poller.fifo(.stderr).count > max_output_bytes)
+                return error.StderrStreamTooLong;
         }
 
-        while (wait_object_count > 0) {
-            const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE);
-            if (status == windows.WAIT_FAILED) {
-                switch (windows.kernel32.GetLastError()) {
-                    else => |err| return windows.unexpectedError(err),
-                }
-            }
-            if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1)
-                unreachable;
-
-            const wait_idx = status - windows.WAIT_OBJECT_0;
-
-            // this extra `i` index is needed to map the wait handle back to the stdout or stderr
-            // values since the wait_idx can change which handle it corresponds with
-            const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1;
-
-            // remove completed event from the wait list
-            wait_object_count -= 1;
-            if (wait_idx == 0)
-                wait_objects[0] = wait_objects[1];
-
-            var read_bytes: u32 = undefined;
-            if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) {
-                switch (windows.kernel32.GetLastError()) {
-                    .BROKEN_PIPE => continue,
-                    else => |err| return windows.unexpectedError(err),
-                }
-            }
-
-            outs[i].items.len += read_bytes;
-
-            switch (try windowsAsyncRead(handles[i], &overlapped[i], outs[i], bump_amt, max_output_bytes)) {
-                .pending => {
-                    wait_objects[wait_object_count] = handles[i];
-                    wait_object_count += 1;
-                },
-                .closed => {}, // don't add to the wait_objects list
-                .full => return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong,
-            }
-        }
+        stdout.* = fifoToOwnedArrayList(poller.fifo(.stdout));
+        stderr.* = fifoToOwnedArrayList(poller.fifo(.stderr));
     }
 
     /// Spawns a child process, waits for it, collecting stdout and stderr, and then returns.