Commit ef72cd6698

Jonathan Marler <johnnymarler@gmail.com>
2023-02-28 20:28:53
std.io.poll initial windows implementation
1 parent d8c3738
Changed files (1)
lib
std
lib/std/io.zig
@@ -175,6 +175,19 @@ pub fn poll(
 ) Poller(StreamEnum) {
     const enum_fields = @typeInfo(StreamEnum).Enum.fields;
     var result: Poller(StreamEnum) = undefined;
+
+    if (builtin.os.tag == .windows) result.windows = .{
+        .first_read_done = false,
+        .overlapped = [1]os.windows.OVERLAPPED {
+            mem.zeroes(os.windows.OVERLAPPED),
+        } ** enum_fields.len,
+        .active = .{
+            .count = 0,
+            .handles_buf = undefined,
+            .stream_map = undefined,
+        },
+    };
+
     inline for (0..enum_fields.len) |i| {
         result.fifos[i] = .{
             .allocator = allocator,
@@ -182,26 +195,56 @@ pub fn poll(
             .head = 0,
             .count = 0,
         };
-        result.poll_fds[i] = .{
-            .fd = @field(files, enum_fields[i].name).handle,
-            .events = os.POLL.IN,
-            .revents = undefined,
-        };
+        if (builtin.os.tag == .windows) {
+            result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle;
+        } else {
+            result.poll_fds[i] = .{
+                .fd = @field(files, enum_fields[i].name).handle,
+                .events = os.POLL.IN,
+                .revents = undefined,
+            };
+        }
     }
     return result;
 }
 
+pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic);
+
 pub fn Poller(comptime StreamEnum: type) type {
     return struct {
         const enum_fields = @typeInfo(StreamEnum).Enum.fields;
-        const Fifo = std.fifo.LinearFifo(u8, .Dynamic);
-
-        fifos: [enum_fields.len]Fifo,
-        poll_fds: [enum_fields.len]std.os.pollfd,
+        const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd;
+
+        fifos: [enum_fields.len]PollFifo,
+        poll_fds: [enum_fields.len]PollFd,
+        windows: if (builtin.os.tag == .windows) struct {
+            first_read_done: bool,
+            overlapped: [enum_fields.len]os.windows.OVERLAPPED,
+            active: struct {
+                count: math.IntFittingRange(0, enum_fields.len),
+                handles_buf: [enum_fields.len]os.windows.HANDLE,
+                stream_map: [enum_fields.len]StreamEnum,
+
+                pub fn removeAt(self: *@This(), index: u32) void {
+                    std.debug.assert(index < self.count);
+                    for (index + 1 .. self.count) |i| {
+                        self.handles_buf[i - 1] = self.handles_buf[i];
+                        self.stream_map[i - 1] = self.stream_map[i];
+                    }
+                    self.count -= 1;
+                }
+            },
+        } else void,
 
         const Self = @This();
 
         pub fn deinit(self: *Self) void {
+            if (builtin.os.tag == .windows) {
+                // cancel any pending IO to prevent clobbering OVERLAPPED value
+                for (self.windows.active.handles_buf[0 .. self.windows.active.count]) |h| {
+                    _ = os.windows.kernel32.CancelIo(h);
+                }
+            }
             inline for (&self.fifos) |*q| q.deinit();
             self.* = undefined;
         }
@@ -214,19 +257,89 @@ pub fn Poller(comptime StreamEnum: type) type {
             }
         }
 
-        pub inline fn fifo(self: *Self, comptime which: StreamEnum) *Fifo {
+        pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo {
             return &self.fifos[@enumToInt(which)];
         }
 
         pub fn done(self: Self) bool {
+            if (builtin.os.tag == .windows)
+                return self.windows.first_read_done and self.windows.active.count == 0;
+
             for (self.poll_fds) |poll_fd| {
                 if (poll_fd.fd != -1) return false;
             } else return true;
         }
 
         fn pollWindows(self: *Self) !void {
-            _ = self;
-            @compileError("TODO");
+            const bump_amt = 512;
+
+            if (!self.windows.first_read_done) {
+                // Windows Async IO requires an initial call to ReadFile before waiting on the handle
+                for (0..enum_fields.len) |i| {
+                    const handle = self.windows.active.handles_buf[i];
+                    switch (try windowsAsyncRead(
+                        handle,
+                        &self.windows.overlapped[i],
+                        &self.fifos[i],
+                        bump_amt,
+                    )) {
+                        .pending => {
+                            self.windows.active.handles_buf[self.windows.active.count] = handle;
+                            self.windows.active.stream_map[self.windows.active.count] = @intToEnum(StreamEnum, i);
+                            self.windows.active.count += 1;
+                        },
+                        .closed => {}, // don't add to the wait_objects list
+                    }
+                }
+                self.windows.first_read_done = true;
+            }
+
+            while (true) {
+                if (self.windows.active.count == 0) return;
+
+                const status = os.windows.kernel32.WaitForMultipleObjects(
+                    self.windows.active.count,
+                    &self.windows.active.handles_buf,
+                    0,
+                    os.windows.INFINITE,
+                );
+                if (status == os.windows.WAIT_FAILED)
+                    return os.windows.unexpectedError(os.windows.kernel32.GetLastError());
+
+                if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1)
+                    unreachable;
+
+                const active_idx = status - os.windows.WAIT_OBJECT_0;
+
+                const handle = self.windows.active.handles_buf[active_idx];
+                const stream_idx = @enumToInt(self.windows.active.stream_map[active_idx]);
+                var read_bytes: u32 = undefined;
+                if (0 == os.windows.kernel32.GetOverlappedResult(
+                    handle,
+                    &self.windows.overlapped[stream_idx],
+                    &read_bytes,
+                    0,
+                )) switch (os.windows.kernel32.GetLastError()) {
+                    .BROKEN_PIPE => {
+                        self.windows.active.removeAt(active_idx);
+                        continue;
+                    },
+                    else => |err| return os.windows.unexpectedError(err),
+                };
+
+                self.fifos[stream_idx].update(read_bytes);
+
+                switch (try windowsAsyncRead(
+                    handle,
+                    &self.windows.overlapped[stream_idx],
+                    &self.fifos[stream_idx],
+                    bump_amt,
+                )) {
+                    .pending => {},
+                    .closed => self.windows.active.removeAt(active_idx),
+                }
+                return;
+            }
         }
 
         fn pollPosix(self: *Self) !void {
@@ -263,6 +376,25 @@ pub fn Poller(comptime StreamEnum: type) type {
     };
 }
 
+fn windowsAsyncRead(
+    handle: os.windows.HANDLE,
+    overlapped: *os.windows.OVERLAPPED,
+    fifo: *PollFifo,
+    bump_amt: usize,
+) !enum{ pending, closed } {
+    while (true) {
+        const buf = try fifo.writableWithSize(bump_amt);
+        var read_bytes: u32 = undefined;
+        const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped);
+        if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) {
+            .IO_PENDING => .pending,
+            .BROKEN_PIPE => .closed,
+            else => |err| os.windows.unexpectedError(err),
+        };
+        fifo.update(read_bytes);
+    }
+}
+
 /// Given an enum, returns a struct with fields of that enum, each field
 /// representing an I/O stream for polling.
 pub fn PollFiles(comptime StreamEnum: type) type {