Commit 668f905243

Andrew Kelley <andrew@ziglang.org>
2025-07-20 08:36:20
add some networking
1 parent d801a71
Changed files (3)
lib/std/Io/net.zig
@@ -0,0 +1,450 @@
+const builtin = @import("builtin");
+const native_os = builtin.os.tag;
+const std = @import("../std.zig");
+const Io = std.Io;
+
+pub const ListenError = std.net.Address.ListenError || Io.Cancelable;
+
+pub const ListenOptions = struct {
+    /// How many connections the kernel will accept on the application's behalf.
+    /// If more than this many connections pool in the kernel, clients will start
+    /// seeing "Connection refused".
+    kernel_backlog: u31 = 128,
+    /// Sets SO_REUSEADDR and SO_REUSEPORT on POSIX.
+    /// Sets SO_REUSEADDR on Windows, which is roughly equivalent.
+    reuse_address: bool = false,
+    force_nonblocking: bool = false,
+};
+
+pub const IpAddress = union(enum) {
+    ip4: Ip4Address,
+    ip6: Ip6Address,
+
+    /// Parse the given IP address string into an `IpAddress` value.
+    pub fn parse(name: []const u8, port: u16) !IpAddress {
+        if (parseIp4(name, port)) |ip4| return ip4 else |err| switch (err) {
+            error.Overflow,
+            error.InvalidEnd,
+            error.InvalidCharacter,
+            error.Incomplete,
+            error.NonCanonical,
+            => {},
+        }
+
+        if (parseIp6(name, port)) |ip6| return ip6 else |err| switch (err) {
+            error.Overflow,
+            error.InvalidEnd,
+            error.InvalidCharacter,
+            error.Incomplete,
+            error.InvalidIpv4Mapping,
+            => {},
+        }
+
+        return error.InvalidIpAddressFormat;
+    }
+
+    pub fn parseIp6(buffer: []const u8, port: u16) Ip6Address.ParseError!IpAddress {
+        return .{ .ip6 = try Ip6Address.parse(buffer, port) };
+    }
+
+    pub fn parseIp4(buffer: []const u8, port: u16) Ip4Address.ParseError!IpAddress {
+        return .{ .ip4 = try Ip4Address.parse(buffer, port) };
+    }
+
+    /// Returns the port in native endian.
+    pub fn getPort(a: IpAddress) u16 {
+        return switch (a) {
+            inline .ip4, .ip6 => |x| x.port,
+        };
+    }
+
+    /// `port` is native-endian.
+    pub fn setPort(a: *IpAddress, port: u16) void {
+        switch (a) {
+            inline .ip4, .ip6 => |*x| x.port = port,
+        }
+    }
+
+    pub fn format(a: IpAddress, w: *std.io.Writer) std.io.Writer.Error!void {
+        switch (a) {
+            .ip4, .ip6 => |x| return x.format(w),
+        }
+    }
+
+    pub fn eql(a: IpAddress, b: IpAddress) bool {
+        return switch (a) {
+            .ip4 => |a_ip4| switch (b) {
+                .ip4 => |b_ip4| a_ip4.eql(b_ip4),
+                else => false,
+            },
+            .ip6 => |a_ip6| switch (b) {
+                .ip6 => |b_ip6| a_ip6.eql(b_ip6),
+                else => false,
+            },
+        };
+    }
+
+    /// The returned `Server` has an open `stream`.
+    pub fn listen(address: IpAddress, io: Io, options: ListenOptions) ListenError!Server {
+        return io.vtable.listen(io.userdata, address, options);
+    }
+};
+
+pub const Ip4Address = struct {
+    bytes: [4]u8,
+    port: u16,
+
+    pub const ParseError = error{
+        Overflow,
+        InvalidEnd,
+        InvalidCharacter,
+        Incomplete,
+        NonCanonical,
+    };
+
+    pub fn parse(buffer: []const u8, port: u16) ParseError!Ip4Address {
+        var bytes: [4]u8 = @splat(0);
+        var index: u8 = 0;
+        var saw_any_digits = false;
+        var has_zero_prefix = false;
+        for (buffer) |c| switch (c) {
+            '.' => {
+                if (!saw_any_digits) return error.InvalidCharacter;
+                if (index == 3) return error.InvalidEnd;
+                index += 1;
+                saw_any_digits = false;
+                has_zero_prefix = false;
+            },
+            '0'...'9' => {
+                if (c == '0' and !saw_any_digits) {
+                    has_zero_prefix = true;
+                } else if (has_zero_prefix) {
+                    return error.NonCanonical;
+                }
+                saw_any_digits = true;
+                bytes[index] = try std.math.mul(u8, bytes[index], 10);
+                bytes[index] = try std.math.add(u8, bytes[index], c - '0');
+            },
+            else => return error.InvalidCharacter,
+        };
+        if (index == 3 and saw_any_digits) return .{
+            .bytes = bytes,
+            .port = port,
+        };
+        return error.Incomplete;
+    }
+
+    pub fn format(a: Ip4Address, w: *std.io.Writer) std.io.Writer.Error!void {
+        const bytes = &a.bytes;
+        try w.print("{d}.{d}.{d}.{d}:{d}", .{ bytes[0], bytes[1], bytes[2], bytes[3], a.port });
+    }
+
+    pub fn eql(a: Ip4Address, b: Ip4Address) bool {
+        const a_int: u32 = @bitCast(a.bytes);
+        const b_int: u32 = @bitCast(b.bytes);
+        return a.port == b.port and a_int == b_int;
+    }
+};
+
+pub const Ip6Address = struct {
+    /// Native endian
+    port: u16,
+    /// Big endian
+    bytes: [16]u8,
+    flowinfo: u32 = 0,
+    scope_id: u32 = 0,
+
+    pub const ParseError = error{
+        Overflow,
+        InvalidCharacter,
+        InvalidEnd,
+        InvalidIpv4Mapping,
+        Incomplete,
+    };
+
+    pub fn parse(buffer: []const u8, port: u16) ParseError!Ip6Address {
+        var result: Ip6Address = .{
+            .port = port,
+            .bytes = undefined,
+        };
+        var ip_slice: *[16]u8 = &result.bytes;
+
+        var tail: [16]u8 = undefined;
+
+        var x: u16 = 0;
+        var saw_any_digits = false;
+        var index: u8 = 0;
+        var scope_id = false;
+        var abbrv = false;
+        for (buffer, 0..) |c, i| {
+            if (scope_id) {
+                if (c >= '0' and c <= '9') {
+                    const digit = c - '0';
+                    {
+                        const ov = @mulWithOverflow(result.scope_id, 10);
+                        if (ov[1] != 0) return error.Overflow;
+                        result.scope_id = ov[0];
+                    }
+                    {
+                        const ov = @addWithOverflow(result.scope_id, digit);
+                        if (ov[1] != 0) return error.Overflow;
+                        result.scope_id = ov[0];
+                    }
+                } else {
+                    return error.InvalidCharacter;
+                }
+            } else if (c == ':') {
+                if (!saw_any_digits) {
+                    if (abbrv) return error.InvalidCharacter; // ':::'
+                    if (i != 0) abbrv = true;
+                    @memset(ip_slice[index..], 0);
+                    ip_slice = tail[0..];
+                    index = 0;
+                    continue;
+                }
+                if (index == 14) {
+                    return error.InvalidEnd;
+                }
+                ip_slice[index] = @as(u8, @truncate(x >> 8));
+                index += 1;
+                ip_slice[index] = @as(u8, @truncate(x));
+                index += 1;
+
+                x = 0;
+                saw_any_digits = false;
+            } else if (c == '%') {
+                if (!saw_any_digits) {
+                    return error.InvalidCharacter;
+                }
+                scope_id = true;
+                saw_any_digits = false;
+            } else if (c == '.') {
+                if (!abbrv or ip_slice[0] != 0xff or ip_slice[1] != 0xff) {
+                    // must start with '::ffff:'
+                    return error.InvalidIpv4Mapping;
+                }
+                const start_index = std.mem.lastIndexOfScalar(u8, buffer[0..i], ':').? + 1;
+                const addr = (Ip4Address.parse(buffer[start_index..], 0) catch {
+                    return error.InvalidIpv4Mapping;
+                }).bytes;
+                ip_slice = result.bytes[0..];
+                ip_slice[10] = 0xff;
+                ip_slice[11] = 0xff;
+
+                ip_slice[12] = addr[0];
+                ip_slice[13] = addr[1];
+                ip_slice[14] = addr[2];
+                ip_slice[15] = addr[3];
+                return result;
+            } else {
+                const digit = try std.fmt.charToDigit(c, 16);
+                {
+                    const ov = @mulWithOverflow(x, 16);
+                    if (ov[1] != 0) return error.Overflow;
+                    x = ov[0];
+                }
+                {
+                    const ov = @addWithOverflow(x, digit);
+                    if (ov[1] != 0) return error.Overflow;
+                    x = ov[0];
+                }
+                saw_any_digits = true;
+            }
+        }
+
+        if (!saw_any_digits and !abbrv) {
+            return error.Incomplete;
+        }
+        if (!abbrv and index < 14) {
+            return error.Incomplete;
+        }
+
+        if (index == 14) {
+            ip_slice[14] = @as(u8, @truncate(x >> 8));
+            ip_slice[15] = @as(u8, @truncate(x));
+            return result;
+        } else {
+            ip_slice[index] = @as(u8, @truncate(x >> 8));
+            index += 1;
+            ip_slice[index] = @as(u8, @truncate(x));
+            index += 1;
+            @memcpy(result.bytes[16 - index ..][0..index], ip_slice[0..index]);
+            return result;
+        }
+    }
+
+    pub fn format(a: Ip6Address, w: *std.io.Writer) std.io.Writer.Error!void {
+        const bytes = &a.bytes;
+        if (std.mem.eql(u8, bytes[0..12], &[_]u8{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff })) {
+            try w.print("[::ffff:{d}.{d}.{d}.{d}]:{d}", .{
+                bytes[12], bytes[13], bytes[14], bytes[15], a.port,
+            });
+            return;
+        }
+        const parts: [8]u16 = .{
+            std.mem.readInt(u16, bytes[0..2], .big),
+            std.mem.readInt(u16, bytes[2..4], .big),
+            std.mem.readInt(u16, bytes[4..6], .big),
+            std.mem.readInt(u16, bytes[6..8], .big),
+            std.mem.readInt(u16, bytes[8..10], .big),
+            std.mem.readInt(u16, bytes[10..12], .big),
+            std.mem.readInt(u16, bytes[12..14], .big),
+            std.mem.readInt(u16, bytes[14..16], .big),
+        };
+
+        // Find the longest zero run
+        var longest_start: usize = 8;
+        var longest_len: usize = 0;
+        var current_start: usize = 0;
+        var current_len: usize = 0;
+
+        for (parts, 0..) |part, i| {
+            if (part == 0) {
+                if (current_len == 0) {
+                    current_start = i;
+                }
+                current_len += 1;
+                if (current_len > longest_len) {
+                    longest_start = current_start;
+                    longest_len = current_len;
+                }
+            } else {
+                current_len = 0;
+            }
+        }
+
+        // Only compress if the longest zero run is 2 or more
+        if (longest_len < 2) {
+            longest_start = 8;
+            longest_len = 0;
+        }
+
+        try w.writeAll("[");
+        var i: usize = 0;
+        var abbrv = false;
+        while (i < parts.len) : (i += 1) {
+            if (i == longest_start) {
+                // Emit "::" for the longest zero run
+                if (!abbrv) {
+                    try w.writeAll(if (i == 0) "::" else ":");
+                    abbrv = true;
+                }
+                i += longest_len - 1; // Skip the compressed range
+                continue;
+            }
+            if (abbrv) {
+                abbrv = false;
+            }
+            try w.print("{x}", .{parts[i]});
+            if (i != parts.len - 1) {
+                try w.writeAll(":");
+            }
+        }
+        try w.print("]:{d}", .{a.port});
+    }
+
+    pub fn eql(a: Ip6Address, b: Ip6Address) bool {
+        return a.port == b.port and std.mem.eql(u8, &a.bytes, &b.bytes);
+    }
+};
+
+pub const Stream = struct {
+    /// Underlying platform-defined type which may or may not be
+    /// interchangeable with a file system file descriptor.
+    handle: Handle,
+
+    pub const Handle = switch (native_os) {
+        .windows => std.windows.ws2_32.SOCKET,
+        else => std.posix.fd_t,
+    };
+
+    pub fn close(s: Stream, io: Io) void {
+        return io.vtable.close(io.userdata, s);
+    }
+
+    pub const Reader = struct {
+        io: Io,
+        interface: Io.Reader,
+        stream: Stream,
+        err: ?Error,
+
+        pub const Error = std.net.Stream.ReadError || Io.Cancelable || Io.Writer.Error || error{EndOfStream};
+
+        pub fn init(stream: Stream, buffer: []u8) Reader {
+            return .{
+                .interface = .{
+                    .vtable = &.{ .stream = streamImpl },
+                    .buffer = buffer,
+                    .seek = 0,
+                    .end = 0,
+                },
+                .stream = stream,
+                .err = null,
+            };
+        }
+
+        fn streamImpl(io_r: *Io.Reader, io_w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!usize {
+            const r: *Reader = @alignCast(@fieldParentPtr("interface", io_r));
+            const io = r.io;
+            return io.vtable.netRead(io.vtable.userdata, r.stream, io_w, limit);
+        }
+    };
+
+    pub const Writer = struct {
+        io: Io,
+        interface: Io.Writer,
+        stream: Stream,
+        err: ?Error = null,
+
+        pub const Error = std.net.Stream.WriteError || Io.Cancelable;
+
+        pub fn init(stream: Stream, buffer: []u8) Writer {
+            return .{
+                .stream = stream,
+                .interface = .{
+                    .vtable = &.{ .drain = drain },
+                    .buffer = buffer,
+                },
+            };
+        }
+
+        fn drain(io_w: *Io.Writer, data: []const []const u8, splat: usize) Io.Writer.Error!usize {
+            const w: *Writer = @alignCast(@fieldParentPtr("interface", io_w));
+            const io = w.io;
+            const buffered = io_w.buffered();
+            const n = try io.vtable.netWrite(io.vtable.userdata, w.stream, buffered, data, splat);
+            return io_w.consume(n);
+        }
+    };
+
+    pub fn reader(stream: Stream, buffer: []u8) Reader {
+        return .init(stream, buffer);
+    }
+
+    pub fn writer(stream: Stream, buffer: []u8) Writer {
+        return .init(stream, buffer);
+    }
+};
+
+pub const Server = struct {
+    listen_address: IpAddress,
+    stream: Stream,
+
+    pub const Connection = struct {
+        stream: Stream,
+        address: IpAddress,
+    };
+
+    pub fn deinit(s: *Server, io: Io) void {
+        s.stream.close(io);
+        s.* = undefined;
+    }
+
+    pub const AcceptError = std.posix.AcceptError || Io.Cancelable;
+
+    /// Blocks until a client connects to the server. The returned `Connection` has
+    /// an open stream.
+    pub fn accept(s: *Server, io: Io) AcceptError!Connection {
+        return io.vtable.accept(io, s);
+    }
+};
lib/std/Io/ThreadPool.zig
@@ -3,6 +3,7 @@ const std = @import("../std.zig");
 const Allocator = std.mem.Allocator;
 const assert = std.debug.assert;
 const WaitGroup = std.Thread.WaitGroup;
+const posix = std.posix;
 const Io = std.Io;
 const Pool = @This();
 
@@ -19,6 +20,9 @@ parallel_count: usize,
 
 threadlocal var current_closure: ?*AsyncClosure = null;
 
+const max_iovecs_len = 8;
+const splat_buffer_size = 64;
+
 pub const Runnable = struct {
     start: Start,
     node: std.SinglyLinkedList.Node = .{},
@@ -107,6 +111,18 @@ pub fn io(pool: *Pool) Io {
 
             .now = now,
             .sleep = sleep,
+
+            .listen = listen,
+            .accept = accept,
+            .netRead = switch (builtin.os.tag) {
+                .windows => @panic("TODO"),
+                else => netReadPosix,
+            },
+            .netWrite = switch (builtin.os.tag) {
+                .windows => @panic("TODO"),
+                else => netWritePosix,
+            },
+            .netClose = netClose,
         },
     };
 }
