Commit df84dc18bc

Andrew Kelley <andrew@ziglang.org>
2025-10-22 22:21:13
add bind
1 parent d6b0686
Changed files (2)
lib/std/Io/Kqueue.zig
@@ -9,6 +9,9 @@ const net = std.Io.net;
 const assert = std.debug.assert;
 const Allocator = std.mem.Allocator;
 const Alignment = std.mem.Alignment;
+const posix = std.posix;
+const IpAddress = std.Io.net.IpAddress;
+const errnoBug = std.Io.Threaded.errnoBug;
 
 /// Must be a thread-safe allocator.
 gpa: Allocator,
@@ -97,14 +100,10 @@ fn async(
     context_alignment: std.mem.Alignment,
     start: *const fn (context: *const anyopaque, result: *anyopaque) void,
 ) ?*Io.AnyFuture {
-    const k: *Kqueue = @ptrCast(@alignCast(userdata));
-    _ = k;
-    _ = result;
-    _ = result_alignment;
-    _ = context;
-    _ = context_alignment;
-    _ = start;
-    @panic("TODO");
+    return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
+        start(context.ptr, result.ptr);
+        return null;
+    };
 }
 
 fn concurrent(
@@ -156,7 +155,7 @@ fn cancel(
 fn cancelRequested(userdata: ?*anyopaque) bool {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
     _ = k;
-    @panic("TODO");
+    return false; // TODO
 }
 
 fn groupAsync(
@@ -419,12 +418,23 @@ fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.Accept
     _ = server;
     @panic("TODO");
 }
-fn netBindIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket {
-    const k: *Kqueue = @ptrCast(@alignCast(userdata));
-    _ = k;
-    _ = address;
-    _ = options;
-    @panic("TODO");
+fn netBindIp(
+    userdata: ?*anyopaque,
+    address: *const net.IpAddress,
+    options: net.IpAddress.BindOptions,
+) net.IpAddress.BindError!net.Socket {
+    const k: *Kqueue = @ptrCast(@alignCast(userdata));
+    const family = Io.Threaded.posixAddressFamily(address);
+    const socket_fd = try openSocketPosix(k, family, options);
+    errdefer posix.close(socket_fd);
+    var storage: Io.Threaded.PosixAddress = undefined;
+    var addr_len = Io.Threaded.addressToPosix(address, &storage);
+    try posixBind(k, socket_fd, &storage.any, addr_len);
+    try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
+    return .{
+        .handle = socket_fd,
+        .address = Io.Threaded.addressFromPosix(&storage),
+    };
 }
 fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
@@ -453,6 +463,7 @@ fn netConnectUnix(
     _ = unix_address;
     @panic("TODO");
 }
+
 fn netSend(
     userdata: ?*anyopaque,
     handle: net.Socket.Handle,
@@ -460,12 +471,22 @@ fn netSend(
     flags: net.SendFlags,
 ) struct { ?net.Socket.SendError, usize } {
     const k: *Kqueue = @ptrCast(@alignCast(userdata));
+
+    const posix_flags: u32 =
+        @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
+        @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
+        @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
+        @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
+        @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
+        posix.MSG.NOSIGNAL;
+
     _ = k;
+    _ = posix_flags;
     _ = handle;
     _ = outgoing_messages;
-    _ = flags;
     @panic("TODO");
 }
+
 fn netReceive(
     userdata: ?*anyopaque,
     handle: net.Socket.Handle,
@@ -533,3 +554,153 @@ fn netLookup(
     _ = options;
     @panic("TODO");
 }
+
+fn openSocketPosix(
+    k: *Kqueue,
+    family: posix.sa_family_t,
+    options: IpAddress.BindOptions,
+) error{
+    AddressFamilyUnsupported,
+    ProtocolUnsupportedBySystem,
+    ProcessFdQuotaExceeded,
+    SystemFdQuotaExceeded,
+    SystemResources,
+    ProtocolUnsupportedByAddressFamily,
+    SocketModeUnsupported,
+    OptionUnsupported,
+    Unexpected,
+    Canceled,
+}!posix.socket_t {
+    const mode = Io.Threaded.posixSocketMode(options.mode);
+    const protocol = Io.Threaded.posixProtocol(options.protocol);
+    const socket_fd = while (true) {
+        try k.checkCancel();
+        const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
+        const socket_rc = posix.system.socket(family, flags, protocol);
+        switch (posix.errno(socket_rc)) {
+            .SUCCESS => {
+                const fd: posix.fd_t = @intCast(socket_rc);
+                errdefer posix.close(fd);
+                if (Io.Threaded.socket_flags_unsupported) {
+                    while (true) {
+                        try k.checkCancel();
+                        switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
+                            .SUCCESS => break,
+                            .INTR => continue,
+                            .CANCELED => return error.Canceled,
+                            else => |err| return posix.unexpectedErrno(err),
+                        }
+                    }
+
+                    var fl_flags: usize = while (true) {
+                        try k.checkCancel();
+                        const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
+                        switch (posix.errno(rc)) {
+                            .SUCCESS => break @intCast(rc),
+                            .INTR => continue,
+                            .CANCELED => return error.Canceled,
+                            else => |err| return posix.unexpectedErrno(err),
+                        }
+                    };
+                    fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
+                    while (true) {
+                        try k.checkCancel();
+                        switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
+                            .SUCCESS => break,
+                            .INTR => continue,
+                            .CANCELED => return error.Canceled,
+                            else => |err| return posix.unexpectedErrno(err),
+                        }
+                    }
+                }
+                break fd;
+            },
+            .INTR => continue,
+            .CANCELED => return error.Canceled,
+
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .INVAL => return error.ProtocolUnsupportedBySystem,
+            .MFILE => return error.ProcessFdQuotaExceeded,
+            .NFILE => return error.SystemFdQuotaExceeded,
+            .NOBUFS => return error.SystemResources,
+            .NOMEM => return error.SystemResources,
+            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
+            .PROTOTYPE => return error.SocketModeUnsupported,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    };
+    errdefer posix.close(socket_fd);
+
+    if (options.ip6_only) {
+        if (posix.IPV6 == void) return error.OptionUnsupported;
+        try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
+    }
+
+    return socket_fd;
+}
+
+fn posixBind(
+    k: *Kqueue,
+    socket_fd: posix.socket_t,
+    addr: *const posix.sockaddr,
+    addr_len: posix.socklen_t,
+) !void {
+    while (true) {
+        try k.checkCancel();
+        switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
+            .SUCCESS => break,
+            .INTR => continue,
+            .CANCELED => return error.Canceled,
+
+            .ADDRINUSE => return error.AddressInUse,
+            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+            .INVAL => |err| return errnoBug(err), // invalid parameters
+            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .ADDRNOTAVAIL => return error.AddressUnavailable,
+            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
+            .NOMEM => return error.SystemResources,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
+fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
+    while (true) {
+        try k.checkCancel();
+        switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
+            .SUCCESS => break,
+            .INTR => continue,
+            .CANCELED => return error.Canceled,
+
+            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+            .FAULT => |err| return errnoBug(err),
+            .INVAL => |err| return errnoBug(err), // invalid parameters
+            .NOTSOCK => |err| return errnoBug(err), // always a race condition
+            .NOBUFS => return error.SystemResources,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
+fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
+    const o: []const u8 = @ptrCast(&option);
+    while (true) {
+        try k.checkCancel();
+        switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
+            .SUCCESS => return,
+            .INTR => continue,
+            .CANCELED => return error.Canceled,
+
+            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
+            .NOTSOCK => |err| return errnoBug(err),
+            .INVAL => |err| return errnoBug(err),
+            .FAULT => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
+fn checkCancel(k: *Kqueue) error{Canceled}!void {
+    if (cancelRequested(k)) return error.Canceled;
+}
lib/std/Io/Threaded.zig
@@ -285,7 +285,7 @@ pub fn io(t: *Threaded) Io {
     };
 }
 
-const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // ๐Ÿ’ฉ๐Ÿ’ฉ
+pub const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // ๐Ÿ’ฉ๐Ÿ’ฉ
 const have_accept4 = !socket_flags_unsupported;
 const have_flock_open_flags = @hasField(posix.O, "EXLOCK");
 const have_networking = builtin.os.tag != .wasi;
@@ -4283,7 +4283,7 @@ fn netLookupFallible(
     return error.OptionUnsupported;
 }
 
-const PosixAddress = extern union {
+pub const PosixAddress = extern union {
     any: posix.sockaddr,
     in: posix.sockaddr.in,
     in6: posix.sockaddr.in6,
@@ -4301,14 +4301,14 @@ const WsaAddress = extern union {
     un: ws2_32.sockaddr.un,
 };
 
-fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
+pub fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
     return switch (a.*) {
         .ip4 => posix.AF.INET,
         .ip6 => posix.AF.INET6,
     };
 }
 
-fn addressFromPosix(posix_address: *const PosixAddress) IpAddress {
+pub fn addressFromPosix(posix_address: *const PosixAddress) IpAddress {
     return switch (posix_address.any.family) {
         posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
         posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
@@ -4324,7 +4324,7 @@ fn addressFromWsa(wsa_address: *const WsaAddress) IpAddress {
     };
 }
 
-fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
+pub fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
     return switch (a.*) {
         .ip4 => |ip4| {
             storage.in = address4ToPosix(ip4);
@@ -4405,7 +4405,7 @@ fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
     };
 }
 
-fn errnoBug(err: posix.E) Io.UnexpectedError {
+pub fn errnoBug(err: posix.E) Io.UnexpectedError {
     switch (builtin.mode) {
         .Debug => std.debug.panic("programmer bug caused syscall error: {t}", .{err}),
         else => return error.Unexpected,
@@ -4419,7 +4419,7 @@ fn wsaErrorBug(err: ws2_32.WinsockError) Io.UnexpectedError {
     }
 }
 
-fn posixSocketMode(mode: net.Socket.Mode) u32 {
+pub fn posixSocketMode(mode: net.Socket.Mode) u32 {
     return switch (mode) {
         .stream => posix.SOCK.STREAM,
         .dgram => posix.SOCK.DGRAM,
@@ -4429,7 +4429,7 @@ fn posixSocketMode(mode: net.Socket.Mode) u32 {
     };
 }
 
-fn posixProtocol(protocol: ?net.Protocol) u32 {
+pub fn posixProtocol(protocol: ?net.Protocol) u32 {
     return @intFromEnum(protocol orelse return 0);
 }