Commit b22400271f

Andrew Kelley <andrew@ziglang.org>
2025-09-30 08:27:00
std.Io.net.HostName: finish implementing DNS lookup
1 parent 2e1ab5d
Changed files (4)
lib/std/Io/net/HostName.zig
@@ -46,15 +46,13 @@ pub const LookupOptions = struct {
     family: ?IpAddress.Family = null,
 };
 
-pub const LookupError = Io.Cancelable || Io.File.OpenError || Io.File.Reader.Error || error{
+pub const LookupError = error{
     UnknownHostName,
     ResolvConfParseFailed,
-    // TODO remove from error set; retry a few times then report a different error
-    TemporaryNameServerFailure,
     InvalidDnsARecord,
     InvalidDnsAAAARecord,
     NameServerFailure,
-};
+} || Io.NowError || IpAddress.BindError || Io.File.OpenError || Io.File.Reader.Error || Io.Cancelable;
 
 pub const LookupResult = struct {
     /// How many `LookupOptions.addresses_buffer` elements are populated.
@@ -185,7 +183,7 @@ fn sortLookupResults(options: LookupOptions, result: LookupResult) !LookupResult
     return result;
 }
 
-fn lookupDnsSearch(host_name: HostName, io: Io, options: LookupOptions) !LookupResult {
+fn lookupDnsSearch(host_name: HostName, io: Io, options: LookupOptions) LookupError!LookupResult {
     const rc = ResolvConf.init(io) catch return error.ResolvConfParseFailed;
 
     // Count dots, suppress search when >=ndots or name ends in
@@ -218,19 +216,17 @@ fn lookupDnsSearch(host_name: HostName, io: Io, options: LookupOptions) !LookupR
     return lookupDns(io, lookup_canon_name, &rc, options);
 }
 
-const DnsReply = struct {
-    buf: [512]u8,
-    len: usize,
-};
-
-fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, options: LookupOptions) !LookupResult {
+fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, options: LookupOptions) LookupError!LookupResult {
     const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{
         .{ .af = .ip6, .rr = std.posix.RR.A },
         .{ .af = .ip4, .rr = std.posix.RR.AAAA },
     };
     var query_buffers: [2][280]u8 = undefined;
+    var answer_buffers: [2][512]u8 = undefined;
     var queries_buffer: [2][]const u8 = undefined;
+    var answers_buffer: [2][]const u8 = undefined;
     var nq: usize = 0;
+    var next_answer_buffer: usize = 0;
 
     for (family_records) |fr| {
         if (options.family != fr.af) {
@@ -241,41 +237,123 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
         }
     }
 
+    var ip4_mapped: [ResolvConf.max_nameservers]IpAddress = undefined;
+    var any_ip6 = false;
+    for (rc.nameservers(), &ip4_mapped) |*ns, *m| {
+        m.* = .{ .ip6 = .fromAny(ns.*) };
+        any_ip6 = any_ip6 or ns.* == .ip6;
+    }
+    var socket = s: {
+        if (any_ip6) ip6: {
+            const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
+            const socket = ip6_addr.bind(io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
+                error.AddressFamilyUnsupported => break :ip6,
+                else => |e| return e,
+            };
+            break :s socket;
+        }
+        any_ip6 = false;
+        const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
+        const socket = try ip4_addr.bind(io, .{ .mode = .dgram });
+        break :s socket;
+    };
+    defer socket.close(io);
+
+    const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
     const queries = queries_buffer[0..nq];
-    var replies_buffer: [2]DnsReply = undefined;
-    var replies: Io.Queue(DnsReply) = .init(&replies_buffer);
-    try rc.sendMessage(io, queries, &replies);
-
-    for (replies) |reply| {
-        if (reply.len < 4 or (reply[3] & 15) == 2) return error.TemporaryNameServerFailure;
-        if ((reply[3] & 15) == 3) return .empty;
-        if ((reply[3] & 15) != 0) return error.UnknownHostName;
+    const answers = answers_buffer[0..queries.len];
+    for (answers) |*answer| answer.len = 0;
+
+    var now_ts = try io.now(.MONOTONIC);
+    const final_ts = now_ts.addDuration(.seconds(rc.timeout_seconds));
+    const attempt_duration: Io.Duration = .{
+        .nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
+    };
+
+    send: while (now_ts.compare(.lt, final_ts)) : (now_ts = try io.now(.MONOTONIC)) {
+        var group: Io.Group = .init;
+        defer group.cancel(io);
+
+        for (queries, answers) |query, *answer| {
+            if (answer.len != 0) continue;
+            for (mapped_nameservers) |*ns| {
+                group.async(io, sendIgnoringResult, .{ io, socket.handle, ns, query });
+            }
+        }
+
+        const timeout: Io.Timeout = .{ .deadline = now_ts.addDuration(attempt_duration) };
+
+        while (true) {
+            const buf = &answer_buffers[next_answer_buffer];
+            const reply = socket.receiveTimeout(io, buf, timeout) catch |err| switch (err) {
+                error.Canceled => return error.Canceled,
+                error.Timeout => continue :send,
+                else => continue,
+            };
+
+            // Ignore non-identifiable packets.
+            if (reply.len < 4) continue;
+
+            // Ignore replies from addresses we didn't send to.
+            const ns = for (mapped_nameservers) |*ns| {
+                if (reply.from.eql(ns)) break ns;
+            } else {
+                continue;
+            };
+
+            const reply_msg = buf[0..reply.len];
+
+            // Find which query this answer goes with, if any.
+            const query, const answer = for (queries, answers) |query, *answer| {
+                if (reply_msg[0] == query[0] and reply_msg[1] == query[1]) break .{ query, answer };
+            } else {
+                continue;
+            };
+            if (answer.len != 0) continue;
+
+            // Only accept positive or negative responses; retry immediately on
+            // server failure, and ignore all other codes such as refusal.
+            switch (reply_msg[3] & 15) {
+                0, 3 => {
+                    answer.* = reply_msg;
+                    next_answer_buffer += 1;
+                    if (next_answer_buffer == answers.len) break :send;
+                },
+                2 => {
+                    group.async(io, sendIgnoringResult, .{ io, socket.handle, ns, query });
+                    continue;
+                },
+                else => continue,
+            }
+        }
+    } else {
+        return error.NameServerFailure;
     }
 
     var addresses_len: usize = 0;
     var canonical_name: ?HostName = null;
 
-    for (replies) |reply| {
-        var it = DnsResponse.init(reply) catch {
+    for (answers) |answer| {
+        var it = DnsResponse.init(answer) catch {
             // TODO accept a diagnostics struct and append warnings
             continue;
         };
         while (it.next() catch {
             // TODO accept a diagnostics struct and append warnings
             continue;
-        }) |answer| switch (answer.rr) {
+        }) |record| switch (record.rr) {
             std.posix.RR.A => {
-                if (answer.data.len != 4) return error.InvalidDnsARecord;
+                if (record.data.len != 4) return error.InvalidDnsARecord;
                 options.addresses_buffer[addresses_len] = .{ .ip4 = .{
-                    .bytes = answer.data[0..4].*,
+                    .bytes = record.data[0..4].*,
                     .port = options.port,
                 } };
                 addresses_len += 1;
             },
             std.posix.RR.AAAA => {
-                if (answer.data.len != 16) return error.InvalidDnsAAAARecord;
+                if (record.data.len != 16) return error.InvalidDnsAAAARecord;
                 options.addresses_buffer[addresses_len] = .{ .ip6 = .{
-                    .bytes = answer.data[0..16].*,
+                    .bytes = record.data[0..16].*,
                     .port = options.port,
                 } };
                 addresses_len += 1;
@@ -285,7 +363,7 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
                 @panic("TODO");
                 //var tmp: [256]u8 = undefined;
                 //// Returns len of compressed name. strlen to get canon name.
-                //_ = try posix.dn_expand(packet, answer.data, &tmp);
+                //_ = try posix.dn_expand(packet, record.data, &tmp);
                 //const canon_name = mem.sliceTo(&tmp, 0);
                 //if (isValidHostName(canon_name)) {
                 //    ctx.canon.items.len = 0;
@@ -304,6 +382,10 @@ fn lookupDns(io: Io, lookup_canon_name: []const u8, rc: *const ResolvConf, optio
     return error.NameServerFailure;
 }
 
+fn sendIgnoringResult(io: Io, socket_handle: Io.net.Socket.Handle, dest: *const IpAddress, msg: []const u8) void {
+    _ = io.vtable.netSend(io.userdata, socket_handle, dest, msg) catch {};
+}
+
 fn lookupHosts(host_name: HostName, io: Io, options: LookupOptions) !LookupResult {
     const file = Io.File.openAbsolute(io, "/etc/hosts", .{}) catch |err| switch (err) {
         error.FileNotFound,
@@ -523,7 +605,7 @@ pub fn connectTcp(host_name: HostName, io: Io, port: u16) ConnectTcpError!Stream
 pub const ResolvConf = struct {
     attempts: u32,
     ndots: u32,
-    timeout: Io.Duration,
+    timeout_seconds: u32,
     nameservers_buffer: [max_nameservers]IpAddress,
     nameservers_len: usize,
     search_buffer: [max_len]u8,
@@ -539,7 +621,7 @@ pub const ResolvConf = struct {
             .search_buffer = undefined,
             .search_len = 0,
             .ndots = 1,
-            .timeout = .seconds(5),
+            .timeout_seconds = 5,
             .attempts = 2,
         };
 
@@ -589,7 +671,7 @@ pub const ResolvConf = struct {
                     switch (std.meta.stringToEnum(Option, name) orelse continue) {
                         .ndots => rc.ndots = @min(value, 15),
                         .attempts => rc.attempts = @min(value, 10),
-                        .timeout => rc.timeout = .seconds(@min(value, 60)),
+                        .timeout => rc.timeout_seconds = @min(value, 60),
                     }
                 },
                 .nameserver => {
@@ -621,68 +703,6 @@ pub const ResolvConf = struct {
     fn nameservers(rc: *const ResolvConf) []const IpAddress {
         return rc.nameservers_buffer[0..rc.nameservers_len];
     }
-
-    fn sendMessage(
-        rc: *const ResolvConf,
-        io: Io,
-        queries: []const []const u8,
-        replies: *Io.Queue(DnsReply),
-    ) !void {
-        var ip4_mapped: [ResolvConf.max_nameservers]IpAddress = undefined;
-        var any_ip6 = false;
-        for (rc.nameservers(), &ip4_mapped) |*ns, *m| {
-            m.* = .{ .ip6 = .fromAny(ns.*) };
-            any_ip6 = any_ip6 or ns.* == .ip6;
-        }
-
-        const socket = s: {
-            if (any_ip6) ip6: {
-                const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
-                const socket = ip6_addr.bind(io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
-                    error.AddressFamilyUnsupported => break :ip6,
-                    else => |e| return e,
-                };
-                break :s socket;
-            }
-            any_ip6 = false;
-            const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
-            const socket = try ip4_addr.bind(io, .{ .mode = .dgram });
-            break :s socket;
-        };
-        defer socket.close();
-
-        const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
-
-        var group: Io.Group = .init;
-        defer group.cancel();
-
-        for (queries) |query| {
-            for (mapped_nameservers) |*ns| {
-                group.async(sendOneMessage, .{ io, query, ns });
-            }
-        }
-
-        const deadline: Io.Deadline = .fromDuration(rc.timeout);
-
-        for (0..queries.len) |_| {
-            const msg = socket.receiveDeadline(deadline) catch |err| switch (err) {
-                error.Timeout => return error.Timeout,
-                error.Canceled => return error.Canceled,
-                else => continue,
-            };
-            _ = msg;
-            _ = replies;
-            @panic("TODO check msg for dns reply and put into replies queue");
-        }
-    }
-
-    fn sendOneMessage(
-        io: Io,
-        query: []const u8,
-        ns: *const IpAddress,
-    ) void {
-        io.vtable.netSend(io.userdata, ns.*, &.{query}) catch |err| switch (err) {};
-    }
 };
 
 test ResolvConf {
@@ -702,7 +722,7 @@ test ResolvConf {
         .search_buffer = undefined,
         .search_len = 0,
         .ndots = 1,
-        .timeout = .seconds(5),
+        .timeout_seconds = 5,
         .attempts = 2,
     };
 
lib/std/Io/net.zig
@@ -134,13 +134,13 @@ pub const IpAddress = union(enum) {
         }
     }
 
-    pub fn eql(a: IpAddress, b: IpAddress) bool {
-        return switch (a) {
-            .ip4 => |a_ip4| switch (b) {
+    pub fn eql(a: *const IpAddress, b: *const IpAddress) bool {
+        return switch (a.*) {
+            .ip4 => |a_ip4| switch (b.*) {
                 .ip4 => |b_ip4| a_ip4.eql(b_ip4),
                 else => false,
             },
-            .ip6 => |a_ip6| switch (b) {
+            .ip6 => |a_ip6| switch (b.*) {
                 .ip6 => |b_ip6| a_ip6.eql(b_ip6),
                 else => false,
             },
@@ -695,6 +695,11 @@ pub const Ip6Address = struct {
     };
 };
 
+pub const ReceivedMessage = struct {
+    from: IpAddress,
+    len: usize,
+};
+
 pub const Interface = struct {
     /// Value 0 indicates `none`.
     index: u32,
@@ -816,14 +821,31 @@ pub const Socket = struct {
         return io.vtable.netSend(io.userdata, s.handle, dest, data);
     }
 
-    pub const ReceiveError = error{} || Io.Cancelable;
+    pub const ReceiveError = error{} || Io.UnexpectedError || Io.Cancelable;
 
-    /// Transfers `data` from `source`, connectionless.
+    /// Waits for data. Connectionless.
     ///
-    /// Returned slice has same pointer as `buffer` with possibly shorter length.
-    pub fn receive(s: *const Socket, io: Io, source: *const IpAddress, buffer: []u8) ReceiveError![]u8 {
-        const n = try io.vtable.netReceive(io.userdata, s.handle, source, buffer);
-        return buffer[0..n];
+    /// See also:
+    /// * `receiveTimeout`
+    pub fn receive(s: *const Socket, io: Io, source: *const IpAddress, buffer: []u8) ReceiveError!ReceivedMessage {
+        return io.vtable.netReceive(io.userdata, s.handle, source, buffer, .none);
+    }
+
+    pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error;
+
+    /// Waits for data. Connectionless.
+    ///
+    /// Returns `error.Timeout` if no message arrives early enough.
+    ///
+    /// See also:
+    /// * `receive`
+    pub fn receiveTimeout(
+        s: *const Socket,
+        io: Io,
+        buffer: []u8,
+        timeout: Io.Timeout,
+    ) ReceiveTimeoutError!ReceivedMessage {
+        return io.vtable.netReceive(io.userdata, s.handle, buffer, timeout);
     }
 };
 
lib/std/Io/Threaded.zig
@@ -463,16 +463,19 @@ fn groupAsync(
         },
         .pool = pool,
         .group = group,
-        .node = .{ .next = @ptrCast(@alignCast(group.token)) },
+        .node = undefined,
         .func = start,
         .context_alignment = context_alignment,
         .context_len = context.len,
     };
-    group.token = &gc.node;
     @memcpy(gc.contextPointer()[0..context.len], context);
 
     pool.mutex.lock();
 
+    // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
+    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
+    group.token = &gc.node;
+
     const thread_capacity = cpu_count - 1 + pool.concurrent_count;
 
     pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
@@ -493,6 +496,8 @@ fn groupAsync(
         pool.threads.appendAssumeCapacity(thread);
     }
 
+    // This needs to be done before unlocking the mutex to avoid a race with
+    // the associated task finishing.
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
     std.Thread.WaitGroup.startStateless(group_state);
 
@@ -500,21 +505,16 @@ fn groupAsync(
     pool.cond.signal();
 }
 
-fn groupWait(userdata: ?*anyopaque, group: *Io.Group) void {
-    if (builtin.single_threaded) return;
+fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     _ = pool;
+
+    if (builtin.single_threaded) return;
+
     const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
     const reset_event: *ResetEvent = @ptrCast(&group.context);
     std.Thread.WaitGroup.waitStateless(group_state, reset_event);
-}
 
-fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void {
-    if (builtin.single_threaded) return;
-    const pool: *Pool = @ptrCast(@alignCast(userdata));
-    _ = pool;
-    const token = group.token.?;
-    group.token = null;
     var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
     while (true) {
         const gc: *GroupClosure = @fieldParentPtr("node", node);
@@ -523,6 +523,36 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void {
     }
 }
 
+fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
+    const pool: *Pool = @ptrCast(@alignCast(userdata));
+    const gpa = pool.allocator;
+
+    if (builtin.single_threaded) return;
+
+    {
+        var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
+        while (true) {
+            const gc: *GroupClosure = @fieldParentPtr("node", node);
+            gc.closure.requestCancel();
+            node = node.next orelse break;
+        }
+    }
+
+    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
+    const reset_event: *ResetEvent = @ptrCast(&group.context);
+    std.Thread.WaitGroup.waitStateless(group_state, reset_event);
+
+    {
+        var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
+        while (true) {
+            const gc: *GroupClosure = @fieldParentPtr("node", node);
+            const node_next = node.next;
+            gc.free(gpa);
+            node = node_next orelse break;
+        }
+    }
+}
+
 fn await(
     userdata: ?*anyopaque,
     any_future: *Io.AnyFuture,
@@ -774,7 +804,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
             .SUCCESS => return nread,
             .INTR => unreachable,
             .INVAL => unreachable,
-            .FAULT => unreachable,
+            .FAULT => |err| return errnoBug(err),
             .AGAIN => unreachable, // currently not support in WASI
             .BADF => return error.NotOpenForReading, // can be a race condition
             .IO => return error.InputOutput,
@@ -796,7 +826,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
             .SUCCESS => return @intCast(rc),
             .INTR => continue,
             .INVAL => unreachable,
-            .FAULT => unreachable,
+            .FAULT => |err| return errnoBug(err),
             .SRCH => return error.ProcessNotFound,
             .AGAIN => return error.WouldBlock,
             .BADF => return error.NotOpenForReading, // can be a race condition
@@ -896,7 +926,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
             .SUCCESS => return nread,
             .INTR => unreachable,
             .INVAL => unreachable,
-            .FAULT => unreachable,
+            .FAULT => |err| return errnoBug(err),
             .AGAIN => unreachable,
             .BADF => return error.NotOpenForReading, // can be a race condition
             .IO => return error.InputOutput,
@@ -922,7 +952,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
             .SUCCESS => return @bitCast(rc),
             .INTR => continue,
             .INVAL => unreachable,
-            .FAULT => unreachable,
+            .FAULT => |err| return errnoBug(err),
             .SRCH => return error.ProcessNotFound,
             .AGAIN => return error.WouldBlock,
             .BADF => return error.NotOpenForReading, // can be a race condition
@@ -969,18 +999,19 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi
     };
 }
 
-fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+fn now(userdata: ?*anyopaque, clockid: posix.clockid_t) Io.NowError!Io.Timestamp {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     try pool.checkCancel();
     const timespec = try posix.clock_gettime(clockid);
     return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
 }
 
-fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, timeout: Io.Timeout) Io.SleepError!void {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
-    const deadline_nanoseconds: i96 = switch (deadline) {
+    const deadline_nanoseconds: i96 = switch (timeout) {
+        .none => std.math.maxInt(i96),
         .duration => |duration| duration.nanoseconds,
-        .timestamp => |timestamp| @intFromEnum(timestamp),
+        .deadline => |deadline| @intFromEnum(deadline),
     };
     var timespec: posix.timespec = .{
         .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
@@ -988,12 +1019,12 @@ fn sleep(userdata: ?*anyopaque, clockid: posix.clockid_t, deadline: Io.Deadline)
     };
     while (true) {
         try pool.checkCancel();
-        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (deadline) {
-            .duration => false,
-            .timestamp => true,
+        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (timeout) {
+            .none, .duration => false,
+            .deadline => true,
         } }, &timespec, &timespec))) {
             .SUCCESS => return,
-            .FAULT => unreachable,
+            .FAULT => |err| return errnoBug(err),
             .INTR => {},
             .INVAL => return error.UnsupportedClock,
             else => |err| return posix.unexpectedErrno(err),
@@ -1278,7 +1309,7 @@ fn netReadPosix(userdata: ?*anyopaque, stream: Io.net.Stream, data: [][]u8) Io.n
 fn netSend(
     userdata: ?*anyopaque,
     handle: Io.net.Socket.Handle,
-    address: Io.net.IpAddress,
+    address: *const Io.net.IpAddress,
     data: []const u8,
 ) Io.net.Socket.SendError!void {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
@@ -1293,15 +1324,15 @@ fn netSend(
 fn netReceive(
     userdata: ?*anyopaque,
     handle: Io.net.Socket.Handle,
-    address: Io.net.IpAddress,
     buffer: []u8,
-) Io.net.Socket.ReceiveError!void {
+    timeout: Io.Timeout,
+) Io.net.Socket.ReceiveTimeoutError!Io.net.ReceivedMessage {
     const pool: *Pool = @ptrCast(@alignCast(userdata));
     try pool.checkCancel();
 
     _ = handle;
-    _ = address;
     _ = buffer;
+    _ = timeout;
     @panic("TODO");
 }
 
lib/std/Io.zig
@@ -641,8 +641,8 @@ pub const VTable = struct {
         context_alignment: std.mem.Alignment,
         start: *const fn (context: *const anyopaque) void,
     ) void,
-    groupWait: *const fn (?*anyopaque, *Group) void,
-    groupCancel: *const fn (?*anyopaque, *Group) void,
+    groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
+    groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
 
     /// Blocks until one of the futures from the list has a result ready, such
     /// that awaiting it will not block. Returns that index.
@@ -665,14 +665,14 @@ pub const VTable = struct {
     fileSeekBy: *const fn (?*anyopaque, file: File, offset: i64) File.SeekError!void,
     fileSeekTo: *const fn (?*anyopaque, file: File, offset: u64) File.SeekError!void,
 
-    now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp,
-    sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
+    now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) NowError!Timestamp,
+    sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void,
 
     listen: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
     accept: *const fn (?*anyopaque, server: *net.Server) net.Server.AcceptError!net.Stream,
     ipBind: *const fn (?*anyopaque, address: net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
-    netSend: *const fn (?*anyopaque, handle: net.Socket.Handle, address: net.IpAddress, data: []const u8) net.Socket.SendError!void,
-    netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, address: net.IpAddress, buffer: []u8) net.Socket.ReceiveError!void,
+    netSend: *const fn (?*anyopaque, handle: net.Socket.Handle, address: *const net.IpAddress, data: []const u8) net.Socket.SendError!void,
+    netReceive: *const fn (?*anyopaque, handle: net.Socket.Handle, buffer: []u8, timeout: Timeout) net.Socket.ReceiveTimeoutError!net.ReceivedMessage,
     netRead: *const fn (?*anyopaque, src: net.Stream, data: [][]u8) net.Stream.Reader.Error!usize,
     netWrite: *const fn (?*anyopaque, dest: net.Stream, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
     netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
@@ -710,6 +710,15 @@ pub const Timestamp = enum(i96) {
     pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
         return @enumFromInt(@intFromEnum(from) + duration.nanoseconds);
     }
+
+    pub fn fromNow(io: Io, clockid: std.posix.clockid_t, duration: Duration) NowError!Timestamp {
+        const now_ts = try now(io, clockid);
+        return addDuration(now_ts, duration);
+    }
+
+    pub fn compare(lhs: Timestamp, op: std.math.CompareOperator, rhs: Timestamp) bool {
+        return std.math.compare(@intFromEnum(lhs), op, @intFromEnum(rhs));
+    }
 };
 pub const Duration = struct {
     nanoseconds: i96,
@@ -722,11 +731,14 @@ pub const Duration = struct {
         return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s };
     }
 };
-pub const Deadline = union(enum) {
+pub const Timeout = union(enum) {
+    none,
     duration: Duration,
-    timestamp: Timestamp,
+    deadline: Timestamp,
+
+    pub const Error = error{Timeout};
 };
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || Cancelable;
+pub const NowError = std.posix.ClockGetTimeError || Cancelable;
 pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
 
 pub const AnyFuture = opaque {};
@@ -768,8 +780,10 @@ pub const Group = struct {
     ///
     /// `function` *may* be called immediately, before `async` returns.
     ///
-    /// After this is called, `wait` must be called before the group is
-    /// deinitialized.
+    /// After this is called, `wait` or `cancel` must be called before the
+    /// group is deinitialized.
+    ///
+    /// Threadsafe.
     ///
     /// See also:
     /// * `Io.async`
@@ -789,7 +803,9 @@ pub const Group = struct {
     ///
     /// Idempotent. Not threadsafe.
     pub fn wait(g: *Group, io: Io) void {
-        io.vtable.groupWait(io.userdata, g);
+        const token = g.token orelse return;
+        g.token = null;
+        io.vtable.groupWait(io.userdata, g, token);
     }
 
     /// Equivalent to `wait` but requests cancellation on all tasks owned by
@@ -797,9 +813,9 @@ pub const Group = struct {
     ///
     /// Idempotent. Not threadsafe.
     pub fn cancel(g: *Group, io: Io) void {
-        if (g.token == null) return;
-        io.vtable.groupCancel(io.userdata, g);
-        assert(g.token == null);
+        const token = g.token orelse return;
+        g.token = null;
+        io.vtable.groupCancel(io.userdata, g, token);
     }
 };
 
@@ -1215,12 +1231,12 @@ pub fn cancelRequested(io: Io) bool {
     return io.vtable.cancelRequested(io.userdata);
 }
 
-pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp {
+pub fn now(io: Io, clockid: std.posix.clockid_t) NowError!Timestamp {
     return io.vtable.now(io.userdata, clockid);
 }
 
-pub fn sleep(io: Io, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void {
-    return io.vtable.sleep(io.userdata, clockid, deadline);
+pub fn sleep(io: Io, clockid: std.posix.clockid_t, timeout: Timeout) SleepError!void {
+    return io.vtable.sleep(io.userdata, clockid, timeout);
 }
 
 pub fn sleepDuration(io: Io, duration: Duration) SleepError!void {