Commit 717cf00fe0

Andrew Kelley <andrew@ziglang.org>
2020-12-29 19:13:00
std.ChildProcess: improvements to collectOutputPosix
* read directly into the ArrayList buffers.
* respect max_output_bytes
* std.ArrayList:
  - make `allocatedSlice` public.
  - add `unusedCapacitySlice`.

I removed the Windows implementation of this stuff; I am doing a partial merge of LemonBoy's patch with the understanding that a later patch can add the Windows implementation after it is vetted.
1 parent 892b37c
lib/std/os/windows/bits.zig
@@ -438,19 +438,6 @@ pub const SECURITY_ATTRIBUTES = extern struct {
 pub const PSECURITY_ATTRIBUTES = *SECURITY_ATTRIBUTES;
 pub const LPSECURITY_ATTRIBUTES = *SECURITY_ATTRIBUTES;
 
-pub const PIPE_ACCESS_INBOUND = 0x00000001;
-pub const PIPE_ACCESS_OUTBOUND = 0x00000002;
-pub const PIPE_ACCESS_DUPLEX = 0x00000003;
-
-pub const PIPE_TYPE_BYTE = 0x00000000;
-pub const PIPE_TYPE_MESSAGE = 0x00000004;
-
-pub const PIPE_READMODE_BYTE = 0x00000000;
-pub const PIPE_READMODE_MESSAGE = 0x00000002;
-
-pub const PIPE_WAIT = 0x00000000;
-pub const PIPE_NOWAIT = 0x00000001;
-
 pub const GENERIC_READ = 0x80000000;
 pub const GENERIC_WRITE = 0x40000000;
 pub const GENERIC_EXECUTE = 0x20000000;
lib/std/os/windows/kernel32.zig
@@ -15,29 +15,6 @@ pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL;
 pub extern "kernel32" fn CreateDirectoryW(lpPathName: [*:0]const u16, lpSecurityAttributes: ?*SECURITY_ATTRIBUTES) callconv(WINAPI) BOOL;
 pub extern "kernel32" fn SetEndOfFile(hFile: HANDLE) callconv(WINAPI) BOOL;
 
-pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD;
-
-pub extern "kernel32" fn CreateNamedPipeA(
-    lpName: [*:0]const u8,
-    dwOpenMode: DWORD,
-    dwPipeMode: DWORD,
-    nMaxInstances: DWORD,
-    nOutBufferSize: DWORD,
-    nInBufferSize: DWORD,
-    nDefaultTimeOut: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-) callconv(WINAPI) HANDLE;
-pub extern "kernel32" fn CreateNamedPipeW(
-    lpName: LPCWSTR,
-    dwOpenMode: DWORD,
-    dwPipeMode: DWORD,
-    nMaxInstances: DWORD,
-    nOutBufferSize: DWORD,
-    nInBufferSize: DWORD,
-    nDefaultTimeOut: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-) callconv(WINAPI) HANDLE;
-
 pub extern "kernel32" fn CreateEventExW(
     lpEventAttributes: ?*SECURITY_ATTRIBUTES,
     lpName: [*:0]const u16,
@@ -55,16 +32,6 @@ pub extern "kernel32" fn CreateFileW(
     hTemplateFile: ?HANDLE,
 ) callconv(WINAPI) HANDLE;
 
-pub extern "kernel32" fn CreateFileA(
-    lpFileName: [*:0]const u8,
-    dwDesiredAccess: DWORD,
-    dwShareMode: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-    dwCreationDisposition: DWORD,
-    dwFlagsAndAttributes: DWORD,
-    hTemplateFile: ?HANDLE,
-) callconv(WINAPI) HANDLE;
-
 pub extern "kernel32" fn CreatePipe(
     hReadPipe: *HANDLE,
     hWritePipe: *HANDLE,
lib/std/array_list.zig
@@ -337,11 +337,21 @@ pub fn ArrayListAligned(comptime T: type, comptime alignment: ?u29) type {
             return self.pop();
         }
 
-        // For a nicer API, `items.len` is the length, not the capacity.
-        // This requires "unsafe" slicing.
-        fn allocatedSlice(self: Self) Slice {
+        /// Returns a slice of all the items plus the extra capacity, whose memory
+        /// contents are undefined.
+        pub fn allocatedSlice(self: Self) Slice {
+            // For a nicer API, `items.len` is the length, not the capacity.
+            // This requires "unsafe" slicing.
             return self.items.ptr[0..self.capacity];
         }
+
+        /// Returns a slice of only the extra capacity after items.
+        /// This can be useful for writing directly into an `ArrayList`.
+        /// Note that such an operation must be followed up with a direct
+        /// modification of `self.items.len`.
+        pub fn unusedCapacitySlice(self: Self) Slice {
+            return self.allocatedSlice()[self.items.len..];
+        }
     };
 }
 
lib/std/child_process.zig
@@ -186,14 +186,22 @@ pub const ChildProcess = struct {
 
     pub const exec2 = @compileError("deprecated: exec2 is renamed to exec");
 
-    fn collectOutputPosix(child: *const ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
+    fn collectOutputPosix(
+        child: *const ChildProcess,
+        stdout: *std.ArrayList(u8),
+        stderr: *std.ArrayList(u8),
+        max_output_bytes: usize,
+    ) !void {
         var poll_fds = [_]os.pollfd{
             .{ .fd = child.stdout.?.handle, .events = os.POLLIN, .revents = undefined },
             .{ .fd = child.stderr.?.handle, .events = os.POLLIN, .revents = undefined },
         };
 
         var dead_fds: usize = 0;
-        var loop_buf: [4096]u8 = undefined;
+        // We ask for ensureCapacity with this much extra space. This has more of an
+        // effect on small reads because once the reads start to get larger the amount
+        // of space an ArrayList will allocate grows exponentially.
+        const bump_amt = 512;
 
         while (dead_fds < poll_fds.len) {
             const events = try os.poll(&poll_fds, std.math.maxInt(i32));
@@ -203,13 +211,17 @@ pub const ChildProcess = struct {
             // conditions.
             if (poll_fds[0].revents & os.POLLIN != 0) {
                 // stdout is ready.
-                const n = try os.read(poll_fds[0].fd, &loop_buf);
-                try stdout.appendSlice(loop_buf[0..n]);
+                const new_capacity = std.math.min(stdout.items.len + bump_amt, max_output_bytes);
+                if (new_capacity == stdout.capacity) return error.StdoutStreamTooLong;
+                try stdout.ensureCapacity(new_capacity);
+                stdout.items.len += try os.read(poll_fds[0].fd, stdout.unusedCapacitySlice());
             }
             if (poll_fds[1].revents & os.POLLIN != 0) {
                 // stderr is ready.
-                const n = try os.read(poll_fds[1].fd, &loop_buf);
-                try stderr.appendSlice(loop_buf[0..n]);
+                const new_capacity = std.math.min(stderr.items.len + bump_amt, max_output_bytes);
+                if (new_capacity == stderr.capacity) return error.StderrStreamTooLong;
+                try stderr.ensureCapacity(new_capacity);
+                stderr.items.len += try os.read(poll_fds[1].fd, stderr.unusedCapacitySlice());
             }
 
             // Exclude the fds that signaled an error.
@@ -224,61 +236,6 @@ pub const ChildProcess = struct {
         }
     }
 
-    fn collectOutputWindows(child: *const ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
-        var wait_objects = [_]windows.kernel32.HANDLE{
-            child.stdout.?.handle, child.stderr.?.handle,
-        };
-        var waiting_objects: u32 = wait_objects.len;
-
-        // XXX: Calling zeroes([2]windows.OVERLAPPED) causes the stage1 compiler
-        // to crash and burn.
-        var overlapped = [_]windows.OVERLAPPED{
-            mem.zeroes(windows.OVERLAPPED),
-            mem.zeroes(windows.OVERLAPPED),
-        };
-        var temp_buf: [2][4096]u8 = undefined;
-
-        // Kickstart the loop by issuing two async reads.
-        // ReadFile returns false and GetLastError returns ERROR_IO_PENDING if
-        // everything is ok.
-        _ = windows.kernel32.ReadFile(wait_objects[0], &temp_buf[0], temp_buf[0].len, null, &overlapped[0]);
-        _ = windows.kernel32.ReadFile(wait_objects[1], &temp_buf[1], temp_buf[1].len, null, &overlapped[1]);
-
-        poll: while (waiting_objects > 0) {
-            const status = windows.kernel32.WaitForMultipleObjects(waiting_objects, &wait_objects, 0, windows.INFINITE);
-            switch (status) {
-                windows.WAIT_OBJECT_0 + 0...windows.WAIT_OBJECT_0 + 1 => {
-                    // stdout (or stderr) is ready.
-                    const object = status - windows.WAIT_OBJECT_0;
-
-                    var read_bytes: u32 = undefined;
-                    if (windows.kernel32.GetOverlappedResult(wait_objects[object], &overlapped[object], &read_bytes, 0) == 0) {
-                        switch (windows.kernel32.GetLastError()) {
-                            .BROKEN_PIPE => {
-                                // Move it to the end to remove it.
-                                if (object != waiting_objects - 1)
-                                    mem.swap(windows.kernel32.HANDLE, &wait_objects[object], &wait_objects[waiting_objects - 1]);
-                                waiting_objects -= 1;
-                                continue :poll;
-                            },
-                            else => |err| return windows.unexpectedError(err),
-                        }
-                    }
-                    try stdout.appendSlice(temp_buf[object][0..read_bytes]);
-                    _ = windows.kernel32.ReadFile(wait_objects[object], &temp_buf[object], temp_buf[object].len, null, &overlapped[object]);
-                },
-                windows.WAIT_FAILED => {
-                    switch (windows.kernel32.GetLastError()) {
-                        else => |err| return windows.unexpectedError(err),
-                    }
-                },
-                // We're waiting with an infinite timeout
-                windows.WAIT_TIMEOUT => unreachable,
-                else => unreachable,
-            }
-        }
-    }
-
     /// Spawns a child process, waits for it, collecting stdout and stderr, and then returns.
     /// If it succeeds, the caller owns result.stdout and result.stderr memory.
     pub fn exec(args: struct {
@@ -303,16 +260,28 @@ pub const ChildProcess = struct {
 
         try child.spawn();
 
+        // TODO collect output in a deadlock-avoiding way on Windows.
+        // https://github.com/ziglang/zig/issues/6343
+        if (builtin.os.tag == .windows) {
+            const stdout_in = child.stdout.?.reader();
+            const stderr_in = child.stderr.?.reader();
+
+            const stdout = try stdout_in.readAllAlloc(args.allocator, args.max_output_bytes);
+            errdefer args.allocator.free(stdout);
+            const stderr = try stderr_in.readAllAlloc(args.allocator, args.max_output_bytes);
+            errdefer args.allocator.free(stderr);
+
+            return ExecResult{
+                .term = try child.wait(),
+                .stdout = stdout,
+                .stderr = stderr,
+            };
+        }
+
         var stdout = std.ArrayList(u8).init(args.allocator);
         var stderr = std.ArrayList(u8).init(args.allocator);
 
-        // XXX: Respect max_output_bytes
-        // XXX: Smarter reading logic, read directly into the ArrayList
-        if (builtin.os.tag == .windows) {
-            try collectOutputWindows(child, &stdout, &stderr, args.max_output_bytes);
-        } else {
-            try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes);
-        }
+        try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes);
 
         return ExecResult{
             .term = try child.wait(),
@@ -650,7 +619,7 @@ pub const ChildProcess = struct {
         var g_hChildStd_OUT_Wr: ?windows.HANDLE = null;
         switch (self.stdout_behavior) {
             StdIo.Pipe => {
-                try windowsMakePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr);
+                try windowsMakePipeOut(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr);
             },
             StdIo.Ignore => {
                 g_hChildStd_OUT_Wr = nul_handle;
@@ -670,7 +639,7 @@ pub const ChildProcess = struct {
         var g_hChildStd_ERR_Wr: ?windows.HANDLE = null;
         switch (self.stderr_behavior) {
             StdIo.Pipe => {
-                try windowsMakePipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr);
+                try windowsMakePipeOut(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr);
             },
             StdIo.Ignore => {
                 g_hChildStd_ERR_Wr = nul_handle;
@@ -903,55 +872,6 @@ fn windowsDestroyPipe(rd: ?windows.HANDLE, wr: ?windows.HANDLE) void {
     if (wr) |h| os.close(h);
 }
 
-var pipe_name_counter = std.atomic.Int(u32).init(1);
-
-fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
-    var tmp_buf: [128]u8 = undefined;
-    // Forge a random path for the pipe.
-    const pipe_path = std.fmt.bufPrintZ(
-        &tmp_buf,
-        "\\\\.\\pipe\\zig-childprocess-{d}-{d}",
-        .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1) },
-    ) catch unreachable;
-
-    // Create the read handle that can be used with overlapped IO ops.
-    const read_handle = windows.kernel32.CreateNamedPipeA(
-        pipe_path,
-        windows.PIPE_ACCESS_INBOUND | windows.FILE_FLAG_OVERLAPPED,
-        windows.PIPE_TYPE_BYTE,
-        1,
-        0x1000,
-        0x1000,
-        0,
-        sattr,
-    );
-    if (read_handle == windows.INVALID_HANDLE_VALUE) {
-        switch (windows.kernel32.GetLastError()) {
-            else => |err| return windows.unexpectedError(err),
-        }
-    }
-
-    const write_handle = windows.kernel32.CreateFileA(
-        pipe_path,
-        windows.GENERIC_WRITE,
-        0,
-        sattr,
-        windows.OPEN_EXISTING,
-        windows.FILE_ATTRIBUTE_NORMAL,
-        null,
-    );
-    if (write_handle == windows.INVALID_HANDLE_VALUE) {
-        switch (windows.kernel32.GetLastError()) {
-            else => |err| return windows.unexpectedError(err),
-        }
-    }
-
-    try windows.SetHandleInformation(read_handle, windows.HANDLE_FLAG_INHERIT, 0);
-
-    rd.* = read_handle;
-    wr.* = write_handle;
-}
-
 fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
     var rd_h: windows.HANDLE = undefined;
     var wr_h: windows.HANDLE = undefined;
lib/std/os.zig
@@ -5269,8 +5269,7 @@ pub const PollError = error{
 
 pub fn poll(fds: []pollfd, timeout: i32) PollError!usize {
     while (true) {
-        const fds_count = math.cast(nfds_t, fds.len) catch
-            return error.SystemResources;
+        const fds_count = math.cast(nfds_t, fds.len) catch return error.SystemResources;
         const rc = system.poll(fds.ptr, fds_count, timeout);
         if (builtin.os.tag == .windows) {
             if (rc == windows.ws2_32.SOCKET_ERROR) {