Commit 0732ff2263

Andrew Kelley <andrew@ziglang.org>
2025-10-14 07:05:18
std.Io.Threaded: implement connecting to unix sockets
1 parent e8cea8a
Changed files (3)
lib/std/Io/net.zig
@@ -206,6 +206,9 @@ pub const IpAddress = union(enum) {
         SystemFdQuotaExceeded,
         /// The requested address family (IPv4 or IPv6) is not supported by the operating system.
         AddressFamilyUnsupported,
+        ProtocolUnsupportedBySystem,
+        ProtocolUnsupportedByAddressFamily,
+        SocketModeUnsupported,
     } || Io.UnexpectedError || Io.Cancelable;
 
     pub const ListenOptions = struct {
@@ -216,6 +219,16 @@ pub const IpAddress = union(enum) {
         /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX.
         /// Sets SO_REUSEADDR on Windows, which is roughly equivalent.
         reuse_address: bool = false,
+        /// Only connection-oriented modes may be used here, which includes:
+        /// * `Socket.Mode.stream`
+        /// * `Socket.Mode.seqpacket`
+        mode: Socket.Mode = .stream,
+        /// Only connection-oriented protocols may be used here, which includes:
+        /// * `Protocol.tcp`
+        /// * `Protocol.tp`
+        /// * `Protocol.dccp`
+        /// * `Protocol.sctp`
+        protocol: Protocol = .tcp,
     };
 
     /// Waits for a TCP connection. When using this API, `bind` does not need
@@ -276,7 +289,6 @@ pub const IpAddress = union(enum) {
         ConnectionPending,
         ConnectionRefused,
         ConnectionResetByPeer,
-        AlreadyConnected,
         HostUnreachable,
         NetworkUnreachable,
         ConnectionTimedOut,
@@ -842,7 +854,22 @@ pub const UnixAddress = struct {
         } };
     }
 
-    pub const ConnectError = error{} || Io.Cancelable || Io.UnexpectedError;
+    pub const ConnectError = error{
+        SystemResources,
+        ProcessFdQuotaExceeded,
+        SystemFdQuotaExceeded,
+        AddressFamilyUnsupported,
+        ProtocolUnsupportedBySystem,
+        ProtocolUnsupportedByAddressFamily,
+        SocketModeUnsupported,
+        AccessDenied,
+        PermissionDenied,
+        SymLinkLoop,
+        FileNotFound,
+        NotDir,
+        ReadOnlyFileSystem,
+        WouldBlock,
+    } || Io.Cancelable || Io.UnexpectedError;
 
     pub fn connect(ua: *const UnixAddress, io: Io) ConnectError!Stream {
         assert(ua.path.len <= max_len);
lib/std/Io/Threaded.zig
@@ -1697,34 +1697,10 @@ fn netListenIpPosix(
 ) Io.net.IpAddress.ListenError!Io.net.Server {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     const family = posixAddressFamily(&address);
-    const protocol: u32 = posix.IPPROTO.TCP;
-    const socket_fd = while (true) {
-        try pool.checkCancel();
-        const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
-        const socket_rc = posix.system.socket(family, flags, protocol);
-        switch (posix.errno(socket_rc)) {
-            .SUCCESS => {
-                const fd: posix.fd_t = @intCast(socket_rc);
-                errdefer posix.close(fd);
-                if (socket_flags_unsupported) while (true) {
-                    try pool.checkCancel();
-                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
-                        .SUCCESS => break,
-                        .INTR => continue,
-                        else => |err| return posix.unexpectedErrno(err),
-                    }
-                };
-                break fd;
-            },
-            .INTR => continue,
-            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
-            .MFILE => return error.ProcessFdQuotaExceeded,
-            .NFILE => return error.SystemFdQuotaExceeded,
-            .NOBUFS => return error.SystemResources,
-            .NOMEM => return error.SystemResources,
-            else => |err| return posix.unexpectedErrno(err),
-        }
-    };
+    const socket_fd = try openSocketPosix(pool, family, .{
+        .mode = options.mode,
+        .protocol = options.protocol,
+    });
     errdefer posix.close(socket_fd);
 
     if (options.reuse_address) {
@@ -1763,53 +1739,49 @@ fn netListenUnix(
 ) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle {
     if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported;
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    const protocol: u32 = 0;
-    const socket_fd = while (true) {
-        try pool.checkCancel();
-        const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
-        const socket_rc = posix.system.socket(posix.AF.UNIX, flags, protocol);
-        switch (posix.errno(socket_rc)) {
-            .SUCCESS => {
-                const fd: posix.fd_t = @intCast(socket_rc);
-                errdefer posix.close(fd);
-                if (socket_flags_unsupported) while (true) {
-                    try pool.checkCancel();
-                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
-                        .SUCCESS => break,
-                        .INTR => continue,
-                        else => |err| return posix.unexpectedErrno(err),
-                    }
-                };
-                break fd;
-            },
-            .INTR => continue,
-            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
-            .MFILE => return error.ProcessFdQuotaExceeded,
-            .NFILE => return error.SystemFdQuotaExceeded,
-            .NOBUFS => return error.SystemResources,
-            .NOMEM => return error.SystemResources,
-            else => |err| return posix.unexpectedErrno(err),
-        }
+    const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
+        error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported,
+        error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported,
+        error.SocketModeUnsupported => return error.AddressFamilyUnsupported,
+        else => |e| return e,
     };
     errdefer posix.close(socket_fd);
 
     var storage: UnixAddress = undefined;
     const addr_len = addressUnixToPosix(address, &storage);
+    try posixBindUnix(pool, socket_fd, &storage.any, addr_len);
+
     while (true) {
         try pool.checkCancel();
-        switch (posix.errno(posix.system.bind(socket_fd, &storage.any, addr_len))) {
+        switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
+            .SUCCESS => break,
+            .ADDRINUSE => return error.AddressInUse,
+            .BADF => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+
+    return socket_fd;
+}
+
+fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.bind(fd, addr, addr_len))) {
             .SUCCESS => break,
             .INTR => continue,
             .ACCES => return error.AccessDenied,
-            .PERM => return error.PermissionDenied,
             .ADDRINUSE => return error.AddressInUse,
             .AFNOSUPPORT => return error.AddressFamilyUnsupported,
             .ADDRNOTAVAIL => return error.AddressUnavailable,
             .NOMEM => return error.SystemResources,
+
             .LOOP => return error.SymLinkLoop,
             .NOENT => return error.FileNotFound,
             .NOTDIR => return error.NotDir,
             .ROFS => return error.ReadOnlyFileSystem,
+            .PERM => return error.PermissionDenied,
+
             .BADF => |err| return errnoBug(err), // always a race condition if this error is returned
             .INVAL => |err| return errnoBug(err), // invalid parameters
             .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
@@ -1818,18 +1790,6 @@ fn netListenUnix(
             else => |err| return posix.unexpectedErrno(err),
         }
     }
-
-    while (true) {
-        try pool.checkCancel();
-        switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
-            .SUCCESS => break,
-            .ADDRINUSE => return error.AddressInUse,
-            .BADF => |err| return errnoBug(err),
-            else => |err| return posix.unexpectedErrno(err),
-        }
-    }
-
-    return socket_fd;
 }
 
 fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