@@ -461,7 +477,7 @@ fn cancel(
             .linux => _ = std.os.linux.tgkill(
                 std.os.linux.getpid(),
                 @bitCast(cancel_tid),
-                std.posix.SIG.IO,
+                posix.SIG.IO,
             ),
             else => {},
         },
@@ -635,7 +651,7 @@ fn closeFile(userdata: ?*anyopaque, file: Io.File) void {
     return fs_file.close();
 }
 
-fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize {
+fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: posix.off_t) Io.File.PReadError!usize {
     const pool: *Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
     const fs_file: std.fs.File = .{ .handle = file.handle };
@@ -645,7 +661,7 @@ fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.o
     };
 }
 
-fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize {
+fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize {
     const pool: *Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
     const fs_file: std.fs.File = .{ .handle = file.handle };
@@ -655,20 +671,20 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.
     };
 }
 
-fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
     const pool: *Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
-    const timespec = try std.posix.clock_gettime(clockid);
+    const timespec = try posix.clock_gettime(clockid);
     return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
 }
 
-fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
     const pool: *Pool = @alignCast(@ptrCast(userdata));
     const deadline_nanoseconds: i96 = switch (deadline) {
         .duration => |duration| duration.nanoseconds,
         .timestamp => |timestamp| @intFromEnum(timestamp),
     };
