Commit e8cea8accb

Andrew Kelley <andrew@ziglang.org>
2025-10-14 05:17:51
std.Io.Threaded: implement netListenUnix
1 parent d680b9e
Changed files (5)
lib/std/Io/net/test.zig
@@ -273,7 +273,7 @@ test "listen on a unix socket, send bytes, receive bytes" {
     const socket_addr = try net.UnixAddress.init(socket_path);
     defer std.fs.cwd().deleteFile(socket_path) catch {};
 
-    var server = try socket_addr.listen(io);
+    var server = try socket_addr.listen(io, .{});
     defer server.socket.close(io);
 
     const S = struct {
lib/std/Io/net.zig
@@ -51,6 +51,8 @@ pub const has_unix_sockets = switch (native_os) {
     else => true,
 };
 
+pub const default_kernel_backlog = 128;
+
 pub const IpAddress = union(enum) {
     ip4: Ip4Address,
     ip6: Ip6Address,
@@ -210,7 +212,7 @@ pub const IpAddress = union(enum) {
         /// How many connections the kernel will accept on the application's behalf.
         /// If more than this many connections pool in the kernel, clients will start
         /// seeing "Connection refused".
-        kernel_backlog: u31 = 128,
+        kernel_backlog: u31 = default_kernel_backlog,
         /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX.
         /// Sets SO_REUSEADDR on Windows, which is roughly equivalent.
         reuse_address: bool = false,
@@ -288,6 +290,11 @@ pub const IpAddress = union(enum) {
         ProtocolUnsupportedBySystem,
         ProtocolUnsupportedByAddressFamily,
         SocketModeUnsupported,
+        /// The user tried to connect to a broadcast address without having the socket broadcast flag enabled or
+        /// the connection request failed because of a local firewall rule.
+        AccessDenied,
+        /// Non-blocking was requested and the operation cannot return immediately.
+        WouldBlock,
     } || Io.Timeout.Error || Io.UnexpectedError || Io.Cancelable;
 
     pub const ConnectOptions = struct {
@@ -804,19 +811,40 @@ pub const UnixAddress = struct {
         return .{ .path = p };
     }
 
-    pub const ListenError = error{};
+    pub const ListenError = error{
+        AddressFamilyUnsupported,
+        AddressInUse,
+        NetworkDown,
+        SystemResources,
+        SymLinkLoop,
+        FileNotFound,
+        NotDir,
+        ReadOnlyFileSystem,
+        ProcessFdQuotaExceeded,
+        SystemFdQuotaExceeded,
+        AccessDenied,
+        PermissionDenied,
+        AddressUnavailable,
+    } || Io.Cancelable || Io.UnexpectedError;
+
+    pub const ListenOptions = struct {
+        /// How many connections the kernel will accept on the application's behalf.
+        /// If more than this many connections pool in the kernel, clients will start
+        /// seeing "Connection refused".
+        kernel_backlog: u31 = default_kernel_backlog,
+    };
 
-    pub fn listen(ua: UnixAddress, io: Io) ListenError!Server {
+    pub fn listen(ua: *const UnixAddress, io: Io, options: ListenOptions) ListenError!Server {
         assert(ua.path.len <= max_len);
         return .{ .socket = .{
-            .handle = try io.vtable.netListenUnix(io.userdata, ua),
+            .handle = try io.vtable.netListenUnix(io.userdata, ua, options),
             .address = .{ .ip4 = .loopback(0) },
         } };
     }
 
-    pub const ConnectError = error{};
+    pub const ConnectError = error{} || Io.Cancelable || Io.UnexpectedError;
 
-    pub fn connect(ua: UnixAddress, io: Io) ConnectError!Stream {
+    pub fn connect(ua: *const UnixAddress, io: Io) ConnectError!Stream {
         assert(ua.path.len <= max_len);
         return .{ .socket = .{
             .handle = try io.vtable.netConnectUnix(io.userdata, ua),
lib/std/Io/Threaded.zig
@@ -1756,11 +1756,80 @@ fn netListenIpPosix(
     };
 }
 
-fn netListenUnix(userdata: ?*anyopaque, address: Io.net.UnixAddress) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle {
+fn netListenUnix(
+    userdata: ?*anyopaque,
+    address: *const Io.net.UnixAddress,
+    options: Io.net.UnixAddress.ListenOptions,
+) Io.net.UnixAddress.ListenError!Io.net.Socket.Handle {
+    if (!Io.net.has_unix_sockets) return error.AddressFamilyUnsupported;
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    _ = pool;
-    _ = address;
-    @panic("TODO");
+    const protocol: u32 = 0;
+    const socket_fd = while (true) {
+        try pool.checkCancel();
+        const flags: u32 = posix.SOCK.STREAM | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
+        const socket_rc = posix.system.socket(posix.AF.UNIX, flags, protocol);
+        switch (posix.errno(socket_rc)) {
+            .SUCCESS => {
+                const fd: posix.fd_t = @intCast(socket_rc);
+                errdefer posix.close(fd);
+                if (socket_flags_unsupported) while (true) {
+                    try pool.checkCancel();
+                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
+                        .SUCCESS => break,
+                        .INTR => continue,
+                        else => |err| return posix.unexpectedErrno(err),
+                    }
+                };
+                break fd;
+            },
+            .INTR => continue,
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .MFILE => return error.ProcessFdQuotaExceeded,
+            .NFILE => return error.SystemFdQuotaExceeded,
+            .NOBUFS => return error.SystemResources,
+            .NOMEM => return error.SystemResources,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    };
+    errdefer posix.close(socket_fd);
+
+    var storage: UnixAddress = undefined;
+    const addr_len = addressUnixToPosix(address, &storage);
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.bind(socket_fd, &storage.any, addr_len))) {
+            .SUCCESS => break,
+            .INTR => continue,
+            .ACCES => return error.AccessDenied,
+            .PERM => return error.PermissionDenied,
+            .ADDRINUSE => return error.AddressInUse,
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .ADDRNOTAVAIL => return error.AddressUnavailable,
+            .NOMEM => return error.SystemResources,
+            .LOOP => return error.SymLinkLoop,
+            .NOENT => return error.FileNotFound,
+            .NOTDIR => return error.NotDir,
+            .ROFS => return error.ReadOnlyFileSystem,
+            .BADF => |err| return errnoBug(err), // always a race condition if this error is returned
+            .INVAL => |err| return errnoBug(err), // invalid parameters
+            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
+            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
+            .NAMETOOLONG => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
+            .SUCCESS => break,
+            .ADDRINUSE => return error.AddressInUse,
+            .BADF => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+
+    return socket_fd;
 }
 
 fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
@@ -1791,7 +1860,7 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
             .ADDRINUSE => return error.AddressInUse,
             .ADDRNOTAVAIL => return error.AddressUnavailable,
             .AFNOSUPPORT => return error.AddressFamilyUnsupported,
-            .AGAIN, .INPROGRESS => |err| return errnoBug(err),
+            .AGAIN, .INPROGRESS => return error.WouldBlock,
             .ALREADY => return error.ConnectionPending,
             .BADF => |err| return errnoBug(err),
             .CONNREFUSED => return error.ConnectionRefused,
@@ -1804,8 +1873,8 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
             .PROTOTYPE => |err| return errnoBug(err),
             .TIMEDOUT => return error.ConnectionTimedOut,
             .CONNABORTED => |err| return errnoBug(err),
+            .ACCES => return error.AccessDenied,
             // UNIX socket error codes:
-            .ACCES => |err| return errnoBug(err),
             .PERM => |err| return errnoBug(err),
             .NOENT => |err| return errnoBug(err),
             else => |err| return posix.unexpectedErrno(err),
@@ -1867,7 +1936,10 @@ fn netConnectIpPosix(
     } };
 }
 
-fn netConnectUnix(userdata: ?*anyopaque, address: Io.net.UnixAddress) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle {
+fn netConnectUnix(
+    userdata: ?*anyopaque,
+    address: *const Io.net.UnixAddress,
+) Io.net.UnixAddress.ConnectError!Io.net.Socket.Handle {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     _ = pool;
     _ = address;
@@ -2503,6 +2575,11 @@ const PosixAddress = extern union {
     in6: posix.sockaddr.in6,
 };
 
+const UnixAddress = extern union {
+    any: posix.sockaddr,
+    un: posix.sockaddr.un,
+};
+
 fn posixAddressFamily(a: *const Io.net.IpAddress) posix.sa_family_t {
     return switch (a.*) {
         .ip4 => posix.AF.INET,
@@ -2531,6 +2608,13 @@ fn addressToPosix(a: *const Io.net.IpAddress, storage: *PosixAddress) posix.sock
     };
 }
 
+fn addressUnixToPosix(a: *const Io.net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
+    @memcpy(storage.un.path[0..a.path.len], a.path);
+    storage.un.family = posix.AF.UNIX;
+    storage.un.path[a.path.len] = 0;
+    return @sizeOf(posix.sockaddr.un);
+}
+
 fn address4FromPosix(in: *posix.sockaddr.in) Io.net.Ip4Address {
     return .{
         .port = std.mem.bigToNative(u16, in.port),
lib/std/Io.zig
@@ -672,12 +672,12 @@ pub const VTable = struct {
     now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp,
     sleep: *const fn (?*anyopaque, Timeout) SleepError!void,
 
-    netListenIp: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
+    netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
     netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream,
     netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
     netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
-    netListenUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ListenError!net.Socket.Handle,
-    netConnectUnix: *const fn (?*anyopaque, net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
+    netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
+    netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
     netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
     netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
     /// Returns 0 on end of stream.
BRANCH_TODO
@@ -10,5 +10,6 @@
 * address the cancelation race condition (signal received between checkCancel and syscall)
 * update signal values to be an enum
 * move fs.File.Writer to Io
+* add non-blocking flag to network operations, handle EAGAIN
 * finish moving std.fs to Io
 * finish moving all of std.posix into Threaded