Commit 774df26835

Andrew Kelley <andrew@ziglang.org>
2025-10-04 01:15:37
WIP: hack at std.Io on a plane
1 parent 00f26cb
lib/std/Build/Cache.zig
@@ -1317,6 +1317,8 @@ fn testGetCurrentFileTimestamp(dir: fs.Dir) !i128 {
 }
 
 test "cache file and then recall it" {
+    const io = std.testing.io;
+
     var tmp = testing.tmpDir(.{});
     defer tmp.cleanup();
 
@@ -1328,7 +1330,7 @@ test "cache file and then recall it" {
     // Wait for file timestamps to tick
     const initial_time = try testGetCurrentFileTimestamp(tmp.dir);
     while ((try testGetCurrentFileTimestamp(tmp.dir)) == initial_time) {
-        std.Thread.sleep(1);
+        try std.Io.Duration.sleep(.fromNanoseconds(1), io);
     }
 
     var digest1: HexDigest = undefined;
@@ -1378,6 +1380,8 @@ test "cache file and then recall it" {
 }
 
 test "check that changing a file makes cache fail" {
+    const io = std.testing.io;
+
     var tmp = testing.tmpDir(.{});
     defer tmp.cleanup();
 
@@ -1391,7 +1395,7 @@ test "check that changing a file makes cache fail" {
     // Wait for file timestamps to tick
     const initial_time = try testGetCurrentFileTimestamp(tmp.dir);
     while ((try testGetCurrentFileTimestamp(tmp.dir)) == initial_time) {
-        std.Thread.sleep(1);
+        try std.Io.Duration.sleep(.fromNanoseconds(1), io);
     }
 
     var digest1: HexDigest = undefined;
@@ -1490,6 +1494,8 @@ test "no file inputs" {
 }
 
 test "Manifest with files added after initial hash work" {
+    const io = std.testing.io;
+
     var tmp = testing.tmpDir(.{});
     defer tmp.cleanup();
 
@@ -1503,7 +1509,7 @@ test "Manifest with files added after initial hash work" {
     // Wait for file timestamps to tick
     const initial_time = try testGetCurrentFileTimestamp(tmp.dir);
     while ((try testGetCurrentFileTimestamp(tmp.dir)) == initial_time) {
-        std.Thread.sleep(1);
+        try std.Io.Duration.sleep(.fromNanoseconds(1), io);
     }
 
     var digest1: HexDigest = undefined;
@@ -1553,7 +1559,7 @@ test "Manifest with files added after initial hash work" {
         // Wait for file timestamps to tick
         const initial_time2 = try testGetCurrentFileTimestamp(tmp.dir);
         while ((try testGetCurrentFileTimestamp(tmp.dir)) == initial_time2) {
-            std.Thread.sleep(1);
+            try std.Io.Duration.sleep(.fromNanoseconds(1), io);
         }
 
         {
lib/std/fs/test.zig
@@ -2265,6 +2265,8 @@ test "seekTo flushes buffered data" {
     var tmp = std.testing.tmpDir(.{});
     defer tmp.cleanup();
 
+    const io = std.testing.io;
+
     const contents = "data";
 
     const file = try tmp.dir.createFile("seek.bin", .{ .read = true });
@@ -2279,7 +2281,7 @@ test "seekTo flushes buffered data" {
     }
 
     var read_buffer: [16]u8 = undefined;
-    var file_reader: std.fs.File.Reader = .init(file, &read_buffer);
+    var file_reader: std.Io.File.Reader = .init(file, io, &read_buffer);
 
     var buf: [4]u8 = undefined;
     try file_reader.interface.readSliceAll(&buf);
lib/std/http/test.zig
@@ -1,27 +1,31 @@
 const builtin = @import("builtin");
+const native_endian = builtin.cpu.arch.endian();
+
 const std = @import("std");
 const http = std.http;
 const mem = std.mem;
-const native_endian = builtin.cpu.arch.endian();
+const net = std.Io.net;
+const Io = std.Io;
 const expect = std.testing.expect;
 const expectEqual = std.testing.expectEqual;
 const expectEqualStrings = std.testing.expectEqualStrings;
 const expectError = std.testing.expectError;
 
 test "trailers" {
-    const test_server = try createTestServer(struct {
+    const io = std.testing.io;
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [1024]u8 = undefined;
             var send_buffer: [1024]u8 = undefined;
             var remaining: usize = 1;
             while (remaining != 0) : (remaining -= 1) {
-                const connection = try net_server.accept();
-                defer connection.stream.close();
+                var stream = try net_server.accept(io);
+                defer stream.close(io);
 
-                var connection_br = connection.stream.reader(&recv_buffer);
-                var connection_bw = connection.stream.writer(&send_buffer);
-                var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+                var connection_br = stream.reader(io, &recv_buffer);
+                var connection_bw = stream.writer(io, &send_buffer);
+                var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
 
                 try expectEqual(.ready, server.reader.state);
                 var request = try server.receiveHead();
@@ -92,17 +96,18 @@ test "trailers" {
 }
 
 test "HTTP server handles a chunked transfer coding request" {
-    const test_server = try createTestServer(struct {
+    const io = std.testing.io;
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [8192]u8 = undefined;
             var send_buffer: [500]u8 = undefined;
-            const connection = try net_server.accept();
-            defer connection.stream.close();
+            var stream = try net_server.accept(io);
+            defer stream.close(io);
 
-            var connection_br = connection.stream.reader(&recv_buffer);
-            var connection_bw = connection.stream.writer(&send_buffer);
-            var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+            var connection_br = stream.reader(io, &recv_buffer);
+            var connection_bw = stream.writer(io, &send_buffer);
+            var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
             var request = try server.receiveHead();
 
             try expect(request.head.transfer_encoding == .chunked);
@@ -137,8 +142,8 @@ test "HTTP server handles a chunked transfer coding request" {
         "\r\n";
 
     const gpa = std.testing.allocator;
-    const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
-    defer stream.close();
+    var stream = try net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
+    defer stream.close(io);
     var stream_writer = stream.writer(&.{});
     try stream_writer.interface.writeAll(request_bytes);
 
@@ -156,19 +161,20 @@ test "HTTP server handles a chunked transfer coding request" {
 }
 
 test "echo content server" {
-    const test_server = try createTestServer(struct {
+    const io = std.testing.io;
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [1024]u8 = undefined;
             var send_buffer: [100]u8 = undefined;
 
             accept: while (!test_server.shutting_down) {
-                const connection = try net_server.accept();
-                defer connection.stream.close();
+                var stream = try net_server.accept(io);
+                defer stream.close(io);
 
-                var connection_br = connection.stream.reader(&recv_buffer);
-                var connection_bw = connection.stream.writer(&send_buffer);
-                var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+                var connection_br = stream.reader(io, &recv_buffer);
+                var connection_bw = stream.writer(io, &send_buffer);
+                var http_server = http.Server.init(&connection_br.interface, &connection_bw.interface);
 
                 while (http_server.reader.state == .ready) {
                     var request = http_server.receiveHead() catch |err| switch (err) {
@@ -243,6 +249,8 @@ test "echo content server" {
 }
 
 test "Server.Request.respondStreaming non-chunked, unknown content-length" {
+    const io = std.testing.io;
+
     if (builtin.os.tag == .windows) {
         // https://github.com/ziglang/zig/issues/21457
         return error.SkipZigTest;
@@ -250,19 +258,19 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
 
     // In this case, the response is expected to stream until the connection is
     // closed, indicating the end of the body.
-    const test_server = try createTestServer(struct {
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [1000]u8 = undefined;
             var send_buffer: [500]u8 = undefined;
             var remaining: usize = 1;
             while (remaining != 0) : (remaining -= 1) {
-                const connection = try net_server.accept();
-                defer connection.stream.close();
+                var stream = try net_server.accept(io);
+                defer stream.close(io);
 
-                var connection_br = connection.stream.reader(&recv_buffer);
-                var connection_bw = connection.stream.writer(&send_buffer);
-                var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+                var connection_br = stream.reader(io, &recv_buffer);
+                var connection_bw = stream.writer(io, &send_buffer);
+                var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
 
                 try expectEqual(.ready, server.reader.state);
                 var request = try server.receiveHead();
@@ -287,8 +295,8 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
 
     const request_bytes = "GET /foo HTTP/1.1\r\n\r\n";
     const gpa = std.testing.allocator;
-    const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
-    defer stream.close();
+    var stream = try net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
+    defer stream.close(io);
     var stream_writer = stream.writer(&.{});
     try stream_writer.interface.writeAll(request_bytes);
 
@@ -316,19 +324,21 @@ test "Server.Request.respondStreaming non-chunked, unknown content-length" {
 }
 
 test "receiving arbitrary http headers from the client" {
-    const test_server = try createTestServer(struct {
+    const io = std.testing.io;
+
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [666]u8 = undefined;
             var send_buffer: [777]u8 = undefined;
             var remaining: usize = 1;
             while (remaining != 0) : (remaining -= 1) {
-                const connection = try net_server.accept();
-                defer connection.stream.close();
+                var stream = try net_server.accept(io);
+                defer stream.close(io);
 
-                var connection_br = connection.stream.reader(&recv_buffer);
-                var connection_bw = connection.stream.writer(&send_buffer);
-                var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+                var connection_br = stream.reader(io, &recv_buffer);
+                var connection_bw = stream.writer(io, &send_buffer);
+                var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
 
                 try expectEqual(.ready, server.reader.state);
                 var request = try server.receiveHead();
@@ -357,8 +367,8 @@ test "receiving arbitrary http headers from the client" {
         "aoeu:  asdf \r\n" ++
         "\r\n";
     const gpa = std.testing.allocator;
-    const stream = try std.net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
-    defer stream.close();
+    var stream = try net.tcpConnectToHost(gpa, "127.0.0.1", test_server.port());
+    defer stream.close(io);
     var stream_writer = stream.writer(&.{});
     try stream_writer.interface.writeAll(request_bytes);
 
@@ -376,24 +386,26 @@ test "receiving arbitrary http headers from the client" {
 }
 
 test "general client/server API coverage" {
+    const io = std.testing.io;
+
     if (builtin.os.tag == .windows) {
         // This test was never passing on Windows.
         return error.SkipZigTest;
     }
 
-    const test_server = try createTestServer(struct {
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [1024]u8 = undefined;
             var send_buffer: [100]u8 = undefined;
 
             outer: while (!test_server.shutting_down) {
-                var connection = try net_server.accept();
-                defer connection.stream.close();
+                var stream = try net_server.accept(io);
+                defer stream.close(io);
 
-                var connection_br = connection.stream.reader(&recv_buffer);
-                var connection_bw = connection.stream.writer(&send_buffer);
-                var http_server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+                var connection_br = stream.reader(io, &recv_buffer);
+                var connection_bw = stream.writer(io, &send_buffer);
+                var http_server = http.Server.init(&connection_br.interface, &connection_bw.interface);
 
                 while (http_server.reader.state == .ready) {
                     var request = http_server.receiveHead() catch |err| switch (err) {
@@ -530,7 +542,7 @@ test "general client/server API coverage" {
         }
 
         fn getUnusedTcpPort() !u16 {
-            const addr = try std.net.Address.parseIp("127.0.0.1", 0);
+            const addr = try net.IpAddress.parse("127.0.0.1", 0);
             var s = try addr.listen(.{});
             defer s.deinit();
             return s.listen_address.in.getPort();
@@ -867,18 +879,20 @@ test "general client/server API coverage" {
 }
 
 test "Server streams both reading and writing" {
-    const test_server = try createTestServer(struct {
+    const io = std.testing.io;
+
+    const test_server = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [1024]u8 = undefined;
             var send_buffer: [777]u8 = undefined;
 
-            const connection = try net_server.accept();
-            defer connection.stream.close();
+            var stream = try net_server.accept(io);
+            defer stream.close(io);
 
-            var connection_br = connection.stream.reader(&recv_buffer);
-            var connection_bw = connection.stream.writer(&send_buffer);
-            var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+            var connection_br = stream.reader(io, &recv_buffer);
+            var connection_bw = stream.writer(io, &send_buffer);
+            var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
             var request = try server.receiveHead();
             var read_buffer: [100]u8 = undefined;
             var br = try request.readerExpectContinue(&read_buffer);
@@ -1077,11 +1091,11 @@ fn echoTests(client: *http.Client, port: u16) !void {
 const TestServer = struct {
     shutting_down: bool,
     server_thread: std.Thread,
-    net_server: std.net.Server,
+    net_server: net.Server,
 
     fn destroy(self: *@This()) void {
         self.shutting_down = true;
-        const conn = std.net.tcpConnectToAddress(self.net_server.listen_address) catch @panic("shutdown failure");
+        const conn = net.tcpConnectToAddress(self.net_server.listen_address) catch @panic("shutdown failure");
         conn.close();
 
         self.server_thread.join();
@@ -1090,21 +1104,21 @@ const TestServer = struct {
     }
 
     fn port(self: @This()) u16 {
-        return self.net_server.listen_address.in.getPort();
+        return self.net_server.socket.address.getPort();
     }
 };
 
-fn createTestServer(S: type) !*TestServer {
+fn createTestServer(io: Io, S: type) !*TestServer {
     if (builtin.single_threaded) return error.SkipZigTest;
     if (builtin.zig_backend == .stage2_llvm and native_endian == .big) {
         // https://github.com/ziglang/zig/issues/13782
         return error.SkipZigTest;
     }
 
-    const address = try std.net.Address.parseIp("127.0.0.1", 0);
+    const address = try net.IpAddress.parse("127.0.0.1", 0);
     const test_server = try std.testing.allocator.create(TestServer);
     test_server.* = .{
-        .net_server = try address.listen(.{ .reuse_address = true }),
+        .net_server = try address.listen(io, .{ .reuse_address = true }),
         .shutting_down = false,
         .server_thread = try std.Thread.spawn(.{}, S.run, .{test_server}),
     };
@@ -1112,18 +1126,19 @@ fn createTestServer(S: type) !*TestServer {
 }
 
 test "redirect to different connection" {
-    const test_server_new = try createTestServer(struct {
+    const io = std.testing.io;
+    const test_server_new = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [888]u8 = undefined;
             var send_buffer: [777]u8 = undefined;
 
-            const connection = try net_server.accept();
-            defer connection.stream.close();
+            var stream = try net_server.accept(io);
+            defer stream.close(io);
 
-            var connection_br = connection.stream.reader(&recv_buffer);
-            var connection_bw = connection.stream.writer(&send_buffer);
-            var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+            var connection_br = stream.reader(io, &recv_buffer);
+            var connection_bw = stream.writer(io, &send_buffer);
+            var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
             var request = try server.receiveHead();
             try expectEqualStrings(request.head.target, "/ok");
             try request.respond("good job, you pass", .{});
@@ -1136,23 +1151,23 @@ test "redirect to different connection" {
     };
     global.other_port = test_server_new.port();
 
-    const test_server_orig = try createTestServer(struct {
+    const test_server_orig = try createTestServer(io, struct {
         fn run(test_server: *TestServer) anyerror!void {
             const net_server = &test_server.net_server;
             var recv_buffer: [999]u8 = undefined;
             var send_buffer: [100]u8 = undefined;
 
-            const connection = try net_server.accept();
-            defer connection.stream.close();
+            var stream = try net_server.accept(io);
+            defer stream.close(io);
 
             var loc_buf: [50]u8 = undefined;
             const new_loc = try std.fmt.bufPrint(&loc_buf, "http://127.0.0.1:{d}/ok", .{
                 global.other_port.?,
             });
 
-            var connection_br = connection.stream.reader(&recv_buffer);
-            var connection_bw = connection.stream.writer(&send_buffer);
-            var server = http.Server.init(connection_br.interface(), &connection_bw.interface);
+            var connection_br = stream.reader(io, &recv_buffer);
+            var connection_bw = stream.writer(io, &send_buffer);
+            var server = http.Server.init(&connection_br.interface, &connection_bw.interface);
             var request = try server.receiveHead();
             try expectEqualStrings(request.head.target, "/help");
             try request.respond("", .{
@@ -1167,7 +1182,10 @@ test "redirect to different connection" {
 
     const gpa = std.testing.allocator;
 
-    var client: http.Client = .{ .allocator = gpa };
+    var client: http.Client = .{
+        .allocator = gpa,
+        .io = io,
+    };
     defer client.deinit();
 
     var loc_buf: [100]u8 = undefined;
lib/std/Io/net/test.zig
@@ -6,16 +6,16 @@ const testing = std.testing;
 
 test "parse and render IP addresses at comptime" {
     comptime {
-        const ipv6addr = net.IpAddress.parseIp("::1", 0) catch unreachable;
+        const ipv6addr = net.IpAddress.parse("::1", 0) catch unreachable;
         try std.testing.expectFmt("[::1]:0", "{f}", .{ipv6addr});
 
-        const ipv4addr = net.IpAddress.parseIp("127.0.0.1", 0) catch unreachable;
+        const ipv4addr = net.IpAddress.parse("127.0.0.1", 0) catch unreachable;
         try std.testing.expectFmt("127.0.0.1:0", "{f}", .{ipv4addr});
 
-        try testing.expectError(error.InvalidIpAddressFormat, net.IpAddress.parseIp("::123.123.123.123", 0));
-        try testing.expectError(error.InvalidIpAddressFormat, net.IpAddress.parseIp("127.01.0.1", 0));
-        try testing.expectError(error.InvalidIpAddressFormat, net.IpAddress.resolveIp("::123.123.123.123", 0));
-        try testing.expectError(error.InvalidIpAddressFormat, net.IpAddress.resolveIp("127.01.0.1", 0));
+        try testing.expectError(error.ParseFailed, net.IpAddress.parse("::123.123.123.123", 0));
+        try testing.expectError(error.ParseFailed, net.IpAddress.parse("127.01.0.1", 0));
+        try testing.expectError(error.ParseFailed, net.IpAddress.resolveIp("::123.123.123.123", 0));
+        try testing.expectError(error.ParseFailed, net.IpAddress.resolveIp("127.01.0.1", 0));
     }
 }
 
@@ -161,8 +161,8 @@ test "resolve DNS" {
 
     // Resolve localhost, this should not fail.
     {
-        const localhost_v4 = try net.IpAddress.parseIp("127.0.0.1", 80);
-        const localhost_v6 = try net.IpAddress.parseIp("::2", 80);
+        const localhost_v4 = try net.IpAddress.parse("127.0.0.1", 80);
+        const localhost_v6 = try net.IpAddress.parse("::2", 80);
 
         const result = try net.getAddressList(testing.allocator, "localhost", 80);
         defer result.deinit();
@@ -198,7 +198,7 @@ test "listen on a port, send bytes, receive bytes" {
 
     // Try only the IPv4 variant as some CI builders have no IPv6 localhost
     // configured.
-    const localhost = try net.IpAddress.parseIp("127.0.0.1", 0);
+    const localhost = try net.IpAddress.parse("127.0.0.1", 0);
 
     var server = try localhost.listen(.{});
     defer server.deinit();
@@ -232,7 +232,7 @@ test "listen on an in use port" {
         return error.SkipZigTest;
     }
 
-    const localhost = try net.IpAddress.parseIp("127.0.0.1", 0);
+    const localhost = try net.IpAddress.parse("127.0.0.1", 0);
 
     var server1 = try localhost.listen(.{ .reuse_address = true });
     defer server1.deinit();
@@ -351,7 +351,7 @@ test "non-blocking tcp server" {
         return error.SkipZigTest;
     }
 
-    const localhost = try net.IpAddress.parseIp("127.0.0.1", 0);
+    const localhost = try net.IpAddress.parse("127.0.0.1", 0);
     var server = localhost.listen(.{ .force_nonblocking = true });
     defer server.deinit();
 
lib/std/Io/net.zig
@@ -208,6 +208,9 @@ pub const IpAddress = union(enum) {
         /// System-wide limit on the total number of open files has been reached.
         SystemFdQuotaExceeded,
         SocketModeUnsupported,
+        /// One of the `BindOptions` is not supported by the Io
+        /// implementation.
+        OptionUnsupported,
     } || Io.UnexpectedError || Io.Cancelable;
 
     pub const BindOptions = struct {
@@ -228,6 +231,29 @@ pub const IpAddress = union(enum) {
     pub fn bind(address: IpAddress, io: Io, options: BindOptions) BindError!Socket {
         return io.vtable.ipBind(io.userdata, address, options);
     }
+
+    pub const ConnectError = error{
+        AddressInUse,
+        AddressUnavailable,
+        AddressFamilyUnsupported,
+        ConnectionPending,
+        ConnectionRefused,
+        ConnectionResetByPeer,
+        AlreadyConnected,
+        HostUnreachable,
+        NetworkUnreachable,
+        ConnectionTimedOut,
+        /// One of the `ConnectOptions` is not supported by the Io
+        /// implementation.
+        OptionUnsupported,
+    } || Io.UnexpectedError || Io.Cancelable;
+
+    pub const ConnectOptions = BindOptions;
+
+    /// Initiates a connection-oriented network stream.
+    pub fn connect(address: IpAddress, io: Io, options: ConnectOptions) ConnectError!Stream {
+        return io.vtable.ipConnect(io.userdata, address, options);
+    }
 };
 
 /// An IPv4 address in binary memory layout.
@@ -758,14 +784,6 @@ pub const SendFlags = packed struct(u8) {
     _: u3 = 0,
 };
 
-pub const SendResult = union(enum) {
-    success,
-    fail: struct {
-        err: Socket.SendError,
-        sent: usize,
-    },
-};
-
 pub const Interface = struct {
     /// Value 0 indicates `none`.
     index: u32,
@@ -978,8 +996,9 @@ pub const Socket = struct {
 pub const Stream = struct {
     socket: Socket,
 
-    pub fn close(s: Stream, io: Io) void {
-        return io.vtable.netClose(io.userdata, s.socket);
+    pub fn close(s: *Stream, io: Io) void {
+        io.vtable.netClose(io.userdata, s.socket);
+        s.* = undefined;
     }
 
     pub const Reader = struct {
@@ -996,8 +1015,9 @@ pub const Stream = struct {
             SocketUnconnected,
         } || Io.Cancelable || Io.Writer.Error || error{EndOfStream};
 
-        pub fn init(stream: Stream, buffer: []u8) Reader {
+        pub fn init(stream: Stream, io: Io, buffer: []u8) Reader {
             return .{
+                .io = io,
                 .interface = .{
                     .vtable = &.{
                         .stream = streamImpl,
@@ -1043,8 +1063,9 @@ pub const Stream = struct {
             Unexpected,
         } || Io.Cancelable;
 
-        pub fn init(stream: Stream, buffer: []u8) Writer {
+        pub fn init(stream: Stream, io: Io, buffer: []u8) Writer {
             return .{
+                .io = io,
                 .stream = stream,
                 .interface = .{
                     .vtable = &.{ .drain = drain },
@@ -1062,12 +1083,12 @@ pub const Stream = struct {
         }
     };
 
-    pub fn reader(stream: Stream, buffer: []u8) Reader {
-        return .init(stream, buffer);
+    pub fn reader(stream: Stream, io: Io, buffer: []u8) Reader {
+        return .init(stream, io, buffer);
     }
 
-    pub fn writer(stream: Stream, buffer: []u8) Writer {
-        return .init(stream, buffer);
+    pub fn writer(stream: Stream, io: Io, buffer: []u8) Writer {
+        return .init(stream, io, buffer);
     }
 };
 
lib/std/Io/Threaded.zig
@@ -145,8 +145,17 @@ pub fn io(pool: *Pool) Io {
             .fileSeekBy = fileSeekBy,
             .fileSeekTo = fileSeekTo,
 
-            .now = now,
-            .sleep = sleep,
+            .now = switch (builtin.os.tag) {
+                .windows => nowWindows,
+                .wasi => nowWasi,
+                else => nowPosix,
+            },
+            .sleep = switch (builtin.os.tag) {
+                .windows => sleepWindows,
+                .wasi => sleepWasi,
+                .linux => sleepLinux,
+                else => sleepPosix,
+            },
 
             .listen = switch (builtin.os.tag) {
                 .windows => @panic("TODO"),
@@ -160,6 +169,10 @@ pub fn io(pool: *Pool) Io {
                 .windows => @panic("TODO"),
                 else => ipBindPosix,
             },
+            .ipConnect = switch (builtin.os.tag) {
+                .windows => @panic("TODO"),
+                else => ipConnectPosix,
+            },
             .netClose = netClose,
             .netRead = switch (builtin.os.tag) {
                 .windows => @panic("TODO"),
@@ -797,16 +810,15 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
     const dest = iovecs_buffer[0..i];
     assert(dest[0].len > 0);
 
-    if (native_os == .wasi and !builtin.link_libc) {
+    if (native_os == .wasi and !builtin.link_libc) while (true) {
         try pool.checkCancel();
         var nread: usize = undefined;
         switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
             .SUCCESS => return nread,
-            .INTR => unreachable,
-            .INVAL => unreachable,
+            .INTR => continue,
+            .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
-            .AGAIN => unreachable, // currently not support in WASI
-            .BADF => return error.NotOpenForReading, // can be a race condition
+            .BADF => |err| return errnoBug(err),
             .IO => return error.InputOutput,
             .ISDIR => return error.IsDir,
             .NOBUFS => return error.SystemResources,
@@ -817,15 +829,15 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
             .NOTCAPABLE => return error.AccessDenied,
             else => |err| return posix.unexpectedErrno(err),
         }
-    }
+    };
 
     while (true) {
         try pool.checkCancel();
-        const rc = posix.system.readv(file.handle, dest.ptr, dest.len);
+        const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
         switch (posix.errno(rc)) {
             .SUCCESS => return @intCast(rc),
             .INTR => continue,
-            .INVAL => unreachable,
+            .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
             .SRCH => return error.ProcessNotFound,
             .AGAIN => return error.WouldBlock,
@@ -845,14 +857,6 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
 fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
 
-    const have_pread_but_not_preadv = switch (native_os) {
-        .windows, .macos, .ios, .watchos, .tvos, .visionos, .haiku, .serenity => true,
-        else => false,
-    };
-    if (have_pread_but_not_preadv) {
-        @compileError("TODO");
-    }
-
     if (is_windows) {
         const DWORD = windows.DWORD;
         const OVERLAPPED = windows.OVERLAPPED;
@@ -907,6 +911,14 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
         return total;
     }
 
+    const have_pread_but_not_preadv = switch (native_os) {
+        .windows, .haiku, .serenity => true,
+        else => false,
+    };
+    if (have_pread_but_not_preadv) {
+        @compileError("TODO");
+    }
+
     var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
     var i: usize = 0;
     for (data) |buf| {
@@ -919,15 +931,15 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
     const dest = iovecs_buffer[0..i];
     assert(dest[0].len > 0);
 
-    if (native_os == .wasi and !builtin.link_libc) {
+    if (native_os == .wasi and !builtin.link_libc) while (true) {
         try pool.checkCancel();
         var nread: usize = undefined;
         switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
             .SUCCESS => return nread,
-            .INTR => unreachable,
-            .INVAL => unreachable,
+            .INTR => continue,
+            .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
-            .AGAIN => unreachable,
+            .AGAIN => |err| return errnoBug(err),
             .BADF => return error.NotOpenForReading, // can be a race condition
             .IO => return error.InputOutput,
             .ISDIR => return error.IsDir,
@@ -942,16 +954,16 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
             .NOTCAPABLE => return error.AccessDenied,
             else => |err| return posix.unexpectedErrno(err),
         }
-    }
+    };
 
     const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
     while (true) {
         try pool.checkCancel();
-        const rc = preadv_sym(file.handle, dest.ptr, dest.len, @bitCast(offset));
+        const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
         switch (posix.errno(rc)) {
             .SUCCESS => return @bitCast(rc),
             .INTR => continue,
-            .INVAL => unreachable,
+            .INVAL => |err| return errnoBug(err),
             .FAULT => |err| return errnoBug(err),
             .SRCH => return error.ProcessNotFound,
             .AGAIN => return error.WouldBlock,
@@ -999,7 +1011,7 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi
     };
 }
 
-fn now(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 {
+fn nowPosix(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     _ = pool;
     const clock_id: posix.clockid_t = clockToPosix(clock);
@@ -1011,7 +1023,35 @@ fn now(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96
     }
 }
 
-fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
+fn nowWindows(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    _ = pool;
+    switch (clock) {
+        .realtime => {
+            // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
+            // and uses the NTFS/Windows epoch, which is 1601-01-01.
+            return @as(i96, windows.ntdll.RtlGetSystemTimePrecise()) * 100;
+        },
+        .monotonic, .boottime => {
+            // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
+            return .{ .timestamp = windows.QueryPerformanceCounter() };
+        },
+        .process_cputime_id,
+        .thread_cputime_id,
+        => return error.UnsupportedClock,
+    }
+}
+
+fn nowWasi(userdata: ?*anyopaque, clock: Io.Timestamp.Clock) Io.Timestamp.Error!i96 {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    _ = pool;
+    var ns: std.os.wasi.timestamp_t = undefined;
+    const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns);
+    if (err != .SUCCESS) return error.Unexpected;
+    return ns;
+}
+
+fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
         .none => .monotonic,
@@ -1041,6 +1081,73 @@ fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
     }
 }
 
+fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    try pool.checkCancel();
+    const ms = ms: {
+        const duration_and_clock = (try timeout.toDurationFromNow(pool.io())) orelse
+            break :ms std.math.maxInt(windows.DWORD);
+        if (duration_and_clock.clock != .monotonic) return error.UnsupportedClock;
+        break :ms std.math.lossyCast(windows.DWORD, duration_and_clock.duration.toMilliseconds());
+    };
+    windows.kernel32.Sleep(ms);
+}
+
+fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    try pool.checkCancel();
+
+    const w = std.os.wasi;
+
+    const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(pool.io())) |d| .{
+        .id = clockToWasi(d.clock),
+        .timeout = std.math.lossyCast(u64, d.duration.nanoseconds),
+        .precision = 0,
+        .flags = 0,
+    } else .{
+        .id = .MONOTONIC,
+        .timeout = std.math.maxInt(u64),
+        .precision = 0,
+        .flags = 0,
+    };
+    const in: w.subscription_t = .{
+        .userdata = 0,
+        .u = .{
+            .tag = .CLOCK,
+            .u = .{ .clock = clock },
+        },
+    };
+    var event: w.event_t = undefined;
+    var nevents: usize = undefined;
+    _ = w.poll_oneoff(&in, &event, 1, &nevents);
+}
+
+fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
+    const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type;
+
+    var timespec: posix.timespec = t: {
+        const d = (try timeout.toDurationFromNow(pool.io())) orelse break :t .{
+            .sec = std.math.maxInt(sec_type),
+            .nsec = std.math.maxInt(nsec_type),
+        };
+        if (d.clock != .monotonic) return error.UnsupportedClock;
+        const ns = d.duration.nanoseconds;
+        break :t .{
+            .sec = @intCast(@divFloor(ns, std.time.ns_per_s)),
+            .nsec = @intCast(@mod(ns, std.time.ns_per_s)),
+        };
+    };
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
+            .INTR => continue,
+            else => return, // This prong handles success as well as unexpected errors.
+        }
+    }
+}
+
 fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     _ = pool;
@@ -1091,7 +1198,7 @@ fn listenPosix(
                 errdefer posix.close(fd);
                 if (socket_flags_unsupported) while (true) {
                     try pool.checkCancel();
-                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, posix.FD_CLOEXEC))) {
+                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                         .SUCCESS => break,
                         .INTR => continue,
                         else => |err| return posix.unexpectedErrno(err),
@@ -1158,6 +1265,37 @@ fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr
     }
 }
 
+fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+    while (true) {
+        try pool.checkCancel();
+        switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
+            .SUCCESS => return,
+            .INTR => continue,
+            .ADDRINUSE => return error.AddressInUse,
+            .ADDRNOTAVAIL => return error.AddressUnavailable,
+            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
+            .AGAIN, .INPROGRESS => |err| return errnoBug(err),
+            .ALREADY => return error.ConnectionPending,
+            .BADF => |err| return errnoBug(err),
+            .CONNREFUSED => return error.ConnectionRefused,
+            .CONNRESET => return error.ConnectionResetByPeer,
+            .FAULT => |err| return errnoBug(err),
+            .ISCONN => return error.AlreadyConnected,
+            .HOSTUNREACH => return error.HostUnreachable,
+            .NETUNREACH => return error.NetworkUnreachable,
+            .NOTSOCK => |err| return errnoBug(err),
+            .PROTOTYPE => |err| return errnoBug(err),
+            .TIMEDOUT => return error.ConnectionTimedOut,
+            .CONNABORTED => |err| return errnoBug(err),
+            // UNIX socket error codes:
+            .ACCES => |err| return errnoBug(err),
+            .PERM => |err| return errnoBug(err),
+            .NOENT => |err| return errnoBug(err),
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
+}
+
 fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
     while (true) {
         try pool.checkCancel();
@@ -1190,14 +1328,45 @@ fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, optio
     }
 }
 
+fn ipConnectPosix(
+    userdata: ?*anyopaque,
+    address: *const Io.net.IpAddress,
+    options: Io.net.IpAddress.BindOptions,
+) Io.net.IpAddress.ConnectError!Io.net.Stream {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    const family = posixAddressFamily(address);
+    const socket_fd = try openSocketPosix(pool, family, options);
+    var storage: PosixAddress = undefined;
+    var addr_len = addressToPosix(address, &storage);
+    try posixConnect(pool, socket_fd, &storage.any, addr_len);
+    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
+    return .{ .socket = .{
+        .handle = socket_fd,
+        .address = addressFromPosix(&storage),
+    } };
+}
+
 fn ipBindPosix(
     userdata: ?*anyopaque,
-    address: Io.net.IpAddress,
+    address: *const Io.net.IpAddress,
     options: Io.net.IpAddress.BindOptions,
 ) Io.net.IpAddress.BindError!Io.net.Socket {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
+    const family = posixAddressFamily(address);
+    const socket_fd = try openSocketPosix(pool, family, options);
+    errdefer posix.close(socket_fd);
+    var storage: PosixAddress = undefined;
+    var addr_len = addressToPosix(address, &storage);
+    try posixBind(pool, socket_fd, &storage.any, addr_len);
+    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
+    return .{
+        .handle = socket_fd,
+        .address = addressFromPosix(&storage),
+    };
+}
+
+fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: Io.net.IpAddress.BindOptions) !posix.socket_t {
     const mode = posixSocketMode(options.mode);
-    const family = posixAddressFamily(&address);
     const protocol = posixProtocol(options.protocol);
     const socket_fd = while (true) {
         try pool.checkCancel();
@@ -1209,7 +1378,7 @@ fn ipBindPosix(
                 errdefer posix.close(fd);
                 if (socket_flags_unsupported) while (true) {
                     try pool.checkCancel();
-                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, posix.FD_CLOEXEC))) {
+                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                         .SUCCESS => break,
                         .INTR => continue,
                         else => |err| return posix.unexpectedErrno(err),
@@ -1229,19 +1398,14 @@ fn ipBindPosix(
             else => |err| return posix.unexpectedErrno(err),
         }
     };
+    errdefer posix.close(socket_fd);
 
     if (options.ip6_only) {
+        if (posix.IPV6 == void) return error.OptionUnsupported;
         try setSocketOption(pool, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
     }
 
-    var storage: PosixAddress = undefined;
-    var addr_len = addressToPosix(&address, &storage);
-    try posixBind(pool, socket_fd, &storage.any, addr_len);
-    try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
-    return .{
-        .handle = socket_fd,
-        .address = addressFromPosix(&storage),
-    };
+    return socket_fd;
 }
 
 const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haiku; // 💩💩
@@ -1264,7 +1428,7 @@ fn acceptPosix(userdata: ?*anyopaque, server: *Io.net.Server) Io.net.Server.Acce
                 errdefer posix.close(fd);
                 if (!have_accept4) while (true) {
                     try pool.checkCancel();
-                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, posix.FD_CLOEXEC))) {
+                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                         .SUCCESS => break,
                         .INTR => continue,
                         else => |err| return posix.unexpectedErrno(err),
@@ -1322,29 +1486,118 @@ fn netSend(
     handle: Io.net.Socket.Handle,
     messages: []Io.net.OutgoingMessage,
     flags: Io.net.SendFlags,
-) Io.net.SendResult {
+) struct { ?Io.net.Socket.SendError, usize } {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
 
-    if (have_sendmmsg) {
-        var i: usize = 0;
-        while (messages.len - i != 0) {
-            i += netSendMany(pool, handle, messages[i..], flags) catch |err| return .{ .fail = .{
-                .err = err,
-                .sent = i,
-            } };
+    const posix_flags: u32 =
+        @as(u32, if (flags.confirm) posix.MSG.CONFIRM else 0) |
+        @as(u32, if (flags.dont_route) posix.MSG.DONTROUTE else 0) |
+        @as(u32, if (flags.eor) posix.MSG.EOR else 0) |
+        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
+        @as(u32, if (flags.fastopen) posix.MSG.FASTOPEN else 0) |
+        posix.MSG.NOSIGNAL;
+
+    var i: usize = 0;
+    while (messages.len - i != 0) {
+        if (have_sendmmsg) {
+            i += netSendMany(pool, handle, messages[i..], posix_flags) catch |err| return .{ err, i };
+            continue;
         }
-        return .success;
+        netSendOne(pool, handle, &messages[i], posix_flags) catch |err| return .{ err, i };
+        i += 1;
     }
+    return .{ null, i };
+}
 
-    try pool.checkCancel();
-    @panic("TODO");
+fn netSendOne(
+    pool: *Pool,
+    handle: Io.net.Socket.Handle,
+    message: *Io.net.OutgoingMessage,
+    flags: u32,
+) Io.net.Socket.SendError!void {
+    var addr: PosixAddress = undefined;
+    var iovec: posix.iovec = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
+    const msg: posix.msghdr = .{
+        .name = &addr.any,
+        .namelen = addressToPosix(message.address, &addr),
+        .iov = iovec[0..1],
+        .iovlen = 1,
+        .control = @constCast(message.control.ptr),
+        .controllen = message.control.len,
+        .flags = 0,
+    };
+    while (true) {
+        try pool.checkCancel();
+        const rc = posix.system.sendmsg(handle, msg, flags);
+        if (is_windows) {
+            if (rc == windows.ws2_32.SOCKET_ERROR) {
+                switch (windows.ws2_32.WSAGetLastError()) {
+                    .WSAEACCES => return error.AccessDenied,
+                    .WSAEADDRNOTAVAIL => return error.AddressNotAvailable,
+                    .WSAECONNRESET => return error.ConnectionResetByPeer,
+                    .WSAEMSGSIZE => return error.MessageTooBig,
+                    .WSAENOBUFS => return error.SystemResources,
+                    .WSAENOTSOCK => return error.FileDescriptorNotASocket,
+                    .WSAEAFNOSUPPORT => return error.AddressFamilyNotSupported,
+                    .WSAEDESTADDRREQ => unreachable, // A destination address is required.
+                    .WSAEFAULT => unreachable, // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
+                    .WSAEHOSTUNREACH => return error.NetworkUnreachable,
+                    // TODO: WSAEINPROGRESS, WSAEINTR
+                    .WSAEINVAL => unreachable,
+                    .WSAENETDOWN => return error.NetworkSubsystemFailed,
+                    .WSAENETRESET => return error.ConnectionResetByPeer,
+                    .WSAENETUNREACH => return error.NetworkUnreachable,
+                    .WSAENOTCONN => return error.SocketUnconnected,
+                    .WSAESHUTDOWN => unreachable, // The socket has been shut down; it is not possible to WSASendTo on a socket after shutdown has been invoked with how set to SD_SEND or SD_BOTH.
+                    .WSAEWOULDBLOCK => return error.WouldBlock,
+                    .WSANOTINITIALISED => unreachable, // A successful WSAStartup call must occur before using this function.
+                    else => |err| return windows.unexpectedWSAError(err),
+                }
+            } else {
+                message.data_len = @intCast(rc);
+                return;
+            }
+        }
+        switch (posix.errno(rc)) {
+            .SUCCESS => {
+                message.data_len = @intCast(rc);
+                return;
+            },
+            .ACCES => return error.AccessDenied,
+            .AGAIN => return error.WouldBlock,
+            .ALREADY => return error.FastOpenAlreadyInProgress,
+            .BADF => |err| return errnoBug(err),
+            .CONNRESET => return error.ConnectionResetByPeer,
+            .DESTADDRREQ => |err| return errnoBug(err),
+            .FAULT => |err| return errnoBug(err),
+            .INTR => continue,
+            .INVAL => |err| return errnoBug(err),
+            .ISCONN => |err| return errnoBug(err),
+            .MSGSIZE => return error.MessageTooBig,
+            .NOBUFS => return error.SystemResources,
+            .NOMEM => return error.SystemResources,
+            .NOTSOCK => |err| return errnoBug(err),
+            .OPNOTSUPP => |err| return errnoBug(err),
+            .PIPE => return error.BrokenPipe,
+            .AFNOSUPPORT => return error.AddressFamilyNotSupported,
+            .LOOP => return error.SymLinkLoop,
+            .NAMETOOLONG => return error.NameTooLong,
+            .NOENT => return error.FileNotFound,
+            .NOTDIR => return error.NotDir,
+            .HOSTUNREACH => return error.NetworkUnreachable,
+            .NETUNREACH => return error.NetworkUnreachable,
+            .NOTCONN => return error.SocketUnconnected,
+            .NETDOWN => return error.NetworkSubsystemFailed,
+            else => |err| return posix.unexpectedErrno(err),
+        }
+    }
 }
 
 fn netSendMany(
     pool: *Pool,
     handle: Io.net.Socket.Handle,
     messages: []Io.net.OutgoingMessage,
-    flags: Io.net.SendFlags,
+    flags: u32,
 ) Io.net.Socket.SendError!usize {
     var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
     var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
@@ -1371,17 +1624,9 @@ fn netSendMany(
         };
     }
 
-    const posix_flags: u32 =
-        @as(u32, if (flags.confirm) posix.MSG.CONFIRM else 0) |
-        @as(u32, if (flags.dont_route) posix.MSG.DONTROUTE else 0) |
-        @as(u32, if (flags.eor) posix.MSG.EOR else 0) |
-        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
-        @as(u32, if (flags.fastopen) posix.MSG.FASTOPEN else 0) |
-        posix.MSG.NOSIGNAL;
-
     while (true) {
         try pool.checkCancel();
-        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), posix_flags);
+        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags);
         switch (posix.errno(rc)) {
             .SUCCESS => {
                 for (clamped_messages[0..rc], clamped_msgs[0..rc]) |*message, *msg| {
@@ -1782,5 +2027,17 @@ fn clockToPosix(clock: Io.Timestamp.Clock) posix.clockid_t {
         .realtime => posix.CLOCK.REALTIME,
         .monotonic => posix.CLOCK.MONOTONIC,
         .boottime => posix.CLOCK.BOOTTIME,
+        .process_cputime_id => posix.CLOCK.PROCESS_CPUTIME_ID,
+        .thread_cputime_id => posix.CLOCK.THREAD_CPUTIME_ID,
+    };
+}
+
+fn clockToWasi(clock: Io.Timestamp.Clock) std.os.wasi.clockid_t {
+    return switch (clock) {
+        .realtime => .REALTIME,
+        .monotonic => .MONOTONIC,
+        .boottime => .MONOTONIC,
+        .process_cputime_id => .PROCESS_CPUTIME_ID,
+        .thread_cputime_id => .THREAD_CPUTIME_ID,
     };
 }
lib/std/posix/test.zig
@@ -637,7 +637,7 @@ test "shutdown socket" {
         error.SocketUnconnected => {},
         else => |e| return e,
     };
-    std.net.Stream.close(.{ .handle = sock });
+    std.posix.close(sock);
 }
 
 test "sigrtmin/max" {
lib/std/Io.zig
@@ -670,8 +670,9 @@ pub const VTable = struct {
 
     listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
     accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream,
-    ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
-    netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) net.SendResult,
+    ipBind: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
+    ipConnect: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
+    netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
     netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
     netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize,
     netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
@@ -710,10 +711,14 @@ pub const Timestamp = struct {
         /// time (e.g., if the system administrator manually changes the
         /// clock), and by frequency adjust‐ ments performed by NTP and similar
         /// applications.
-        /// This clock normally counts the number of seconds since
-        /// 1970-01-01 00:00:00 Coordinated Universal Time (UTC) except that it
-        /// ignores leap seconds; near a leap second it is typically
-        /// adjusted by NTP to stay roughly in sync with UTC.
+        ///
+        /// This clock normally counts the number of seconds since 1970-01-01
+        /// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
+        /// leap seconds; near a leap second it is typically adjusted by NTP to
+        /// stay roughly in sync with UTC.
+        ///
+        /// The epoch is implementation-defined. For example NTFS/Windows uses
+        /// 1601-01-01.
         realtime,
         /// A nonsettable system-wide clock that represents time since some
         /// unspecified point in the past.
@@ -729,10 +734,16 @@ pub const Timestamp = struct {
         /// Guarantees that the time returned by consecutive calls will not go
         /// backwards, but successive calls may return identical
         /// (not-increased) time values.
+        ///
+        /// May or may not include time the system is suspended, but
+        /// implementations should exclude that time if possible.
         monotonic,
         /// Identical to `monotonic` except it also includes any time that the
-        /// system is suspended.
+        /// system is suspended, if possible. However, it may be implemented
+        /// identically to `monotonic`.
         boottime,
+        process_cputime_id,
+        thread_cputime_id,
     };
 
     pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
@@ -791,6 +802,12 @@ pub const Timestamp = struct {
 pub const Duration = struct {
     nanoseconds: i96,
 
+    pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };
+
+    pub fn fromNanoseconds(x: i96) Duration {
+        return .{ .nanoseconds = x };
+    }
+
     pub fn fromMilliseconds(x: i64) Duration {
         return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms };
     }
@@ -806,6 +823,10 @@ pub const Duration = struct {
     pub fn toSeconds(d: Duration) i64 {
         return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s));
     }
+
+    pub fn sleep(duration: Duration, io: Io) SleepError!void {
+        return io.vtable.sleep(io.userdata, .{ .duration = .{ .duration = duration, .clock = .monotonic } });
+    }
 };
 
 /// Declares under what conditions an operation should return `error.Timeout`.
@@ -828,6 +849,18 @@ pub const Timeout = union(enum) {
             .deadline => |d| d,
         };
     }
+
+    pub fn toDurationFromNow(t: Timeout, io: Io) Timestamp.Error!?ClockAndDuration {
+        return switch (t) {
+            .none => null,
+            .duration => |d| d,
+            .deadline => |d| .{ .clock = d.clock, .duration = try d.durationFromNow(io) },
+        };
+    }
+
+    pub fn sleep(timeout: Timeout, io: Io) SleepError!void {
+        return io.vtable.sleep(io.userdata, timeout);
+    }
 };
 
 pub const AnyFuture = opaque {};
@@ -1322,14 +1355,6 @@ pub fn cancelRequested(io: Io) bool {
 
 pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;
 
-pub fn sleep(io: Io, timeout: Timeout) SleepError!void {
-    return io.vtable.sleep(io.userdata, timeout);
-}
-
-pub fn sleepDuration(io: Io, duration: Duration) SleepError!void {
-    return io.vtable.sleep(io.userdata, .MONOTONIC, .{ .duration = duration });
-}
-
 /// Given a struct with each field a `*Future`, returns a union with the same
 /// fields, each field type the future's result.
 pub fn SelectUnion(S: type) type {
lib/std/Thread.zig
@@ -73,10 +73,7 @@ pub const ResetEvent = enum(u32) {
     /// timedWait() returns without error.
     pub fn timedWait(re: *ResetEvent, timeout_ns: u64) error{Timeout}!void {
         if (builtin.single_threaded) switch (re.*) {
-            .unset => {
-                sleep(timeout_ns);
-                return error.Timeout;
-            },
+            .unset => return error.Timeout,
             .waiting => unreachable, // Invalid state.
             .is_set => return,
         };
@@ -142,82 +139,6 @@ pub const ResetEvent = enum(u32) {
     }
 };
 
-/// Spurious wakeups are possible and no precision of timing is guaranteed.
-pub fn sleep(nanoseconds: u64) void {
-    if (builtin.os.tag == .windows) {
-        const big_ms_from_ns = nanoseconds / std.time.ns_per_ms;
-        const ms = math.cast(windows.DWORD, big_ms_from_ns) orelse math.maxInt(windows.DWORD);
-        windows.kernel32.Sleep(ms);
-        return;
-    }
-
-    if (builtin.os.tag == .wasi) {
-        const w = std.os.wasi;
-        const userdata: w.userdata_t = 0x0123_45678;
-        const clock: w.subscription_clock_t = .{
-            .id = .MONOTONIC,
-            .timeout = nanoseconds,
-            .precision = 0,
-            .flags = 0,
-        };
-        const in: w.subscription_t = .{
-            .userdata = userdata,
-            .u = .{
-                .tag = .CLOCK,
-                .u = .{ .clock = clock },
-            },
-        };
-
-        var event: w.event_t = undefined;
-        var nevents: usize = undefined;
-        _ = w.poll_oneoff(&in, &event, 1, &nevents);
-        return;
-    }
-
-    if (builtin.os.tag == .uefi) {
-        const boot_services = std.os.uefi.system_table.boot_services.?;
-        const us_from_ns = nanoseconds / std.time.ns_per_us;
-        const us = math.cast(usize, us_from_ns) orelse math.maxInt(usize);
-        boot_services.stall(us) catch unreachable;
-        return;
-    }
-
-    const s = nanoseconds / std.time.ns_per_s;
-    const ns = nanoseconds % std.time.ns_per_s;
-
-    // Newer kernel ports don't have old `nanosleep()` and `clock_nanosleep()` has been around
-    // since Linux 2.6 and glibc 2.1 anyway.
-    if (builtin.os.tag == .linux) {
-        const linux = std.os.linux;
-
-        var req: linux.timespec = .{
-            .sec = std.math.cast(linux.time_t, s) orelse std.math.maxInt(linux.time_t),
-            .nsec = std.math.cast(linux.time_t, ns) orelse std.math.maxInt(linux.time_t),
-        };
-        var rem: linux.timespec = undefined;
-
-        while (true) {
-            switch (linux.E.init(linux.clock_nanosleep(.MONOTONIC, .{ .ABSTIME = false }, &req, &rem))) {
-                .SUCCESS => return,
-                .INTR => {
-                    req = rem;
-                    continue;
-                },
-                .FAULT => unreachable,
-                .INVAL => unreachable,
-                .OPNOTSUPP => unreachable,
-                else => return,
-            }
-        }
-    }
-
-    posix.nanosleep(s, ns);
-}
-
-test sleep {
-    sleep(1);
-}
-
 const Thread = @This();
 const Impl = if (native_os == .windows)
     WindowsThreadImpl
lib/std/time.zig
@@ -8,74 +8,6 @@ const posix = std.posix;
 
 pub const epoch = @import("time/epoch.zig");
 
-/// Get a calendar timestamp, in seconds, relative to UTC 1970-01-01.
-/// Precision of timing depends on the hardware and operating system.
-/// The return value is signed because it is possible to have a date that is
-/// before the epoch.
-/// See `posix.clock_gettime` for a POSIX timestamp.
-pub fn timestamp() i64 {
-    return @divFloor(milliTimestamp(), ms_per_s);
-}
-
-/// Get a calendar timestamp, in milliseconds, relative to UTC 1970-01-01.
-/// Precision of timing depends on the hardware and operating system.
-/// The return value is signed because it is possible to have a date that is
-/// before the epoch.
-/// See `posix.clock_gettime` for a POSIX timestamp.
-pub fn milliTimestamp() i64 {
-    return @as(i64, @intCast(@divFloor(nanoTimestamp(), ns_per_ms)));
-}
-
-/// Get a calendar timestamp, in microseconds, relative to UTC 1970-01-01.
-/// Precision of timing depends on the hardware and operating system.
-/// The return value is signed because it is possible to have a date that is
-/// before the epoch.
-/// See `posix.clock_gettime` for a POSIX timestamp.
-pub fn microTimestamp() i64 {
-    return @as(i64, @intCast(@divFloor(nanoTimestamp(), ns_per_us)));
-}
-
-/// Get a calendar timestamp, in nanoseconds, relative to UTC 1970-01-01.
-/// Precision of timing depends on the hardware and operating system.
-/// On Windows this has a maximum granularity of 100 nanoseconds.
-/// The return value is signed because it is possible to have a date that is
-/// before the epoch.
-/// See `posix.clock_gettime` for a POSIX timestamp.
-pub fn nanoTimestamp() i128 {
-    switch (builtin.os.tag) {
-        .windows => {
-            // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds and uses the NTFS/Windows epoch,
-            // which is 1601-01-01.
-            const epoch_adj = epoch.windows * (ns_per_s / 100);
-            return @as(i128, windows.ntdll.RtlGetSystemTimePrecise() + epoch_adj) * 100;
-        },
-        .wasi => {
-            var ns: std.os.wasi.timestamp_t = undefined;
-            const err = std.os.wasi.clock_time_get(.REALTIME, 1, &ns);
-            assert(err == .SUCCESS);
-            return ns;
-        },
-        .uefi => {
-            const value, _ = std.os.uefi.system_table.runtime_services.getTime() catch return 0;
-            return value.toEpoch();
-        },
-        else => {
-            const ts = posix.clock_gettime(.REALTIME) catch |err| switch (err) {
-                error.UnsupportedClock, error.Unexpected => return 0, // "Precision of timing depends on hardware and OS".
-            };
-            return (@as(i128, ts.sec) * ns_per_s) + ts.nsec;
-        },
-    }
-}
-
-test milliTimestamp {
-    const time_0 = milliTimestamp();
-    std.Thread.sleep(ns_per_ms);
-    const time_1 = milliTimestamp();
-    const interval = time_1 - time_0;
-    try testing.expect(interval > 0);
-}
-
 // Divisions of a nanosecond.
 pub const ns_per_us = 1000;
 pub const ns_per_ms = 1000 * ns_per_us;
@@ -268,9 +200,11 @@ pub const Timer = struct {
 };
 
 test Timer {
+    const io = std.testing.io;
+
     var timer = try Timer.start();
 
-    std.Thread.sleep(10 * ns_per_ms);
+    try std.Io.Duration.sleep(.fromMilliseconds(10), io);
     const time_0 = timer.read();
     try testing.expect(time_0 > 0);