-    var timespec: std.posix.timespec = .{
+    var timespec: posix.timespec = .{
         .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
         .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
     };
@@ -682,7 +698,7 @@ fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadl
             .FAULT => unreachable,
             .INTR => {},
             .INVAL => return error.UnsupportedClock,
-            else => |err| return std.posix.unexpectedErrno(err),
+            else => |err| return posix.unexpectedErrno(err),
         }
     }
 }
@@ -718,3 +734,206 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
     }
     return result.?;
 }
+
+fn listen(userdata: ?*anyopaque, address: Io.net.IpAddress, options: Io.net.ListenOptions) Io.net.ListenError!Io.net.Server {
+    const pool: *Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+
+    const nonblock: u32 = if (options.force_nonblocking) posix.SOCK.NONBLOCK else 0;
+    const sock_flags = posix.SOCK.STREAM | posix.SOCK.CLOEXEC | nonblock;
+    const proto: u32 = posix.IPPROTO.TCP;
+    const family = posixAddressFamily(address);
+    const sockfd = try posix.socket(family, sock_flags, proto);
+    const stream: std.net.Stream = .{ .handle = sockfd };
+    errdefer stream.close();
+
+    if (options.reuse_address) {
+        try posix.setsockopt(
+            sockfd,
+            posix.SOL.SOCKET,
+            posix.SO.REUSEADDR,
+            &std.mem.toBytes(@as(c_int, 1)),
+        );
+        if (@hasDecl(posix.SO, "REUSEPORT") and family != posix.AF.UNIX) {
+            try posix.setsockopt(
+                sockfd,
+                posix.SOL.SOCKET,
+                posix.SO.REUSEPORT,
+                &std.mem.toBytes(@as(c_int, 1)),
+            );
+        }
+    }
+
+    var storage: PosixAddress = undefined;
+    var socklen = addressToPosix(address, &storage);
+    try posix.bind(sockfd, &storage.any, socklen);
+    try posix.listen(sockfd, options.kernel_backlog);
+    try posix.getsockname(sockfd, &storage.any, &socklen);
+    return .{
+        .listen_address = addressFromPosix(&storage),
+        .stream = .{ .handle = stream.handle },
+    };
+}
+
+fn accept(userdata: ?*anyopaque, server: *Io.net.Server) Io.net.Server.AcceptError!Io.net.Server.Connection {
+    const pool: *Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+
+    var storage: PosixAddress = undefined;
+    var addr_len: posix.socklen_t = @sizeOf(PosixAddress);
+    const fd = try posix.accept(server.stream.handle, &storage.any, &addr_len, posix.SOCK.CLOEXEC);
+    return .{
+        .stream = .{ .handle = fd },
+        .address = addressFromPosix(&storage),
+    };
+}
+
+fn netReadPosix(
+    userdata: ?*anyopaque,
+    stream: Io.net.Stream,
+    w: *Io.Writer,
+    limit: Io.Limit,
+) Io.net.Stream.Reader.Error!usize {
+    const pool: *Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+
+    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
+    const dest = try w.writableVectorPosix(&iovecs_buffer, limit);
+    assert(dest[0].len > 0);
+    const n = try posix.readv(stream.handle, dest);
+    if (n == 0) return error.EndOfStream;
+    return n;
+}
+
+fn netWritePosix(
+    userdata: ?*anyopaque,
+    stream: Io.net.Stream,
+    header: []const u8,
+    data: []const []const u8,
+    splat: usize,
+) Io.net.Stream.Writer.Error!usize {
+    const pool: *Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+
+    var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
+    var msg: posix.msghdr_const = .{
+        .name = null,
+        .namelen = 0,
+        .iov = &iovecs,
+        .iovlen = 0,
+        .control = null,
+        .controllen = 0,
+        .flags = 0,
+    };
+    addBuf(&iovecs, &msg.iovlen, header);
+    for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
+    const pattern = data[data.len - 1];
+    if (iovecs.len - msg.iovlen != 0) switch (splat) {
+        0 => {},
+        1 => addBuf(&iovecs, &msg.iovlen, pattern),
+        else => switch (pattern.len) {
+            0 => {},
+            1 => {
+                var backup_buffer: [splat_buffer_size]u8 = undefined;
+                const splat_buffer = &backup_buffer;
+                const memset_len = @min(splat_buffer.len, splat);
+                const buf = splat_buffer[0..memset_len];
+                @memset(buf, pattern[0]);
+                addBuf(&iovecs, &msg.iovlen, buf);
+                var remaining_splat = splat - buf.len;
+                while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
+                    assert(buf.len == splat_buffer.len);
+                    addBuf(&iovecs, &msg.iovlen, splat_buffer);
+                    remaining_splat -= splat_buffer.len;
+                }
+                addBuf(&iovecs, &msg.iovlen, splat_buffer[0..remaining_splat]);
+            },
+            else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
+                addBuf(&iovecs, &msg.iovlen, pattern);
+            },
+        },
+    };
+    const flags = posix.MSG.NOSIGNAL;
+    return posix.sendmsg(stream.handle, &msg, flags);
+}
+
+fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
+    // OS checks ptr addr before length so zero length vectors must be omitted.
+    if (bytes.len == 0) return;
+    if (v.len - i.* == 0) return;
+    v[i.*] = .{ .base = bytes.ptr, .len = bytes.len };
+    i.* += 1;
+}
+
+fn netClose(userdata: ?*anyopaque, stream: Io.net.Stream) void {
+    const pool: *Pool = @alignCast(@ptrCast(userdata));
+    _ = pool;
+    const net_stream: std.net.Stream = .{ .handle = stream.handle };
+    return net_stream.close();
+}
+
+const PosixAddress = extern union {
+    any: posix.sockaddr,
+    in: posix.sockaddr.in,
+    in6: posix.sockaddr.in6,
+};
+
+fn posixAddressFamily(a: Io.net.IpAddress) posix.sa_family_t {
+    return switch (a) {
+        .ip4 => posix.AF.INET,
+        .ip6 => posix.AF.INET6,
+    };
+}
+
+fn addressFromPosix(posix_address: *PosixAddress) Io.net.IpAddress {
+    return switch (posix_address.any.family) {
+        posix.AF.INET => .{ .ip4 = address4FromPosix(&posix_address.in) },
+        posix.AF.INET6 => .{ .ip6 = address6FromPosix(&posix_address.in6) },
+        else => unreachable,
+    };
+}
+
+fn addressToPosix(a: Io.net.IpAddress, storage: *PosixAddress) posix.socklen_t {
+    return switch (a) {
+        .ip4 => |ip4| {
+            storage.in = address4ToPosix(ip4);
+            return @sizeOf(posix.sockaddr.in);
+        },
+        .ip6 => |ip6| {
+            storage.in6 = address6ToPosix(ip6);
+            return @sizeOf(posix.sockaddr.in6);
+        },
+    };
+}
+
+fn address4FromPosix(in: *posix.sockaddr.in) Io.net.Ip4Address {
+    return .{
+        .port = std.mem.bigToNative(u16, in.port),
+        .bytes = @bitCast(in.addr),
+    };
+}
+
+fn address6FromPosix(in6: *posix.sockaddr.in6) Io.net.Ip6Address {
+    return .{
+        .port = std.mem.bigToNative(u16, in6.port),
+        .bytes = in6.addr,
+        .flowinfo = in6.flowinfo,
+        .scope_id = in6.scope_id,
+    };
+}
+
+fn address4ToPosix(a: Io.net.Ip4Address) posix.sockaddr.in {
+    return .{
+        .port = std.mem.nativeToBig(u16, a.port),
+        .addr = @bitCast(a.bytes),
+    };
+}
+
+fn address6ToPosix(a: Io.net.Ip6Address) posix.sockaddr.in6 {
+    return .{
+        .port = std.mem.nativeToBig(u16, a.port),
+        .flowinfo = a.flowinfo,
+        .addr = a.bytes,
+        .scope_id = a.scope_id,
+    };
+}
lib/std/Io.zig
@@ -559,6 +559,7 @@ const Io = @This();
 
 pub const EventLoop = @import("Io/EventLoop.zig");
 pub const ThreadPool = @import("Io/ThreadPool.zig");
+pub const net = @import("Io/net.zig");
 
 userdata: ?*anyopaque,
 vtable: *const VTable,
@@ -656,6 +657,12 @@ pub const VTable = struct {
 
     now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp,
     sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
+
+    listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.ListenOptions) net.ListenError!net.Server,
+    accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Server.Connection,
+    netRead: *const fn (?*anyopaque, src: net.Stream, dest: *Io.Writer, limit: Io.Limit) net.Stream.Reader.Error!usize,
+    netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
+    netClose: *const fn (?*anyopaque, stream: net.Stream) void,
 };
 
 pub const Cancelable = error{