@@ -1857,7 +1817,6 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
         switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
             .SUCCESS => return,
             .INTR => continue,
-            .ADDRINUSE => return error.AddressInUse,
             .ADDRNOTAVAIL => return error.AddressUnavailable,
             .AFNOSUPPORT => return error.AddressFamilyUnsupported,
             .AGAIN, .INPROGRESS => return error.WouldBlock,
@@ -1866,7 +1825,7 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
             .CONNREFUSED => return error.ConnectionRefused,
             .CONNRESET => return error.ConnectionResetByPeer,
             .FAULT => |err| return errnoBug(err),
-            .ISCONN => return error.AlreadyConnected,
+            .ISCONN => |err| return errnoBug(err),
             .HOSTUNREACH => return error.HostUnreachable,
             .NETUNREACH => return error.NetworkUnreachable,
             .NOTSOCK => |err| return errnoBug(err),
@@ -1874,7 +1833,6 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
             .TIMEDOUT => return error.ConnectionTimedOut,
             .CONNABORTED => |err| return errnoBug(err),
             .ACCES => return error.AccessDenied,
-            // UNIX socket error codes:
             .PERM => |err| return errnoBug(err),
             .NOENT => |err| return errnoBug(err),
             else => |err| return posix.unexpectedErrno(err),
@@ -1882,6 +1840,35 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
     }
 }
 
+fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.connect(fd, addr, addr_len))) {
+            .SUCCESS => return,
+            .INTR => continue,
+
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .AGAIN => return error.WouldBlock,
+            .INPROGRESS => return error.WouldBlock,
+            .ACCES => return error.AccessDenied,
+
+            .LOOP => return error.SymLinkLoop,
+            .NOENT => return error.FileNotFound,
+            .NOTDIR => return error.NotDir,
+            .ROFS => return error.ReadOnlyFileSystem,
+            .PERM => return error.PermissionDenied,
+
+            .BADF => |err| return errnoBug(err),
+            .CONNABORTED => |err| return errnoBug(err),
+            .FAULT => |err| return errnoBug(err),
+            .ISCONN => |err| return errnoBug(err),
+            .NOTSOCK => |err| return errnoBug(err),
+            .PROTOTYPE => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
 fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
     while (true) {
         try pool.checkCancel();
@@ -1926,6 +1913,7 @@ fn netConnectIpPosix(
         .mode = options.mode,
         .protocol = options.protocol,
     });
+    errdefer posix.close(socket_fd);
     var storage: PosixAddress = undefined;
     var addr_len = addressToPosix(address, &storage);
     try posixConnect(pool, socket_fd, &storage.any, addr_len);
@@ -1940,10 +1928,14 @@ fn netConnectUnix(
     userdata: ?*anyopaque,
     address: *const Io.net.UnixAddress,
 ) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle {
+    if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported;
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    _ = pool;
-    _ = address;
-    @panic("TODO");
+    const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream });
+    errdefer posix.close(socket_fd);
+    var storage: UnixAddress = undefined;
+    const addr_len = addressUnixToPosix(address, &storage);
+    try posixConnectUnix(pool, socket_fd, &storage.any, addr_len);
+    return socket_fd;
 }
 
 fn netBindIpPosix(
@@ -2497,18 +2489,16 @@ fn netInterfaceNameResolve(
     name: *const Io.net.Interface.Name,
 ) Io.net.Interface.Name.ResolveError!Io.net.Interface {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    try pool.checkCancel();
 
     if (native_os == .linux) {
-        const rc = posix.system.socket(posix.AF.UNIX, posix.SOCK.DGRAM | posix.SOCK.CLOEXEC, 0);
-        const sock_fd: posix.fd_t = switch (posix.errno(rc)) {
-            .SUCCESS => @intCast(rc),
-            .ACCES => return error.AccessDenied,
-            .MFILE => return error.SystemResources,
-            .NFILE => return error.SystemResources,
-            .NOBUFS => return error.SystemResources,
-            .NOMEM => return error.SystemResources,
-            else => |err| return posix.unexpectedErrno(err),
+        const sock_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
+            error.ProcessFdQuotaExceeded => return error.SystemResources,
+            error.SystemFdQuotaExceeded => return error.SystemResources,
+            error.AddressFamilyUnsupported => return error.Unexpected,
+            error.ProtocolUnsupportedBySystem => return error.Unexpected,
+            error.ProtocolUnsupportedByAddressFamily => return error.Unexpected,
+            error.SocketModeUnsupported => return error.Unexpected,
+            else => |e| return e,
         };
         defer posix.close(sock_fd);
 
@@ -2521,12 +2511,12 @@ fn netInterfaceNameResolve(
             try pool.checkCancel();
             switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) {
                 .SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) },
+                .INTR => continue,
                 .INVAL => |err| return errnoBug(err), // Bad parameters.
                 .NOTTY => |err| return errnoBug(err),
                 .NXIO => |err| return errnoBug(err),
                 .BADF => |err| return errnoBug(err), // Always a race condition.
                 .FAULT => |err| return errnoBug(err), // Bad pointer parameter.
-                .INTR => continue,
                 .IO => |err| return errnoBug(err), // sock_fd is not a file descriptor
                 .NODEV => return error.InterfaceNotFound,
                 else => |err| return posix.unexpectedErrno(err),
@@ -2535,12 +2525,14 @@ fn netInterfaceNameResolve(
     }
 
     if (native_os == .windows) {
+        try pool.checkCancel();
         const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes);
         if (index == 0) return error.InterfaceNotFound;
         return .{ .index = index };
     }
 
     if (builtin.link_libc) {
+        try pool.checkCancel();
         const index = std.c.if_nametoindex(&name.bytes);
         if (index == 0) return error.InterfaceNotFound;
         return .{ .index = @bitCast(index) };
@@ -2591,7 +2583,7 @@ fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress {
     return switch (posix_address.any.family) {
         posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
         posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
-        else => unreachable,
+        else => .{ .ip4 = .loopback(0) },
     };
 }
 
BRANCH_TODO
@@ -5,11 +5,12 @@
 * fix Group.wait not handling cancelation (need to move impl of ResetEvent to Threaded)
 * implement cancelRequest for non-linux posix
 * finish converting all Threaded into directly calling system functions and handling EINTR
+* audit the TODOs
 
 * move max_iovecs_len to std.Io
 * address the cancelation race condition (signal received between checkCancel and syscall)
 * update signal values to be an enum
 * move fs.File.Writer to Io
-* add non-blocking flag to network operations, handle EAGAIN
+* add non-blocking flag to net and fs operations, handle EAGAIN
 * finish moving std.fs to Io
 * finish moving all of std.posix into Threaded