Commit e0e463bcf7

Andrew Kelley <andrew@ziglang.org>
2025-10-14 02:34:07
std.Io.net.Stream.Reader: fix not using buffer
1 parent 4d62f08
Changed files (3)
lib/std/Io/net.zig
@@ -1076,6 +1076,8 @@ pub const Socket = struct {
 pub const Stream = struct {
     socket: Socket,
 
+    const max_iovecs_len = 8;
+
     pub fn close(s: *Stream, io: Io) void {
         io.vtable.netClose(io.userdata, s.socket.handle);
         s.* = undefined;
@@ -1097,7 +1099,6 @@ pub const Stream = struct {
             /// from it.
             AccessDenied,
             NetworkDown,
-            EndOfStream,
         } || Io.Cancelable || Io.UnexpectedError;
 
         pub fn init(stream: Stream, io: Io, buffer: []u8) Reader {
@@ -1128,10 +1129,22 @@ pub const Stream = struct {
         fn readVec(io_r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize {
             const r: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
             const io = r.io;
-            return io.vtable.netRead(io.userdata, r.stream, data) catch |err| {
+            var iovecs_buffer: [max_iovecs_len][]u8 = undefined;
+            const dest_n, const data_size = try io_r.writableVector(&iovecs_buffer, data);
+            const dest = iovecs_buffer[0..dest_n];
+            assert(dest[0].len > 0);
+            const n = io.vtable.netRead(io.userdata, r.stream.socket.handle, dest) catch |err| {
                 r.err = err;
                 return error.ReadFailed;
             };
+            if (n == 0) {
+                return error.EndOfStream;
+            }
+            if (n > data_size) {
+                r.interface.end += n - data_size;
+                return data_size;
+            }
+            return n;
         }
     };
 
@@ -1166,7 +1179,8 @@ pub const Stream = struct {
             const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
             const io = w.io;
             const buffered = io_w.buffered();
-            const n = io.vtable.netWrite(io.userdata, w.stream, buffered, data, splat) catch |err| {
+            const handle = w.stream.socket.handle;
+            const n = io.vtable.netWrite(io.userdata, handle, buffered, data, splat) catch |err| {
                 w.err = err;
                 return error.WriteFailed;
             };
lib/std/Io/Threaded.zig
@@ -1986,9 +1986,8 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: Io.net.Socket.Handle) Io.net
     } };
 }
 
-fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.net.Stream.Reader.Error!usize {
+fn netReadPosix(userdata: ?*anyopaque, fd: Io.net.Socket.Handle, data: [][]u8) Io.net.Stream.Reader.Error!usize {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    const fd = stream.socket.handle;
 
     var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
     var i: usize = 0;
@@ -2006,11 +2005,9 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n
         try pool.checkCancel();
         var n: usize = undefined;
         switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) {
-            .SUCCESS => {
-                if (n == 0) return error.EndOfStream;
-                return n;
-            },
+            .SUCCESS => return n,
             .INTR => continue,
+
             .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
             .AGAIN => |err| return errnoBug(err),
@@ -2029,12 +2026,9 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n
         try pool.checkCancel();
         const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
         switch (posix.errno(rc)) {
-            .SUCCESS => {
-                const n: usize = @intCast(rc);
-                if (n == 0) return error.EndOfStream;
-                return n;
-            },
+            .SUCCESS => return @intCast(rc),
             .INTR => continue,
+
             .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
             .AGAIN => |err| return errnoBug(err),
@@ -2359,7 +2353,7 @@ fn netReceive(
 
 fn netWritePosix(
     userdata: ?*anyopaque,
-    stream: Io.net.Stream,
+    fd: Io.net.Socket.Handle,
     header: []const u8,
     data: []const []const u8,
     splat: usize,
@@ -2406,7 +2400,7 @@ fn netWritePosix(
         },
     };
     const flags = posix.MSG.NOSIGNAL;
-    return posix.sendmsg(stream.socket.handle, &msg, flags);
+    return posix.sendmsg(fd, &msg, flags);
 }
 
 fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
lib/std/Io.zig
@@ -680,8 +680,9 @@ pub const VTable = struct {
     netConnectUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
     netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
     netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
-    netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize,
-    netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
+    /// Returns 0 on end of stream.
+    netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
+    netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
     netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
     netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
     netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,