Commit 95dee2af9c

Andrew Kelley <andrew@ziglang.org>
2025-10-02 01:07:50
std.Io: implement netSend
1 parent bcb6760
Changed files (5)
lib/std/Io/net/HostName.zig
@@ -278,7 +278,8 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
             for (mapped_nameservers) |*ns| {
                 message_buffer[message_i] = .{
                     .address = ns,
-                    .data = query,
+                    .data_ptr = query.ptr,
+                    .data_len = query.len,
                 };
                 message_i += 1;
             }
@@ -324,11 +325,12 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
                     if (next_answer_buffer == answers.len) break :send;
                 },
                 2 => {
-                    const message: Io.net.OutgoingMessage = .{
+                    var message: Io.net.OutgoingMessage = .{
                         .address = ns,
-                        .data = query,
+                        .data_ptr = query.ptr,
+                        .data_len = query.len,
                     };
-                    io.vtable.netSend(io.userdata, socket.handle, &.{message}, .{}) catch {};
+                    io.vtable.netSend(io.userdata, socket.handle, (&message)[0..1], .{}) catch {};
                     continue;
                 },
                 else => continue,
lib/std/Io/net.zig
@@ -154,7 +154,7 @@ pub const IpAddress = union(enum) {
         /// A nonexistent interface was requested or the requested address was not local.
         AddressUnavailable,
         /// The local network interface used to reach the destination is offline.
-        NetworkSubsystemDown,
+        NetworkDown,
         /// Insufficient memory or other resource internal to the operating system.
         SystemResources,
         /// Per-process limit on the number of open file descriptors has been reached.
@@ -192,7 +192,7 @@ pub const IpAddress = union(enum) {
         /// Insufficient memory or other resource internal to the operating system.
         SystemResources,
         /// The local network interface used to reach the destination is offline.
-        NetworkSubsystemDown,
+        NetworkDown,
         ProtocolUnsupportedBySystem,
         ProtocolUnsupportedByAddressFamily,
         /// Per-process limit on the number of open file descriptors has been reached.
@@ -702,7 +702,10 @@ pub const ReceivedMessage = struct {
 
 pub const OutgoingMessage = struct {
     address: *const IpAddress,
-    data: []const u8,
+    data_ptr: [*]const u8,
+    /// Initialized with how many bytes of `data_ptr` to send. After sending
+    /// succeeds, replaced with how many bytes were actually sent.
+    data_len: usize,
     control: []const u8 = &.{},
 };
 
@@ -808,9 +811,10 @@ pub const Socket = struct {
     }
 
     pub const SendError = error{
-        /// The socket type requires that message be sent atomically, and the size of the message
-        /// to be sent made this impossible. The message is not transmitted.
-        MessageTooBig,
+        /// The socket type requires that message be sent atomically, and the
+        /// size of the message to be sent made this impossible. The message
+        /// was not transmitted, or was partially transmitted.
+        MessageOversize,
         /// The output queue for a network interface was full. This generally indicates that the
         /// interface has stopped sending, but may be caused by transient congestion. (Normally,
         /// this does not occur in Linux. Packets are just silently dropped when a device queue
@@ -823,21 +827,29 @@ pub const Socket = struct {
         /// Network reached but no route to host.
         HostUnreachable,
         /// The local network interface used to reach the destination is offline.
-        NetworkSubsystemDown,
+        NetworkDown,
         /// The destination address is not listening. Can still occur for
         /// connectionless messages.
         ConnectionRefused,
         /// Operating system or protocol does not support the address family.
         AddressFamilyUnsupported,
+        /// Another TCP Fast Open is already in progress.
+        FastOpenAlreadyInProgress,
+        /// Network connection was unexpectedly closed by recipient.
+        ConnectionResetByPeer,
+        /// Local end has been shut down on a connection-oriented socket, or
+        /// the socket was never connected.
+        SocketNotConnected,
     } || Io.UnexpectedError || Io.Cancelable;
 
-    /// Transfers `data` to `dest`, connectionless.
+    /// Transfers `data` to `dest`, connectionless, in one packet.
     pub fn send(s: *const Socket, io: Io, dest: *const IpAddress, data: []const u8) SendError!void {
-        const message: OutgoingMessage = .{ .address = dest, .data = data };
-        return io.vtable.netSend(io.userdata, s.handle, &.{message}, .{});
+        var message: OutgoingMessage = .{ .address = dest, .data_ptr = data.ptr, .data_len = data.len };
+        try io.vtable.netSend(io.userdata, s.handle, &message, .{});
+        if (message.data_len != data.len) return error.MessageOversize;
     }
 
-    pub fn sendMany(s: *const Socket, io: Io, messages: []const OutgoingMessage, flags: SendFlags) SendError!void {
+    pub fn sendMany(s: *const Socket, io: Io, messages: []OutgoingMessage, flags: SendFlags) SendError!void {
         return io.vtable.netSend(io.userdata, s.handle, messages, flags);
     }
 
lib/std/Io/Threaded.zig
@@ -1108,8 +1108,8 @@ fn listenPosix(
     }
 
     var storage: PosixAddress = undefined;
-    var socklen = addressToPosix(address, &storage);
-    try posixBind(pool, socket_fd, &storage.any, socklen);
+    var addr_len = addressToPosix(&address, &storage);
+    try posixBind(pool, socket_fd, &storage.any, addr_len);
 
     while (true) {
         try pool.checkCancel();
@@ -1121,7 +1121,7 @@ fn listenPosix(
         }
     }
 
-    try posixGetSockName(pool, socket_fd, &storage.any, &socklen);
+    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
     return .{
         .socket = .{
             .handle = socket_fd,
@@ -1226,9 +1226,9 @@ fn ipBindPosix(
     }
 
     var storage: PosixAddress = undefined;
-    var socklen = addressToPosix(address, &storage);
-    try posixBind(pool, socket_fd, &storage.any, socklen);
-    try posixGetSockName(pool, socket_fd, &storage.any, &socklen);
+    var addr_len = addressToPosix(&address, &storage);
+    try posixBind(pool, socket_fd, &storage.any, addr_len);
+    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
     return .{
         .handle = socket_fd,
         .address = addressFromPosix(&storage),
@@ -1306,21 +1306,102 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n
     return n;
 }
 
+const have_sendmmsg = builtin.os.tag == .linux;
+
 fn netSend(
     userdata: ?*anyopaque,
     handle: Io.net.Socket.Handle,
-    messages: []const Io.net.OutgoingMessage,
+    messages: []Io.net.OutgoingMessage,
     flags: Io.net.SendFlags,
 ) Io.net.Socket.SendError!void {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    try pool.checkCancel();
 
-    _ = handle;
-    _ = messages;
-    _ = flags;
+    if (have_sendmmsg) {
+        var i: usize = 0;
+        while (messages.len - i != 0) {
+            i += try netSendMany(pool, handle, messages[i..], flags);
+        }
+        return;
+    }
+
+    try pool.checkCancel();
     @panic("TODO");
 }
 
+fn netSendMany(
+    pool: *Pool,
+    handle: Io.net.Socket.Handle,
+    messages: []Io.net.OutgoingMessage,
+    flags: Io.net.SendFlags,
+) Io.net.Socket.SendError!usize {
+    var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
+    var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
+    var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
+    const min_len: usize = @min(messages.len, msg_buffer.len);
+    const clamped_messages = messages[0..min_len];
+    const clamped_msgs = (&msg_buffer)[0..min_len];
+    const clamped_addrs = (&addr_buffer)[0..min_len];
+    const clamped_iovecs = (&iovecs_buffer)[0..min_len];
+
+    for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| {
+        iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
+        msg.* = .{
+            .hdr = .{
+                .name = &addr.any,
+                .namelen = addressToPosix(message.address, addr),
+                .iov = iovec[0..1],
+                .iovlen = 1,
+                .control = @constCast(message.control.ptr),
+                .controllen = message.control.len,
+                .flags = 0,
+            },
+            .len = undefined, // Populated by calling sendmmsg below.
+        };
+    }
+
+    const posix_flags: u32 =
+        @as(u32, if (flags.confirm) posix.MSG.CONFIRM else 0) |
+        @as(u32, if (flags.dont_route) posix.MSG.DONTROUTE else 0) |
+        @as(u32, if (flags.eor) posix.MSG.EOR else 0) |
+        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
+        @as(u32, if (flags.fastopen) posix.MSG.FASTOPEN else 0) |
+        posix.MSG.NOSIGNAL;
+
+    while (true) {
+        try pool.checkCancel();
+        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), posix_flags);
+        switch (posix.errno(rc)) {
+            .SUCCESS => {
+                for (clamped_messages[0..rc], clamped_msgs[0..rc]) |*message, *msg| {
+                    message.data_len = msg.len;
+                }
+                return rc;
+            },
+            .AGAIN => |err| return errnoBug(err),
+            .ALREADY => return error.FastOpenAlreadyInProgress,
+            .BADF => |err| return errnoBug(err), // Always a race condition.
+            .CONNRESET => return error.ConnectionResetByPeer,
+            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
+            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
+            .INTR => continue,
+            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
+            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
+            .MSGSIZE => return error.MessageOversize,
+            .NOBUFS => return error.SystemResources,
+            .NOMEM => return error.SystemResources,
+            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
+            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
+            .PIPE => return error.SocketNotConnected,
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .HOSTUNREACH => return error.NetworkUnreachable,
+            .NETUNREACH => return error.NetworkUnreachable,
+            .NOTCONN => return error.SocketNotConnected,
+            .NETDOWN => return error.NetworkDown,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
 fn netReceive(
     userdata: ?*anyopaque,
     handle: Io.net.Socket.Handle,
@@ -1503,13 +1584,13 @@ fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress {
     };
 }
 
-fn addressToPosix(a: Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t {
-    return switch (a) {
+fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t {
+    return switch (a.*) {
         .ip4 => |ip4| {
             storage.in = address4ToPosix(ip4);
             return @sizeOf(posix.sockaddr.in);
         },
-        .ip6 => |ip6| {
+        .ip6 => |*ip6| {
             storage.in6 = address6ToPosix(ip6);
             return @sizeOf(posix.sockaddr.in6);
         },
@@ -1539,7 +1620,7 @@ fn address4ToPosix(a: Io.net.Ip4Address) posix.sockaddr.in {
     };
 }
 
-fn address6ToPosix(a: Io.net.Ip6Address) posix.sockaddr.in6 {
+fn address6ToPosix(a: *const Io.net.Ip6Address) posix.sockaddr.in6 {
     return .{
         .port = std.mem.nativeToBig(u16, a.port),
         .flowinfo = a.flow,
lib/std/os/linux.zig
@@ -2079,7 +2079,7 @@ pub fn sendmsg(fd: i32, msg: *const msghdr_const, flags: u32) usize {
     }
 }
 
-pub fn sendmmsg(fd: i32, msgvec: [*]mmsghdr_const, vlen: u32, flags: u32) usize {
+pub fn sendmmsg(fd: i32, msgvec: [*]mmsghdr, vlen: u32, flags: u32) usize {
     return syscall4(.sendmmsg, @as(usize, @bitCast(@as(isize, fd))), @intFromPtr(msgvec), vlen, flags);
 }
 
@@ -5955,11 +5955,6 @@ pub const mmsghdr = extern struct {
     len: u32,
 };
 
-pub const mmsghdr_const = extern struct {
-    hdr: msghdr_const,
-    len: u32,
-};
-
 pub const epoll_data = extern union {
     ptr: usize,
     fd: i32,
lib/std/Io.zig
@@ -671,7 +671,7 @@ pub const VTable = struct {
     listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
     accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream,
     ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
-    netSend: *const fn (?*anyopaque, net.Socket.Handle, []const net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void,
+    netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.Socket.SendError!void,
     netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, buffer: []u8, timeout: Timeout) net.Socket.ReceiveTimeoutError!net.ReceivedMessage,
     netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize,
     netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,