Commit 9e0338b82e

Jonathan Marler <johnnymarler@gmail.com>
2021-06-18 01:36:42
finish ChildProcess collectOutputWindows
This finishes LemonBoy's Draft PR ziglang#6750. It updates ChildProcess to collect the output from stdout/stderr asynchronously using Overlapped IO and named pipes.
1 parent 1e0d68e
Changed files (2)
lib/std/os/windows/kernel32.zig
@@ -8,6 +8,7 @@ usingnamespace @import("bits.zig");
 pub extern "kernel32" fn AddVectoredExceptionHandler(First: c_ulong, Handler: ?VECTORED_EXCEPTION_HANDLER) callconv(WINAPI) ?*c_void;
 pub extern "kernel32" fn RemoveVectoredExceptionHandler(Handle: HANDLE) callconv(WINAPI) c_ulong;
 
+pub extern "kernel32" fn CancelIo(hFile: HANDLE) callconv(WINAPI) BOOL;
 pub extern "kernel32" fn CancelIoEx(hFile: HANDLE, lpOverlapped: ?LPOVERLAPPED) callconv(WINAPI) BOOL;
 
 pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL;
@@ -15,29 +16,6 @@ pub extern "kernel32" fn CloseHandle(hObject: HANDLE) callconv(WINAPI) BOOL;
 pub extern "kernel32" fn CreateDirectoryW(lpPathName: [*:0]const u16, lpSecurityAttributes: ?*SECURITY_ATTRIBUTES) callconv(WINAPI) BOOL;
 pub extern "kernel32" fn SetEndOfFile(hFile: HANDLE) callconv(WINAPI) BOOL;
 
-pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD;
-
-pub extern "kernel32" fn CreateNamedPipeA(
-    lpName: [*:0]const u8,
-    dwOpenMode: DWORD,
-    dwPipeMode: DWORD,
-    nMaxInstances: DWORD,
-    nOutBufferSize: DWORD,
-    nInBufferSize: DWORD,
-    nDefaultTimeOut: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-) callconv(WINAPI) HANDLE;
-pub extern "kernel32" fn CreateNamedPipeW(
-    lpName: LPCWSTR,
-    dwOpenMode: DWORD,
-    dwPipeMode: DWORD,
-    nMaxInstances: DWORD,
-    nOutBufferSize: DWORD,
-    nInBufferSize: DWORD,
-    nDefaultTimeOut: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-) callconv(WINAPI) HANDLE;
-
 pub extern "kernel32" fn CreateEventExW(
     lpEventAttributes: ?*SECURITY_ATTRIBUTES,
     lpName: [*:0]const u16,
@@ -55,16 +33,6 @@ pub extern "kernel32" fn CreateFileW(
     hTemplateFile: ?HANDLE,
 ) callconv(WINAPI) HANDLE;
 
-pub extern "kernel32" fn CreateFileA(
-    lpFileName: [*:0]const u8,
-    dwDesiredAccess: DWORD,
-    dwShareMode: DWORD,
-    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
-    dwCreationDisposition: DWORD,
-    dwFlagsAndAttributes: DWORD,
-    hTemplateFile: ?HANDLE,
-) callconv(WINAPI) HANDLE;
-
 pub extern "kernel32" fn CreatePipe(
     hReadPipe: *HANDLE,
     hWritePipe: *HANDLE,
@@ -72,6 +40,17 @@ pub extern "kernel32" fn CreatePipe(
     nSize: DWORD,
 ) callconv(WINAPI) BOOL;
 
+pub extern "kernel32" fn CreateNamedPipeW(
+    lpName: LPCWSTR,
+    dwOpenMode: DWORD,
+    dwPipeMode: DWORD,
+    nMaxInstances: DWORD,
+    nOutBufferSize: DWORD,
+    nInBufferSize: DWORD,
+    nDefaultTimeOut: DWORD,
+    lpSecurityAttributes: ?*const SECURITY_ATTRIBUTES,
+) callconv(WINAPI) HANDLE;
+
 pub extern "kernel32" fn CreateProcessW(
     lpApplicationName: ?LPWSTR,
     lpCommandLine: LPWSTR,
@@ -132,6 +111,8 @@ pub extern "kernel32" fn GetCurrentDirectoryW(nBufferLength: DWORD, lpBuffer: ?[
 pub extern "kernel32" fn GetCurrentThread() callconv(WINAPI) HANDLE;
 pub extern "kernel32" fn GetCurrentThreadId() callconv(WINAPI) DWORD;
 
+pub extern "kernel32" fn GetCurrentProcessId() callconv(WINAPI) DWORD;
+
 pub extern "kernel32" fn GetCurrentProcess() callconv(WINAPI) HANDLE;
 
 pub extern "kernel32" fn GetEnvironmentStringsW() callconv(WINAPI) ?[*:0]u16;
lib/std/child_process.zig
@@ -13,6 +13,7 @@ const process = std.process;
 const File = std.fs.File;
 const windows = os.windows;
 const mem = std.mem;
+const math = std.math;
 const debug = std.debug;
 const BufMap = std.BufMap;
 const builtin = std.builtin;
@@ -257,58 +258,76 @@ pub const ChildProcess = struct {
         }
     }
 
-    fn collectOutputWindows(child: *const ChildProcess, stdout: *std.ArrayList(u8), stderr: *std.ArrayList(u8), max_output_bytes: usize) !void {
-        var wait_objects = [_]windows.kernel32.HANDLE{
-            child.stdout.?.handle, child.stderr.?.handle,
+    fn collectOutputWindows(child: *const ChildProcess, outs: [2]*std.ArrayList(u8), max_output_bytes: usize) !void {
+        const bump_amt = 512;
+        const handles = [_]windows.HANDLE{
+            child.stdout.?.handle,
+            child.stderr.?.handle,
         };
-        var waiting_objects: u32 = wait_objects.len;
 
-        // XXX: Calling zeroes([2]windows.OVERLAPPED) causes the stage1 compiler
-        // to crash and burn.
         var overlapped = [_]windows.OVERLAPPED{
             mem.zeroes(windows.OVERLAPPED),
             mem.zeroes(windows.OVERLAPPED),
         };
-        var temp_buf: [2][4096]u8 = undefined;
-
-        // Kickstart the loop by issuing two async reads.
-        // ReadFile returns false and GetLastError returns ERROR_IO_PENDING if
-        // everything is ok.
-        _ = windows.kernel32.ReadFile(wait_objects[0], &temp_buf[0], temp_buf[0].len, null, &overlapped[0]);
-        _ = windows.kernel32.ReadFile(wait_objects[1], &temp_buf[1], temp_buf[1].len, null, &overlapped[1]);
-
-        poll: while (waiting_objects > 0) {
-            const status = windows.kernel32.WaitForMultipleObjects(waiting_objects, &wait_objects, 0, windows.INFINITE);
-            switch (status) {
-                windows.WAIT_OBJECT_0 + 0...windows.WAIT_OBJECT_0 + 1 => {
-                    // stdout (or stderr) is ready.
-                    const object = status - windows.WAIT_OBJECT_0;
-
-                    var read_bytes: u32 = undefined;
-                    if (windows.kernel32.GetOverlappedResult(wait_objects[object], &overlapped[object], &read_bytes, 0) == 0) {
-                        switch (windows.kernel32.GetLastError()) {
-                            .BROKEN_PIPE => {
-                                // Move it to the end to remove it.
-                                if (object != waiting_objects - 1)
-                                    mem.swap(windows.kernel32.HANDLE, &wait_objects[object], &wait_objects[waiting_objects - 1]);
-                                waiting_objects -= 1;
-                                continue :poll;
-                            },
-                            else => |err| return windows.unexpectedError(err),
-                        }
-                    }
-                    try stdout.appendSlice(temp_buf[object][0..read_bytes]);
-                    _ = windows.kernel32.ReadFile(wait_objects[object], &temp_buf[object], temp_buf[object].len, null, &overlapped[object]);
-                },
-                windows.WAIT_FAILED => {
-                    switch (windows.kernel32.GetLastError()) {
-                        else => |err| return windows.unexpectedError(err),
-                    }
-                },
-                // We're waiting with an infinite timeout
-                windows.WAIT_TIMEOUT => unreachable,
-                else => unreachable,
+
+        var wait_objects: [2]windows.HANDLE = undefined;
+        var wait_object_count: u2 = 0;
+
+        // we need to cancel all pending IO before returning so our OVERLAPPED values don't go out of scope
+        defer for (wait_objects[0..wait_object_count]) |o| {
+            _ = windows.kernel32.CancelIo(o);
+        };
+
+        // Windows Async IO requires an initial call to ReadFile before waiting on the handle
+        for ([_]u1{ 0, 1 }) |i| {
+            try outs[i].ensureCapacity(bump_amt);
+            const buf = outs[i].unusedCapacitySlice();
+            _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]);
+            wait_objects[wait_object_count] = handles[i];
+            wait_object_count += 1;
+        }
+
+        while (true) {
+            const status = windows.kernel32.WaitForMultipleObjects(wait_object_count, &wait_objects, 0, windows.INFINITE);
+            if (status == windows.WAIT_FAILED) {
+                switch (windows.kernel32.GetLastError()) {
+                    else => |err| return windows.unexpectedError(err),
+                }
             }
+            if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + wait_object_count - 1)
+                unreachable;
+
+            const wait_idx = status - windows.WAIT_OBJECT_0;
+
+            // this extra `i` index is needed to map the wait handle back to the stdout or stderr
+            // values since the wait_idx can change which handle it corresponds with
+            const i: u1 = if (wait_objects[wait_idx] == handles[0]) 0 else 1;
+
+            // remove completed event from the wait list
+            wait_object_count -= 1;
+            if (wait_idx == 0)
+                wait_objects[0] = wait_objects[1];
+
+            var read_bytes: u32 = undefined;
+            if (windows.kernel32.GetOverlappedResult(handles[i], &overlapped[i], &read_bytes, 0) == 0) {
+                switch (windows.kernel32.GetLastError()) {
+                    .BROKEN_PIPE => {
+                        if (wait_object_count == 0)
+                            break;
+                        continue;
+                    },
+                    else => |err| return windows.unexpectedError(err),
+                }
+            }
+
+            outs[i].items.len += read_bytes;
+            const new_capacity = std.math.min(outs[i].items.len + bump_amt, max_output_bytes);
+            try outs[i].ensureCapacity(new_capacity);
+            const buf = outs[i].unusedCapacitySlice();
+            if (buf.len == 0) return if (i == 0) error.StdoutStreamTooLong else error.StderrStreamTooLong;
+            _ = windows.kernel32.ReadFile(handles[i], buf.ptr, math.cast(u32, buf.len) catch maxInt(u32), null, &overlapped[i]);
+            wait_objects[wait_object_count] = handles[i];
+            wait_object_count += 1;
         }
     }
 
@@ -361,12 +380,8 @@ pub const ChildProcess = struct {
             stderr.deinit();
         }
 
-        try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes);
-
-        // XXX: Respect max_output_bytes
-        // XXX: Smarter reading logic, read directly into the ArrayList
         if (builtin.os.tag == .windows) {
-            try collectOutputWindows(child, &stdout, &stderr, args.max_output_bytes);
+            try collectOutputWindows(child, [_]*std.ArrayList(u8){ &stdout, &stderr }, args.max_output_bytes);
         } else {
             try collectOutputPosix(child, &stdout, &stderr, args.max_output_bytes);
         }
@@ -707,7 +722,7 @@ pub const ChildProcess = struct {
         var g_hChildStd_OUT_Wr: ?windows.HANDLE = null;
         switch (self.stdout_behavior) {
             StdIo.Pipe => {
-                try windowsMakePipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr);
+                try windowsMakeAsyncPipe(&g_hChildStd_OUT_Rd, &g_hChildStd_OUT_Wr, &saAttr);
             },
             StdIo.Ignore => {
                 g_hChildStd_OUT_Wr = nul_handle;
@@ -727,7 +742,7 @@ pub const ChildProcess = struct {
         var g_hChildStd_ERR_Wr: ?windows.HANDLE = null;
         switch (self.stderr_behavior) {
             StdIo.Pipe => {
-                try windowsMakePipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr);
+                try windowsMakeAsyncPipe(&g_hChildStd_ERR_Rd, &g_hChildStd_ERR_Wr, &saAttr);
             },
             StdIo.Ignore => {
                 g_hChildStd_ERR_Wr = nul_handle;
@@ -960,25 +975,43 @@ fn windowsDestroyPipe(rd: ?windows.HANDLE, wr: ?windows.HANDLE) void {
     if (wr) |h| os.close(h);
 }
 
-var pipe_name_counter = std.atomic.Int(u32).init(1);
+fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
+    var rd_h: windows.HANDLE = undefined;
+    var wr_h: windows.HANDLE = undefined;
+    try windows.CreatePipe(&rd_h, &wr_h, sattr);
+    errdefer windowsDestroyPipe(rd_h, wr_h);
+    try windows.SetHandleInformation(wr_h, windows.HANDLE_FLAG_INHERIT, 0);
+    rd.* = rd_h;
+    wr.* = wr_h;
+}
 
-fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
-    var tmp_buf: [128]u8 = undefined;
-    // Forge a random path for the pipe.
-    const pipe_path = std.fmt.bufPrintZ(
-        &tmp_buf,
-        "\\\\.\\pipe\\zig-childprocess-{d}-{d}",
-        .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1) },
-    ) catch unreachable;
+var pipe_name_counter = std.atomic.Atomic(u32).init(1);
+
+fn windowsMakeAsyncPipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
+    var tmp_bufw: [128]u16 = undefined;
+
+    // We must make a named pipe on windows because anonymous pipes do not support async IO
+    const pipe_path = blk: {
+        var tmp_buf: [128]u8 = undefined;
+        // Forge a random path for the pipe.
+        const pipe_path = std.fmt.bufPrintZ(
+            &tmp_buf,
+            "\\\\.\\pipe\\zig-childprocess-{d}-{d}",
+            .{ windows.kernel32.GetCurrentProcessId(), pipe_name_counter.fetchAdd(1, .Monotonic) },
+        ) catch unreachable;
+        const len = std.unicode.utf8ToUtf16Le(&tmp_bufw, pipe_path) catch unreachable;
+        tmp_bufw[len] = 0;
+        break :blk tmp_bufw[0..len :0];
+    };
 
     // Create the read handle that can be used with overlapped IO ops.
-    const read_handle = windows.kernel32.CreateNamedPipeA(
-        pipe_path,
+    const read_handle = windows.kernel32.CreateNamedPipeW(
+        pipe_path.ptr,
         windows.PIPE_ACCESS_INBOUND | windows.FILE_FLAG_OVERLAPPED,
         windows.PIPE_TYPE_BYTE,
         1,
-        0x1000,
-        0x1000,
+        4096,
+        4096,
         0,
         sattr,
     );
@@ -987,12 +1020,14 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win
             else => |err| return windows.unexpectedError(err),
         }
     }
+    errdefer os.close(read_handle);
 
-    const write_handle = windows.kernel32.CreateFileA(
-        pipe_path,
+    var sattr_copy = sattr.*;
+    const write_handle = windows.kernel32.CreateFileW(
+        pipe_path.ptr,
         windows.GENERIC_WRITE,
         0,
-        sattr,
+        &sattr_copy,
         windows.OPEN_EXISTING,
         windows.FILE_ATTRIBUTE_NORMAL,
         null,
@@ -1002,6 +1037,7 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win
             else => |err| return windows.unexpectedError(err),
         }
     }
+    errdefer os.close(write_handle);
 
     try windows.SetHandleInformation(read_handle, windows.HANDLE_FLAG_INHERIT, 0);
 
@@ -1009,26 +1045,6 @@ fn windowsMakePipe(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const win
     wr.* = write_handle;
 }
 
-fn windowsMakePipeIn(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
-    var rd_h: windows.HANDLE = undefined;
-    var wr_h: windows.HANDLE = undefined;
-    try windows.CreatePipe(&rd_h, &wr_h, sattr);
-    errdefer windowsDestroyPipe(rd_h, wr_h);
-    try windows.SetHandleInformation(wr_h, windows.HANDLE_FLAG_INHERIT, 0);
-    rd.* = rd_h;
-    wr.* = wr_h;
-}
-
-fn windowsMakePipeOut(rd: *?windows.HANDLE, wr: *?windows.HANDLE, sattr: *const windows.SECURITY_ATTRIBUTES) !void {
-    var rd_h: windows.HANDLE = undefined;
-    var wr_h: windows.HANDLE = undefined;
-    try windows.CreatePipe(&rd_h, &wr_h, sattr);
-    errdefer windowsDestroyPipe(rd_h, wr_h);
-    try windows.SetHandleInformation(rd_h, windows.HANDLE_FLAG_INHERIT, 0);
-    rd.* = rd_h;
-    wr.* = wr_h;
-}
-
 fn destroyPipe(pipe: [2]os.fd_t) void {
     os.close(pipe[0]);
     if (pipe[0] != pipe[1]) os.close(pipe[1]);