Commit 6cff32c7ee

Andrew Kelley <andrew@ziglang.org>
2025-10-26 20:04:52
std.Io.Threaded: implement netRead for Windows
1 parent a8f95e5
Changed files (1)
lib
lib/std/Io/Threaded.zig
@@ -2976,7 +2976,7 @@ fn netListenIpWindows(
         if (rc != ws2_32.SOCKET_ERROR) break;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -2998,7 +2998,7 @@ fn netListenIpWindows(
         if (rc != ws2_32.SOCKET_ERROR) break;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3231,7 +3231,7 @@ fn wsaGetSockName(t: *Threaded, handle: ws2_32.SOCKET, addr: *ws2_32.sockaddr, a
         if (rc != ws2_32.SOCKET_ERROR) break;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3270,7 +3270,7 @@ fn setSocketOptionWsa(t: *Threaded, socket: Io.net.Socket.Handle, level: i32, op
         if (rc != ws2_32.SOCKET_ERROR) return;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3331,7 +3331,7 @@ fn netConnectIpWindows(
         if (rc != ws2_32.SOCKET_ERROR) break;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3454,7 +3454,7 @@ fn netBindIpWindows(
         if (rc != ws2_32.SOCKET_ERROR) break;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3560,7 +3560,7 @@ fn openSocketWsa(t: *Threaded, family: posix.sa_family_t, options: IpAddress.Bin
         if (rc != ws2_32.INVALID_SOCKET) return rc;
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3639,7 +3639,7 @@ fn netAcceptWindows(userdata: ?*anyopaque, listen_handle: net.Socket.Handle) net
         } };
         switch (ws2_32.WSAGetLastError()) {
             .EINTR => continue,
-            .ECANCELLED, .E_CANCELLED => return error.Canceled,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
             .NOTINITIALISED => {
                 try initializeWsa(t);
                 continue;
@@ -3728,10 +3728,76 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
 fn netReadWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
     if (!have_networking) return error.NetworkDown;
     const t: *Threaded = @ptrCast(@alignCast(userdata));
-    _ = t;
-    _ = handle;
-    _ = data;
-    @panic("TODO implement netReadWindows");
+
+    const bufs = b: {
+        var iovec_buffer: [max_iovecs_len]ws2_32.WSABUF = undefined;
+        var i: usize = 0;
+        var n: usize = 0;
+        for (data) |buf| {
+            if (iovec_buffer.len - i == 0) break;
+            if (buf.len == 0) continue;
+            if (std.math.cast(u32, buf.len)) |len| {
+                iovec_buffer[i] = .{ .buf = buf.ptr, .len = len };
+                i += 1;
+                n += len;
+                continue;
+            }
+            iovec_buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) };
+            i += 1;
+            n += std.math.maxInt(u32);
+            break;
+        }
+
+        const bufs = iovec_buffer[0..i];
+        assert(bufs[0].len != 0);
+
+        break :b bufs;
+    };
+
+    while (true) {
+        try t.checkCancel();
+
+        var flags: u32 = 0;
+        var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
+        var n: u32 = undefined;
+        const rc = ws2_32.WSARecv(handle, bufs.ptr, @intCast(bufs.len), &n, &flags, &overlapped, null);
+        if (rc != ws2_32.SOCKET_ERROR) return n;
+        const wsa_error: ws2_32.WinsockError = switch (ws2_32.WSAGetLastError()) {
+            .IO_PENDING => e: {
+                var result_flags: u32 = undefined;
+                const overlapped_rc = ws2_32.WSAGetOverlappedResult(
+                    handle,
+                    &overlapped,
+                    &n,
+                    windows.TRUE,
+                    &result_flags,
+                );
+                if (overlapped_rc == windows.FALSE) {
+                    break :e ws2_32.WSAGetLastError();
+                } else {
+                    return n;
+                }
+            },
+            else => |err| err,
+        };
+        switch (wsa_error) {
+            .EINTR => continue,
+            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
+            .NOTINITIALISED => {
+                try initializeWsa(t);
+                continue;
+            },
+
+            .ECONNRESET => return error.ConnectionResetByPeer,
+            .EFAULT => unreachable, // a pointer is not completely contained in user address space.
+            .EINVAL => |err| return wsaErrorBug(err),
+            .EMSGSIZE => |err| return wsaErrorBug(err),
+            .ENETDOWN => return error.NetworkDown,
+            .ENETRESET => return error.ConnectionResetByPeer,
+            .ENOTCONN => return error.SocketUnconnected,
+            else => |err| return windows.unexpectedWSAError(err),
+        }
+    }
 }
 
 fn netReadUnavailable(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
@@ -3823,7 +3889,7 @@ fn netSendOne(
             if (rc == ws2_32.SOCKET_ERROR) {
                 switch (ws2_32.WSAGetLastError()) {
                     .EINTR => continue,
-                    .ECANCELLED, .E_CANCELLED => return error.Canceled,
+                    .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
                     .NOTINITIALISED => {
                         try initializeWsa(t);
                         continue;
@@ -4422,7 +4488,7 @@ fn netLookupFallible(
             switch (rc) {
                 @as(ws2_32.WinsockError, @enumFromInt(0)) => break,
                 .EINTR => continue,
-                .ECANCELLED, .E_CANCELLED => return error.Canceled,
+                .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
                 .NOTINITIALISED => {
                     try initializeWsa(t);
                     continue;