Commit adaef433d2

Andrew Kelley <andrew@ziglang.org>
2025-10-15 19:20:04
std.net.HostName.connect: rework to avoid waiting for DNS
The previous implementation would eagerly attempt TCP connection upon receiving a DNS reply, but it would still wait for all the DNS results before returning from the function. This implementation returns immediately upon first successful TCP connection, canceling not only in-flight TCP connection attempts but also unfinished DNS queries.
1 parent d3c4158
Changed files (1)
lib
std
lib/std/Io/net/HostName.zig
@@ -206,34 +206,16 @@ pub fn connect(
     port: u16,
     options: IpAddress.ConnectOptions,
 ) ConnectError!Stream {
-    var canonical_name_buffer: [max_len]u8 = undefined;
-    var results_buffer: [32]HostName.LookupResult = undefined;
-    var results: Io.Queue(LookupResult) = .init(&results_buffer);
-
-    var lookup_task = io.async(HostName.lookup, .{ host_name, io, &results, .{
-        .port = port,
-        .canonical_name_buffer = &canonical_name_buffer,
-    } });
-    defer lookup_task.cancel(io);
-
-    const Result = union(enum) { connect_result: IpAddress.ConnectError!Stream };
-    var finished_task_buffer: [results_buffer.len]Result = undefined;
-    var select: Io.Select(Result) = .init(io, &finished_task_buffer);
-    defer select.cancel();
+    var connect_many_buffer: [32]ConnectManyResult = undefined;
+    var connect_many_queue: Io.Queue(ConnectManyResult) = .init(&connect_many_buffer);
 
-    while (results.getOne(io)) |result| switch (result) {
-        .address => |address| select.async(.connect_result, IpAddress.connect, .{ address, io, options }),
-        .canonical_name => continue,
-        .end => |lookup_result| {
-            try lookup_result;
-            break;
-        },
-    } else |err| return err;
+    var connect_many = io.async(connectMany, .{ host_name, io, port, &connect_many_queue, options });
+    defer connect_many.cancel(io);
 
     var aggregate_error: ConnectError = error.UnknownHostName;
 
-    while (select.outstanding != 0) switch (try select.wait()) {
-        .connect_result => |connect_result| if (connect_result) |stream| return stream else |err| switch (err) {
+    while (connect_many_queue.getOne(io)) |result| switch (result) {
+        .connection => |connection| if (connection) |stream| return stream else |err| switch (err) {
             error.SystemResources => |e| return e,
             error.OptionUnsupported => |e| return e,
             error.ProcessFdQuotaExceeded => |e| return e,
@@ -242,9 +224,59 @@ pub fn connect(
             error.WouldBlock => return error.Unexpected,
             else => |e| aggregate_error = e,
         },
-    };
+        .end => |end| {
+            try end;
+            return aggregate_error;
+        },
+    } else |err| return err;
+}
+
+pub const ConnectManyResult = union(enum) {
+    connection: IpAddress.ConnectError!Stream,
+    end: ConnectError!void,
+};
+
+/// Asynchronously establishes a connection to all IP addresses associated with
+/// a host name, adding them to a results queue upon completion.
+pub fn connectMany(
+    host_name: HostName,
+    io: Io,
+    port: u16,
+    results: *Io.Queue(ConnectManyResult),
+    options: IpAddress.ConnectOptions,
+) void {
+    var canonical_name_buffer: [max_len]u8 = undefined;
+    var lookup_buffer: [32]HostName.LookupResult = undefined;
+    var lookup_queue: Io.Queue(LookupResult) = .init(&lookup_buffer);
+
+    host_name.lookup(io, &lookup_queue, .{
+        .port = port,
+        .canonical_name_buffer = &canonical_name_buffer,
+    });
+
+    var group: Io.Group = .init;
+    defer group.cancel(io);
+
+    while (lookup_queue.getOne(io)) |dns_result| switch (dns_result) {
+        .address => |address| group.async(io, enqueueConnection, .{ address, io, results, options }),
+        .canonical_name => continue,
+        .end => |lookup_result| {
+            group.wait(io);
+            results.putOneUncancelable(io, .{ .end = lookup_result });
+            return;
+        },
+    } else |err| switch (err) {
+        error.Canceled => |e| results.putOneUncancelable(io, .{ .end = e }),
+    }
+}
 
-    return aggregate_error;
+fn enqueueConnection(
+    address: IpAddress,
+    io: Io,
+    queue: *Io.Queue(ConnectManyResult),
+    options: IpAddress.ConnectOptions,
+) void {
+    queue.putOneUncancelable(io, .{ .connection = address.connect(io, options) });
 }
 
 pub const ResolvConf = struct {