Commit 0497f88d39

Andrew Kelley <andrew@ziglang.org>
2025-10-23 13:27:20
std.Io.Kqueue: implement netRead
1 parent 1b0dcd4
Changed files (1)
lib
std
lib/std/Io/Kqueue.zig
@@ -24,6 +24,7 @@ const idle_stack_size = 256 * 1024;
 
 const max_idle_search = 4;
 const max_steal_ready_search = 4;
+const max_iovecs_len = 8;
 
 const changes_buffer_len = 64;
 
@@ -1431,13 +1432,47 @@ fn netReceive(
     _ = timeout;
     @panic("TODO");
 }
-fn netRead(userdata: ?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
+
+fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
-    _ = k;
-    _ = src;
-    _ = data;
-    @panic("TODO");
+
+    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
+    var i: usize = 0;
+    for (data) |buf| {
+        if (iovecs_buffer.len - i == 0) break;
+        if (buf.len != 0) {
+            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
+            i += 1;
+        }
+    }
+    const dest = iovecs_buffer[0..i];
+    assert(dest[0].len > 0);
+
+    while (true) {
+        try k.checkCancel();
+        std.debug.print("calling readv\n", .{});
+        const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
+        switch (posix.errno(rc)) {
+            .SUCCESS => return @intCast(rc),
+            .INTR => continue,
+            .CANCELED => return error.Canceled,
+            .AGAIN => @panic("TODO"),
+
+            .INVAL => |err| return errnoBug(err),
+            .FAULT => |err| return errnoBug(err),
+            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+            .NOBUFS => return error.SystemResources,
+            .NOMEM => return error.SystemResources,
+            .NOTCONN => return error.SocketUnconnected,
+            .CONNRESET => return error.ConnectionResetByPeer,
+            .TIMEDOUT => return error.Timeout,
+            .PIPE => return error.SocketUnconnected,
+            .NETDOWN => return error.NetworkDown,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
 }
+
 fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
     _ = k;
@@ -1529,7 +1564,7 @@ fn openSocketPosix(
                             else => |err| return posix.unexpectedErrno(err),
                         }
                     };
-                    fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
+                    fl_flags |= @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
                     while (true) {
                         try k.checkCancel();
                         switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {