Commit 90db767721

mlugg <mlugg@mlugg.co.uk>
2024-09-29 01:03:46
std: async read into small temporary buffer between `poll` calls on Windows
This commit changes how `std.io.poll` is implemented on Windows. The new implementation unfortunately incurs a little extra system call overhead, but fixes several bugs in the old implementation: * The `lpNumberOfBytesRead` parameter of `ReadFile` was used with overlapped I/O. This is explicitly disallowed by the documentation, as the value written to this pointer is "potentially erroneous"; instead, `GetOverlappedResult` must always be used, even if the operation immediately returns. Documentation states that `lpNumberOfBytesRead` cannot be passed as null on Windows 7, so for compatibility, the parameter is passed as a pointer to a dummy global. * If the initial `ReadFile` returned data, and the next read returned `BROKEN_PIPE`, the received data was silently ignored in the sense that `pollWindows` did not `return`, instead waiting for data to come in on another file (or for all files to close). * The asynchronous `ReadFile` calls which were left pending between calls to `pollWindows` pointed to a potentially unstable buffer, since the user of `poll` may use part of the `LinearFifo` API which rotates its ring buffer. This race condition was causing CI failures in some uses of the compiler server protocol. These issues are all resolved. Now, `pollWindows` will queue an initial read to a small (128-byte) stable buffer per file. When this read is completed, reads directly into the FIFO's writable slice are performed until one is left pending, at which point that read is cancelled (with a check to see if it was completed between the `ReadFile` and `CancelIo` calls) and the next read into the small stable buffer is queued. These small buffer reads are the ones left pending between `pollWindows` calls, avoiding the race condition described above. Related: #21565
1 parent ada6061
Changed files (1)
lib
std
lib/std/io.zig
@@ -442,6 +442,7 @@ pub fn poll(
         .overlapped = [1]windows.OVERLAPPED{
             mem.zeroes(windows.OVERLAPPED),
         } ** enum_fields.len,
+        .small_bufs = undefined,
         .active = .{
             .count = 0,
             .handles_buf = undefined,
@@ -481,6 +482,7 @@ pub fn Poller(comptime StreamEnum: type) type {
         windows: if (is_windows) struct {
             first_read_done: bool,
             overlapped: [enum_fields.len]windows.OVERLAPPED,
+            small_bufs: [enum_fields.len][128]u8,
             active: struct {
                 count: math.IntFittingRange(0, enum_fields.len),
                 handles_buf: [enum_fields.len]windows.HANDLE,
@@ -534,24 +536,31 @@ pub fn Poller(comptime StreamEnum: type) type {
             const bump_amt = 512;
 
             if (!self.windows.first_read_done) {
-                // Windows Async IO requires an initial call to ReadFile before waiting on the handle
+                var already_read_data = false;
                 for (0..enum_fields.len) |i| {
                     const handle = self.windows.active.handles_buf[i];
-                    switch (try windowsAsyncRead(
+                    switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                         handle,
                         &self.windows.overlapped[i],
                         &self.fifos[i],
+                        &self.windows.small_bufs[i],
                         bump_amt,
                     )) {
-                        .pending => {
+                        .populated, .empty => |state| {
+                            if (state == .populated) already_read_data = true;
                             self.windows.active.handles_buf[self.windows.active.count] = handle;
                             self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
                             self.windows.active.count += 1;
                         },
                         .closed => {}, // don't add to the wait_objects list
+                        .closed_populated => {
+                            // don't add to the wait_objects list, but we did already get data
+                            already_read_data = true;
+                        },
                     }
                 }
                 self.windows.first_read_done = true;
+                if (already_read_data) return true;
             }
 
             while (true) {
@@ -576,32 +585,35 @@ pub fn Poller(comptime StreamEnum: type) type {
 
                 const active_idx = status - windows.WAIT_OBJECT_0;
 
-                const handle = self.windows.active.handles_buf[active_idx];
                 const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
-                var read_bytes: u32 = undefined;
-                if (0 == windows.kernel32.GetOverlappedResult(
-                    handle,
-                    &self.windows.overlapped[stream_idx],
-                    &read_bytes,
-                    0,
-                )) switch (windows.GetLastError()) {
-                    .BROKEN_PIPE => {
+                const handle = self.windows.active.handles_buf[active_idx];
+
+                const overlapped = &self.windows.overlapped[stream_idx];
+                const stream_fifo = &self.fifos[stream_idx];
+                const small_buf = &self.windows.small_bufs[stream_idx];
+
+                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
+                    .success => |n| n,
+                    .closed => {
                         self.windows.active.removeAt(active_idx);
                         continue;
                     },
-                    else => |err| return windows.unexpectedError(err),
+                    .aborted => unreachable,
                 };
+                try stream_fifo.write(small_buf[0..num_bytes_read]);
 
-                self.fifos[stream_idx].update(read_bytes);
-
-                switch (try windowsAsyncRead(
+                switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                     handle,
-                    &self.windows.overlapped[stream_idx],
-                    &self.fifos[stream_idx],
+                    overlapped,
+                    stream_fifo,
+                    small_buf,
                     bump_amt,
                 )) {
-                    .pending => {},
-                    .closed => self.windows.active.removeAt(active_idx),
+                    .empty => {}, // irrelevant, we already got data from the small buffer
+                    .populated => {},
+                    .closed,
+                    .closed_populated, // identical, since we already got data from the small buffer
+                    => self.windows.active.removeAt(active_idx),
                 }
                 return true;
             }
@@ -654,25 +666,145 @@ pub fn Poller(comptime StreamEnum: type) type {
     };
 }
 
-fn windowsAsyncRead(
+/// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
+/// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
+/// compatibility, we point it to this dummy variable, which we never otherwise access.
+/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
+var win_dummy_bytes_read: u32 = undefined;
+
+/// Read as much data as possible from `handle` with `overlapped` into `fifo`, then queue a read into
+/// `small_buf` so that `WaitForMultipleObjects` returns when more data is available. `handle` must have
+/// no pending asynchronous operation. `*populated` results mean `fifo` received data; `closed*` results mean `BROKEN_PIPE` was reported.
+fn windowsAsyncReadToFifoAndQueueSmallRead(
     handle: windows.HANDLE,
     overlapped: *windows.OVERLAPPED,
     fifo: *PollFifo,
+    small_buf: *[128]u8,
     bump_amt: usize,
-) !enum { pending, closed } {
+) !enum { empty, populated, closed_populated, closed } {
+    var read_any_data = false;
     while (true) {
-        const buf = try fifo.writableWithSize(bump_amt);
-        var read_bytes: u32 = undefined;
-        const read_result = windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped);
-        if (read_result == 0) return switch (windows.GetLastError()) {
-            .IO_PENDING => .pending,
-            .BROKEN_PIPE => .closed,
-            else => |err| windows.unexpectedError(err),
+        const fifo_read_pending = while (true) {
+            const buf = try fifo.writableWithSize(bump_amt);
+            const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);
+
+            if (0 == windows.kernel32.ReadFile(
+                handle,
+                buf.ptr,
+                buf_len,
+                &win_dummy_bytes_read,
+                overlapped,
+            )) switch (windows.GetLastError()) {
+                .IO_PENDING => break true,
+                .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
+                else => |err| return windows.unexpectedError(err),
+            };
+
+            const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
+                .success => |n| n,
+                .closed => return if (read_any_data) .closed_populated else .closed,
+                .aborted => unreachable,
+            };
+
+            read_any_data = true;
+            fifo.update(num_bytes_read);
+
+            if (num_bytes_read == buf_len) {
+                // We filled the buffer, so there's probably more data available.
+                continue;
+            } else {
+                // We didn't fill the buffer, so assume we're out of data.
+                // There is no pending read.
+                break false;
+            }
         };
-        fifo.update(read_bytes);
+
+        if (fifo_read_pending) cancel_read: {
+            // Cancel the pending read into the FIFO.
+            _ = windows.kernel32.CancelIo(handle);
+
+            // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
+            switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
+                windows.WAIT_OBJECT_0 => {},
+                windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
+                else => unreachable,
+            }
+
+            // If it completed before we canceled, make sure to tell the FIFO!
+            const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
+                .success => |n| n,
+                .closed => return if (read_any_data) .closed_populated else .closed,
+                .aborted => break :cancel_read,
+            };
+            read_any_data = true;
+            fifo.update(num_bytes_read);
+        }
+
+        // Try to queue the read into the small stable buffer (up to `small_buf.len` bytes).
+        if (0 == windows.kernel32.ReadFile(
+            handle,
+            small_buf,
+            small_buf.len,
+            &win_dummy_bytes_read,
+            overlapped,
+        )) switch (windows.GetLastError()) {
+            .IO_PENDING => {
+                // Small-buffer read pending as intended.
+                return if (read_any_data) .populated else .empty;
+            },
+            .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
+            else => |err| return windows.unexpectedError(err),
+        };
+
+        // We got data back this time. Write it to the FIFO and run the main loop again.
+        const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
+            .success => |n| n,
+            .closed => return if (read_any_data) .closed_populated else .closed,
+            .aborted => unreachable,
+        };
+        try fifo.write(small_buf[0..num_bytes_read]);
+        read_any_data = true;
     }
 }
 
+/// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
+/// Returns `success` (byte count) or `closed` (`BROKEN_PIPE`). If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
+///
+/// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile`
+/// must be determined using `GetOverlappedResult`, even if the operation immediately returns data:
+/// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
+/// erroneous results."
+/// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
+/// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
+/// get the actual number of bytes read."
+/// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
+fn windowsGetReadResult(
+    handle: windows.HANDLE,
+    overlapped: *windows.OVERLAPPED,
+    allow_aborted: bool,
+) !union(enum) {
+    success: u32,
+    closed,
+    aborted,
+} {
+    var num_bytes_read: u32 = undefined;
+    if (0 == windows.kernel32.GetOverlappedResult(
+        handle,
+        overlapped,
+        &num_bytes_read,
+        0,
+    )) switch (windows.GetLastError()) {
+        .BROKEN_PIPE => return .closed,
+        .OPERATION_ABORTED => |err| if (allow_aborted) {
+            return .aborted;
+        } else {
+            return windows.unexpectedError(err);
+        },
+        else => |err| return windows.unexpectedError(err),
+    };
+    return .{ .success = num_bytes_read };
+}
+
 /// Given an enum, returns a struct with fields of that enum, each field
 /// representing an I/O stream for polling.
 pub fn PollFiles(comptime StreamEnum: type) type {