master
1const Threaded = @This();
2
3const builtin = @import("builtin");
4const native_os = builtin.os.tag;
5const is_windows = native_os == .windows;
6const windows = std.os.windows;
7const ws2_32 = std.os.windows.ws2_32;
8const is_debug = builtin.mode == .Debug;
9
10const std = @import("../std.zig");
11const Io = std.Io;
12const net = std.Io.net;
13const HostName = std.Io.net.HostName;
14const IpAddress = std.Io.net.IpAddress;
15const Allocator = std.mem.Allocator;
16const Alignment = std.mem.Alignment;
17const assert = std.debug.assert;
18const posix = std.posix;
19
20/// Thread-safe.
21allocator: Allocator,
22mutex: std.Thread.Mutex = .{},
23cond: std.Thread.Condition = .{},
24run_queue: std.SinglyLinkedList = .{},
25join_requested: bool = false,
26stack_size: usize,
27/// All threads are spawned detached; this is how we wait until they all exit.
28wait_group: std.Thread.WaitGroup = .{},
29/// Maximum thread pool size (excluding main thread) when dispatching async
30/// tasks. Until this limit, calls to `Io.async` when all threads are busy will
31/// cause a new thread to be spawned and permanently added to the pool. After
32/// this limit, calls to `Io.async` when all threads are busy run the task
33/// immediately.
34///
35/// Defaults to a number equal to logical CPU cores.
36///
37/// Protected by `mutex` once the I/O instance is already in use. See
38/// `setAsyncLimit`.
39async_limit: Io.Limit,
/// Maximum thread pool size (excluding main thread) for dispatching concurrent
/// tasks. Until this limit, calls to `Io.concurrent` will increase the thread
/// pool size.
///
/// After this limit, calls to `Io.concurrent` return
/// `error.ConcurrencyUnavailable`.
46concurrent_limit: Io.Limit = .unlimited,
47/// Error from calling `std.Thread.getCpuCount` in `init`.
48cpu_count_error: ?std.Thread.CpuCountError,
49/// Number of threads that are unavailable to take tasks. To calculate
50/// available count, subtract this from either `async_limit` or
51/// `concurrent_limit`.
52busy_count: usize = 0,
53
54wsa: if (is_windows) Wsa else struct {} = .{},
55
56have_signal_handler: bool,
57old_sig_io: if (have_sig_io) posix.Sigaction else void,
58old_sig_pipe: if (have_sig_pipe) posix.Sigaction else void,
59
/// The closure whose task function is currently running on this thread, or
/// `null` when the thread is not inside a task started by this implementation.
threadlocal var current_closure: ?*Closure = null;

const max_iovecs_len = 8;
const splat_buffer_size = 64;

comptime {
    // Targets that define `IOV_MAX` bound how many iovecs one call may take;
    // verify at compile time that the fixed-size vectors used here fit.
    const iov_max_is_defined = @TypeOf(posix.IOV_MAX) != void;
    if (iov_max_is_defined) {
        assert(max_iovecs_len <= posix.IOV_MAX);
    }
}
68
/// Cancellation state of a closure, stored in a single `usize`.
///
/// * `none`: no thread has claimed the closure and no cancel was requested.
/// * `canceling`: a cancel has been requested.
/// * any other value: the identity of the thread running the closure.
const CancelId = enum(usize) {
    none = 0,
    canceling = std.math.maxInt(usize),
    _,

    const ThreadId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id;

    /// Encodes the identity of the calling thread.
    fn currentThread() CancelId {
        return if (std.Thread.use_pthreads)
            @enumFromInt(@intFromPtr(std.c.pthread_self()))
        else
            @enumFromInt(std.Thread.getCurrentId());
    }

    /// Decodes a value previously produced by `currentThread` back into the
    /// platform thread identifier.
    fn toThreadId(cancel_id: CancelId) ThreadId {
        const raw = @intFromEnum(cancel_id);
        return if (std.Thread.use_pthreads) @ptrFromInt(raw) else @intCast(raw);
    }
};
92
/// Header shared by every queued task: the entry point, the intrusive run
/// queue node, and the cancellation state.
const Closure = struct {
    start: Start,
    node: std.SinglyLinkedList.Node = .{},
    cancel_tid: CancelId,

    const Start = *const fn (*Closure) void;

    /// Marks the closure as canceling. If a thread had already claimed the
    /// closure, that thread is additionally sent the IO signal.
    fn requestCancel(closure: *Closure) void {
        const previous = @atomicRmw(CancelId, &closure.cancel_tid, .Xchg, .canceling, .acq_rel);

        // Not claimed by any thread, or a cancel was already requested:
        // there is no thread to signal.
        if (previous == .none or previous == .canceling) return;

        if (std.Thread.use_pthreads) {
            const rc = std.c.pthread_kill(previous.toThreadId(), .IO);
            if (is_debug) assert(rc == 0);
        } else if (native_os == .linux) {
            _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(previous.toThreadId()), .IO);
        }
    }
};
114
/// Creates a `Threaded` instance that dispatches tasks to a thread pool.
///
/// When `builtin.single_threaded` is set, this returns `init_single_threaded`.
/// Otherwise `async_limit` starts at one less than the detected CPU count. If
/// the CPU count cannot be determined, `async_limit` is `.nothing` and the
/// failure is stored in `cpu_count_error`.
///
/// On targets that have `posix.Sigaction`, `doNothingSignalHandler` is
/// installed for the IO and PIPE signals (where those exist) and the previous
/// actions are saved so that `deinit` can restore them.
///
/// Related:
/// * `init_single_threaded`
pub fn init(
    /// Must be threadsafe. Only used for the following functions:
    /// * `Io.VTable.async`
    /// * `Io.VTable.concurrent`
    /// * `Io.VTable.groupAsync`
    /// * `Io.VTable.groupConcurrent`
    /// If these functions are avoided, then `Allocator.failing` may be passed
    /// here.
    gpa: Allocator,
) Threaded {
    if (builtin.single_threaded) return .init_single_threaded;

    var t: Threaded = .{
        .allocator = gpa,
        .stack_size = std.Thread.SpawnConfig.default_stack_size,
        .async_limit = .nothing,
        .cpu_count_error = null,
        .old_sig_io = undefined,
        .old_sig_pipe = undefined,
        .have_signal_handler = false,
    };

    if (std.Thread.getCpuCount()) |n| {
        // The calling thread counts as one of the cores.
        t.async_limit = .limited(n - 1);
    } else |err| {
        t.cpu_count_error = err;
    }

    if (posix.Sigaction != void) {
        // With a handler installed, sending `posix.SIG.IO` to a thread
        // interrupts its blocking syscall, which then returns `posix.E.INTR`.
        const interrupt_only: posix.Sigaction = .{
            .handler = .{ .handler = doNothingSignalHandler },
            .mask = posix.sigemptyset(),
            .flags = 0,
        };
        if (have_sig_io) posix.sigaction(.IO, &interrupt_only, &t.old_sig_io);
        if (have_sig_pipe) posix.sigaction(.PIPE, &interrupt_only, &t.old_sig_pipe);
        t.have_signal_handler = true;
    }

    return t;
}
156
/// Compile-time initializer for an instance that never spawns threads: calls
/// to `Io.VTable.concurrent` fail with `error.ConcurrencyUnavailable`.
///
/// When initialized this way:
/// * cancel requests have no effect.
/// * `deinit` is safe, but unnecessary to call.
pub const init_single_threaded: Threaded = .{
    // Both limits are `.nothing` and the allocator always fails, so no task
    // is ever handed to a pool thread.
    .async_limit = .nothing,
    .concurrent_limit = .nothing,
    .allocator = .failing,
    .stack_size = std.Thread.SpawnConfig.default_stack_size,
    .cpu_count_error = null,
    // No signal handlers are installed, so there is nothing to restore.
    .have_signal_handler = false,
    .old_sig_io = undefined,
    .old_sig_pipe = undefined,
};
173
/// Replaces `async_limit` while holding `mutex`, so it may be called while the
/// instance is already in use.
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
    t.mutex.lock();
    t.async_limit = new_limit;
    t.mutex.unlock();
}
179
/// Waits for all pool threads to exit, then releases process-wide state:
/// on Windows, Winsock is cleaned up if it was initialized; on targets with
/// `posix.Sigaction`, the signal actions saved by `init` are restored.
/// Afterwards `t.*` is set to `undefined`.
pub fn deinit(t: *Threaded) void {
    t.join();

    if (is_windows) {
        if (t.wsa.status == .initialized and ws2_32.WSACleanup() != 0) {
            recoverableOsBugDetected();
        }
    }

    if (posix.Sigaction != void) {
        if (t.have_signal_handler) {
            if (have_sig_io) posix.sigaction(.IO, &t.old_sig_io, null);
            if (have_sig_pipe) posix.sigaction(.PIPE, &t.old_sig_pipe, null);
        }
    }

    t.* = undefined;
}
191
/// Asks every worker to exit once the run queue is drained and blocks until
/// they have all finished. Does nothing in single-threaded builds.
fn join(t: *Threaded) void {
    if (builtin.single_threaded) return;

    // Publish the request under the mutex so that no worker can miss it
    // between checking the flag and going to sleep on the condition.
    t.mutex.lock();
    t.join_requested = true;
    t.mutex.unlock();

    t.cond.broadcast();
    t.wait_group.wait();
}
202
/// Entry point of every pool thread. Runs queued closures until the queue is
/// empty and a join has been requested; sleeps on `cond` while idle.
fn worker(t: *Threaded) void {
    defer t.wait_group.finish();

    t.mutex.lock();
    defer t.mutex.unlock();

    while (true) {
        if (t.run_queue.popFirst()) |queued_node| {
            // Run the task without holding the mutex.
            t.mutex.unlock();
            const closure: *Closure = @fieldParentPtr("node", queued_node);
            closure.start(closure);
            t.mutex.lock();
            // The task is done; this thread is available again.
            t.busy_count -= 1;
            continue;
        }
        if (t.join_requested) return;
        t.cond.wait(&t.mutex);
    }
}
221
/// Returns the `Io` interface backed by this instance, with networking
/// enabled. Networking entries that have separate Windows and POSIX
/// implementations are selected at compile time.
pub fn io(t: *Threaded) Io {
    return .{
        .userdata = t,
        .vtable = &.{
            // Tasks.
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .cancel = cancel,
            .cancelRequested = cancelRequested,
            .select = select,

            // Task groups.
            .groupAsync = groupAsync,
            .groupConcurrent = groupConcurrent,
            .groupWait = groupWait,
            .groupCancel = groupCancel,

            // Synchronization primitives.
            .mutexLock = mutexLock,
            .mutexLockUncancelable = mutexLockUncancelable,
            .mutexUnlock = mutexUnlock,

            .conditionWait = conditionWait,
            .conditionWaitUncancelable = conditionWaitUncancelable,
            .conditionWake = conditionWake,

            // File system.
            .dirMake = dirMake,
            .dirMakePath = dirMakePath,
            .dirMakeOpenPath = dirMakeOpenPath,
            .dirStat = dirStat,
            .dirStatPath = dirStatPath,
            .fileStat = fileStat,
            .dirAccess = dirAccess,
            .dirCreateFile = dirCreateFile,
            .dirOpenFile = dirOpenFile,
            .dirOpenDir = dirOpenDir,
            .dirClose = dirClose,
            .fileClose = fileClose,
            .fileWriteStreaming = fileWriteStreaming,
            .fileWritePositional = fileWritePositional,
            .fileReadStreaming = fileReadStreaming,
            .fileReadPositional = fileReadPositional,
            .fileSeekBy = fileSeekBy,
            .fileSeekTo = fileSeekTo,
            .openSelfExe = openSelfExe,

            // Time.
            .now = now,
            .sleep = sleep,

            // Networking.
            .netListenIp = if (is_windows) netListenIpWindows else netListenIpPosix,
            .netListenUnix = if (is_windows) netListenUnixWindows else netListenUnixPosix,
            .netAccept = if (is_windows) netAcceptWindows else netAcceptPosix,
            .netBindIp = if (is_windows) netBindIpWindows else netBindIpPosix,
            .netConnectIp = if (is_windows) netConnectIpWindows else netConnectIpPosix,
            .netConnectUnix = if (is_windows) netConnectUnixWindows else netConnectUnixPosix,
            .netClose = netClose,
            .netRead = if (is_windows) netReadWindows else netReadPosix,
            .netWrite = if (is_windows) netWriteWindows else netWritePosix,
            .netSend = if (is_windows) netSendWindows else netSendPosix,
            .netReceive = if (is_windows) netReceiveWindows else netReceivePosix,
            .netInterfaceNameResolve = netInterfaceNameResolve,
            .netInterfaceName = netInterfaceName,
            .netLookup = netLookup,
        },
    };
}
316
/// Variant of `io` with every networking entry replaced by its `*Unavailable`
/// counterpart. Networking has an additional dependency on Windows (ws2_32),
/// which this variant avoids referencing.
pub fn ioBasic(t: *Threaded) Io {
    return .{
        .userdata = t,
        .vtable = &.{
            // Networking: all entries are the unavailable stubs.
            .netListenIp = netListenIpUnavailable,
            .netListenUnix = netListenUnixUnavailable,
            .netAccept = netAcceptUnavailable,
            .netBindIp = netBindIpUnavailable,
            .netConnectIp = netConnectIpUnavailable,
            .netConnectUnix = netConnectUnixUnavailable,
            .netClose = netCloseUnavailable,
            .netRead = netReadUnavailable,
            .netWrite = netWriteUnavailable,
            .netSend = netSendUnavailable,
            .netReceive = netReceiveUnavailable,
            .netInterfaceNameResolve = netInterfaceNameResolveUnavailable,
            .netInterfaceName = netInterfaceNameUnavailable,
            .netLookup = netLookupUnavailable,

            // Tasks.
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .cancel = cancel,
            .cancelRequested = cancelRequested,
            .select = select,

            // Task groups.
            .groupAsync = groupAsync,
            .groupConcurrent = groupConcurrent,
            .groupWait = groupWait,
            .groupCancel = groupCancel,

            // Synchronization primitives.
            .mutexLock = mutexLock,
            .mutexLockUncancelable = mutexLockUncancelable,
            .mutexUnlock = mutexUnlock,

            .conditionWait = conditionWait,
            .conditionWaitUncancelable = conditionWaitUncancelable,
            .conditionWake = conditionWake,

            // File system.
            .dirMake = dirMake,
            .dirMakePath = dirMakePath,
            .dirMakeOpenPath = dirMakeOpenPath,
            .dirStat = dirStat,
            .dirStatPath = dirStatPath,
            .fileStat = fileStat,
            .dirAccess = dirAccess,
            .dirCreateFile = dirCreateFile,
            .dirOpenFile = dirOpenFile,
            .dirOpenDir = dirOpenDir,
            .dirClose = dirClose,
            .fileClose = fileClose,
            .fileWriteStreaming = fileWriteStreaming,
            .fileWritePositional = fileWritePositional,
            .fileReadStreaming = fileReadStreaming,
            .fileReadPositional = fileReadPositional,
            .fileSeekBy = fileSeekBy,
            .fileSeekTo = fileSeekTo,
            .openSelfExe = openSelfExe,

            // Time.
            .now = now,
            .sleep = sleep,
        },
    };
}
383
/// True on Darwin targets and Haiku.
pub const socket_flags_unsupported = native_os == .haiku or native_os.isDarwin();
/// `accept4` is assumed to exist exactly where socket flags are supported.
const have_accept4 = !socket_flags_unsupported;
/// Whether the target's open flags include `EXLOCK`.
const have_flock_open_flags = @hasField(posix.O, "EXLOCK");
const have_networking = native_os != .wasi;
const have_flock = @TypeOf(posix.system.flock) != void;
const have_sendmmsg = native_os == .linux;
/// On WebAssembly this depends on the `atomics` CPU feature; every other
/// architecture is treated as having futexes.
const have_futex = switch (builtin.cpu.arch) {
    .wasm32, .wasm64 => builtin.cpu.has(.wasm, .atomics),
    else => true,
};
const have_preadv = native_os != .windows and native_os != .haiku;
/// Whether the target defines signals at all and, if so, has `SIG.IO`.
const have_sig_io = if (posix.SIG == void) false else @hasField(posix.SIG, "IO");
/// Whether the target defines signals at all and, if so, has `SIG.PIPE`.
const have_sig_pipe = if (posix.SIG == void) false else @hasField(posix.SIG, "PIPE");

// Use the explicit 64-bit entry points when `posix.lfs64_abi` is set.
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
const fstatat_sym = if (posix.lfs64_abi) posix.system.fstatat64 else posix.system.fstatat;
const lseek_sym = if (posix.lfs64_abi) posix.system.lseek64 else posix.system.lseek;
const preadv_sym = if (posix.lfs64_abi) posix.system.preadv64 else posix.system.preadv;
406
/// Heap-allocated state of one task started through `async` or `concurrent`.
/// The struct is followed, in the same allocation, by the task's inputs and
/// outputs.
///
/// Trailing data:
/// 1. context
/// 2. result
const AsyncClosure = struct {
    closure: Closure,
    func: *const fn (context: *anyopaque, result: *anyopaque) void,
    /// Set once the task function has returned.
    reset_event: ResetEvent,
    select_condition: ?*ResetEvent,
    context_alignment: Alignment,
    /// Byte offset of the result area from the start of the allocation.
    result_offset: usize,
    /// Total size of the allocation, needed to free it.
    alloc_len: usize,

    /// Sentinel stored in `select_condition` once the task has finished. It is
    /// only ever compared against, never dereferenced.
    const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

    /// `Closure.start` implementation: claims the closure for the current
    /// thread, runs the task function, then publishes completion.
    fn start(closure: *Closure) void {
        const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
        const self_id: CancelId = .currentThread();

        const claim = @cmpxchgStrong(CancelId, &closure.cancel_tid, .none, self_id, .acq_rel, .acquire);
        if (claim) |observed| {
            // The only way to lose the race is against a cancel request.
            // The task function must run regardless: the result has to be
            // made valid and the function may have side effects.
            assert(observed == .canceling);
        }

        current_closure = closure;
        ac.func(ac.contextPointer(), ac.resultPointer());
        current_closure = null;

        // Give up the claim so that a cancel arriving after the task has
        // completed does not make `requestCancel` signal this thread.
        const release = @cmpxchgStrong(CancelId, &closure.cancel_tid, self_id, .none, .acq_rel, .acquire);
        if (release) |observed| {
            assert(observed == .canceling);
        }

        const registered = @atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release);
        if (registered) |select_reset| {
            assert(select_reset != done_reset_event);
            select_reset.set();
        }
        ac.reset_event.set();
    }

    /// Address of the result area inside the allocation.
    fn resultPointer(ac: *AsyncClosure) [*]u8 {
        const bytes: [*]u8 = @ptrCast(ac);
        return bytes + ac.result_offset;
    }

    /// Address of the context area: the first address after the struct that
    /// satisfies `context_alignment`.
    fn contextPointer(ac: *AsyncClosure) [*]u8 {
        const bytes: [*]u8 = @ptrCast(ac);
        const struct_addr = @intFromPtr(ac);
        const context_addr = ac.context_alignment.forward(struct_addr + @sizeOf(AsyncClosure));
        return bytes + (context_addr - struct_addr);
    }

    /// Allocates a closure with room for the context and the result, and
    /// copies `context` into it. The result area is left uninitialized.
    fn init(
        gpa: Allocator,
        result_len: usize,
        result_alignment: Alignment,
        context: []const u8,
        context_alignment: Alignment,
        func: *const fn (context: *const anyopaque, result: *anyopaque) void,
    ) Allocator.Error!*AsyncClosure {
        // The final address is unknown before allocating, so size the
        // allocation for the worst-case padding in front of the context.
        const context_slack = context_alignment.toByteUnits() -| @alignOf(AsyncClosure);
        const max_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + context_slack);
        const max_result_offset = result_alignment.forward(max_context_offset + context.len);
        const alloc_len = max_result_offset + result_len;

        const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), alloc_len)));
        errdefer comptime unreachable;

        // With the real address known, compute where the result actually goes.
        const struct_addr = @intFromPtr(ac);
        const context_addr = context_alignment.forward(struct_addr + @sizeOf(AsyncClosure));
        const result_addr = result_alignment.forward(context_addr + context.len);

        ac.* = .{
            .closure = .{
                .cancel_tid = .none,
                .start = start,
            },
            .func = func,
            .context_alignment = context_alignment,
            .result_offset = result_addr - struct_addr,
            .alloc_len = alloc_len,
            .reset_event = .unset,
            .select_condition = null,
        };
        @memcpy(ac.contextPointer()[0..context.len], context);
        return ac;
    }

    /// Blocks until the task has finished, copies its result into `result`,
    /// and frees the closure. If the wait itself is canceled, the cancel is
    /// forwarded to the task and the wait continues uncancelably.
    fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void {
        ac.reset_event.wait(t) catch |err| switch (err) {
            error.Canceled => {
                ac.closure.requestCancel();
                ac.reset_event.waitUncancelable();
            },
        };
        const produced = ac.resultPointer()[0..result.len];
        @memcpy(result, produced);
        ac.deinit(t.allocator);
    }

    /// Frees the whole allocation, trailing data included.
    fn deinit(ac: *AsyncClosure, gpa: Allocator) void {
        const bytes: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
        gpa.free(bytes[0..ac.alloc_len]);
    }
};
509
/// `Io.VTable.async` implementation.
///
/// Tries to hand the task to a pool thread, spawning a new one when no idle
/// thread exists. Returns a future for the queued task. When the task cannot
/// be queued (single-threaded build, allocation failure, `async_limit`
/// reached, or thread spawn failure), `start` runs immediately on the calling
/// thread, writing into `result`, and `null` is returned.
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (builtin.single_threaded) {
        start(context.ptr, result.ptr);
        return null;
    }
    const gpa = t.allocator;
    const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch {
        start(context.ptr, result.ptr);
        return null;
    };

    const queued = attempt: {
        t.mutex.lock();
        defer t.mutex.unlock();

        const busy = t.busy_count;
        if (busy >= @intFromEnum(t.async_limit)) break :attempt false;
        t.busy_count = busy + 1;

        // Every existing pool thread is busy: grow the pool by one.
        if (t.wait_group.value() - busy == 0) {
            t.wait_group.start();
            const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
                // Undo the bookkeeping done above.
                t.wait_group.finish();
                t.busy_count = busy;
                break :attempt false;
            };
            thread.detach();
        }

        t.run_queue.prepend(&ac.closure.node);
        break :attempt true;
    };

    if (!queued) {
        ac.deinit(gpa);
        start(context.ptr, result.ptr);
        return null;
    }

    t.cond.signal();
    return @ptrCast(ac);
}
561
/// `Io.VTable.concurrent` implementation.
///
/// Queues the task for a pool thread, spawning a new one when no idle thread
/// exists. Unlike `async`, the task is never run on the calling thread:
/// a single-threaded build, allocation failure, reaching `concurrent_limit`,
/// or a thread spawn failure all yield `error.ConcurrencyUnavailable`.
fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Io.ConcurrentError!*Io.AnyFuture {
    if (builtin.single_threaded) return error.ConcurrencyUnavailable;

    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const gpa = t.allocator;
    const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch
        return error.ConcurrencyUnavailable;

    const queued = attempt: {
        t.mutex.lock();
        defer t.mutex.unlock();

        const busy = t.busy_count;
        if (busy >= @intFromEnum(t.concurrent_limit)) break :attempt false;
        t.busy_count = busy + 1;

        // Every existing pool thread is busy: grow the pool by one.
        if (t.wait_group.value() - busy == 0) {
            t.wait_group.start();
            const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
                // Undo the bookkeeping done above.
                t.wait_group.finish();
                t.busy_count = busy;
                break :attempt false;
            };
            thread.detach();
        }

        t.run_queue.prepend(&ac.closure.node);
        t.cond.signal();
        break :attempt true;
    };

    if (!queued) {
        ac.deinit(gpa);
        return error.ConcurrencyUnavailable;
    }
    return @ptrCast(ac);
}
604
/// Heap-allocated state of one task started through `groupAsync` or
/// `groupConcurrent`. The task's context bytes follow the struct in the same
/// allocation.
const GroupClosure = struct {
    closure: Closure,
    t: *Threaded,
    group: *Io.Group,
    /// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
    node: std.SinglyLinkedList.Node,
    func: *const fn (*Io.Group, context: *anyopaque) void,
    context_alignment: Alignment,
    /// Total size of the allocation, needed to free it.
    alloc_len: usize,

    /// `Closure.start` implementation: claims the closure for the current
    /// thread, runs the task function, then decrements the group's pending
    /// count and wakes the waiter if this was the last pending task.
    fn start(closure: *Closure) void {
        const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
        const self_id: CancelId = .currentThread();
        const group = gc.group;
        const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
        const reset_event: *ResetEvent = @ptrCast(&group.context);

        const claim = @cmpxchgStrong(CancelId, &closure.cancel_tid, .none, self_id, .acq_rel, .acquire);
        if (claim) |observed| {
            // The only way to lose the race is against a cancel request.
            // The task function still runs in case it has side effects.
            assert(observed == .canceling);
        }

        current_closure = closure;
        gc.func(group, gc.contextPointer());
        current_closure = null;

        // Give up the claim so that a cancel arriving after the task has
        // completed does not make `requestCancel` signal this thread.
        const release = @cmpxchgStrong(CancelId, &closure.cancel_tid, self_id, .none, .acq_rel, .acquire);
        if (release) |observed| {
            assert(observed == .canceling);
        }

        const before = group_state.fetchSub(sync_one_pending, .acq_rel);
        assert((before / sync_one_pending) > 0);
        // Exactly one task was pending (this one) and someone is waiting.
        const last_with_waiter = before == (sync_one_pending | sync_is_waiting);
        if (last_with_waiter) reset_event.set();
    }

    /// Address of the context area: the first address after the struct that
    /// satisfies `context_alignment`.
    fn contextPointer(gc: *GroupClosure) [*]u8 {
        const bytes: [*]u8 = @ptrCast(gc);
        const struct_addr = @intFromPtr(gc);
        const context_addr = gc.context_alignment.forward(struct_addr + @sizeOf(GroupClosure));
        return bytes + (context_addr - struct_addr);
    }

    /// Allocates a closure with room for the context and copies `context`
    /// into it.
    ///
    /// Does not initialize the `node` field.
    fn init(
        gpa: Allocator,
        t: *Threaded,
        group: *Io.Group,
        context: []const u8,
        context_alignment: Alignment,
        func: *const fn (*Io.Group, context: *const anyopaque) void,
    ) Allocator.Error!*GroupClosure {
        // The final address is unknown before allocating, so size the
        // allocation for the worst-case padding in front of the context.
        const context_slack = context_alignment.toByteUnits() -| @alignOf(GroupClosure);
        const max_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + context_slack);
        const alloc_len = max_context_offset + context.len;

        const gc: *GroupClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(GroupClosure), alloc_len)));
        errdefer comptime unreachable;

        gc.* = .{
            .closure = .{
                .cancel_tid = .none,
                .start = start,
            },
            .t = t,
            .group = group,
            .node = undefined,
            .func = func,
            .context_alignment = context_alignment,
            .alloc_len = alloc_len,
        };
        @memcpy(gc.contextPointer()[0..context.len], context);
        return gc;
    }

    /// Frees the whole allocation, trailing context included.
    fn deinit(gc: *GroupClosure, gpa: Allocator) void {
        const bytes: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
        gpa.free(bytes[0..gc.alloc_len]);
    }

    // Layout of the group state word: bit 0 flags a waiter, the remaining
    // bits count pending tasks.
    const sync_is_waiting: usize = 1 << 0;
    const sync_one_pending: usize = 1 << 1;
};
687
/// `Io.VTable.groupAsync` implementation.
///
/// Tries to queue the task for a pool thread and register it with `group`.
/// When that is not possible (single-threaded build, allocation failure,
/// `async_limit` reached, or thread spawn failure), `start` runs immediately
/// on the calling thread instead.
fn groupAsync(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    if (builtin.single_threaded) return start(group, context.ptr);

    const gpa = t.allocator;
    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
        return start(group, context.ptr);

    const queued = attempt: {
        t.mutex.lock();
        defer t.mutex.unlock();

        const busy = t.busy_count;
        if (busy >= @intFromEnum(t.async_limit)) break :attempt false;
        t.busy_count = busy + 1;

        // Every existing pool thread is busy: grow the pool by one.
        if (t.wait_group.value() - busy == 0) {
            t.wait_group.start();
            const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
                // Undo the bookkeeping done above.
                t.wait_group.finish();
                t.busy_count = busy;
                break :attempt false;
            };
            thread.detach();
        }

        // Link into the group's list while holding the mutex; this is what
        // makes `Io.Group.async` thread-safe.
        gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
        group.token = &gc.node;

        t.run_queue.prepend(&gc.closure.node);

        // Count the task as pending before the mutex is released; otherwise
        // the task could finish before it has been counted.
        const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
        const before = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
        assert((before / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));

        break :attempt true;
    };

    if (!queued) {
        gc.deinit(gpa);
        return start(group, context.ptr);
    }

    t.cond.signal();
}
742
/// `Io.VTable.groupConcurrent` implementation.
///
/// Queues the task for a pool thread and registers it with `group`, spawning
/// a new thread when no idle one exists. The task is never run on the calling
/// thread: a single-threaded build, allocation failure, reaching
/// `concurrent_limit`, or a thread spawn failure all yield
/// `error.ConcurrencyUnavailable`, in which case nothing is queued and the
/// group is left untouched.
fn groupConcurrent(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) Io.ConcurrentError!void {
    if (builtin.single_threaded) return error.ConcurrencyUnavailable;

    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const gpa = t.allocator;
    const gc = GroupClosure.init(gpa, t, group, context, context_alignment, start) catch
        return error.ConcurrencyUnavailable;
    // Until the closure has been linked into the group and the run queue,
    // nothing else references it, so it must be freed on every error return.
    // Declared before the lock is taken, so it runs after the unlock.
    errdefer gc.deinit(gpa);

    t.mutex.lock();
    defer t.mutex.unlock();

    const busy_count = t.busy_count;

    if (busy_count >= @intFromEnum(t.concurrent_limit))
        return error.ConcurrencyUnavailable;

    t.busy_count = busy_count + 1;
    errdefer t.busy_count = busy_count;

    // Every existing pool thread is busy: grow the pool by one.
    const pool_size = t.wait_group.value();
    if (pool_size - busy_count == 0) {
        t.wait_group.start();
        errdefer t.wait_group.finish();

        const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch
            return error.ConcurrencyUnavailable;
        thread.detach();
    }

    // No error return is possible past this point.

    // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
    gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
    group.token = &gc.node;

    t.run_queue.prepend(&gc.closure.node);

    // This needs to be done before unlocking the mutex to avoid a race with
    // the associated task finishing.
    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic);
    assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending));

    t.cond.signal();
}
793
/// `Io.VTable.groupWait` implementation.
///
/// Blocks until every task in the group has finished, then frees all of the
/// group's closures. `token` is the head of the group's closure list. If the
/// wait itself is canceled, the cancel is forwarded to every task and the
/// wait continues uncancelably.
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const gpa = t.allocator;

    if (builtin.single_threaded) return;

    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);

    // Announce the waiter; the previous value tells how many tasks are
    // still pending.
    const before = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
    assert(before & GroupClosure.sync_is_waiting == 0);

    const pending = before / GroupClosure.sync_one_pending;
    if (pending > 0) {
        reset_event.wait(t) catch |err| switch (err) {
            error.Canceled => {
                const cancel_head: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
                var cancel_cursor: ?*std.SinglyLinkedList.Node = cancel_head;
                while (cancel_cursor) |node| : (cancel_cursor = node.next) {
                    const gc: *GroupClosure = @fieldParentPtr("node", node);
                    gc.closure.requestCancel();
                }
                reset_event.waitUncancelable();
            },
        };
    }

    const free_head: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
    var free_cursor: ?*std.SinglyLinkedList.Node = free_head;
    while (free_cursor) |node| {
        // Read the link before the closure that contains it is freed.
        free_cursor = node.next;
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.deinit(gpa);
    }
}
824
/// `Io.VTable.groupCancel` implementation.
///
/// Requests cancellation of every task in the group, waits uncancelably for
/// all of them to finish, then frees all of the group's closures. `token` is
/// the head of the group's closure list.
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const gpa = t.allocator;

    if (builtin.single_threaded) return;

    const cancel_head: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
    var cancel_cursor: ?*std.SinglyLinkedList.Node = cancel_head;
    while (cancel_cursor) |node| : (cancel_cursor = node.next) {
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.closure.requestCancel();
    }

    const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
    const reset_event: *ResetEvent = @ptrCast(&group.context);

    // Announce the waiter; the previous value tells how many tasks are
    // still pending.
    const before = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire);
    assert(before & GroupClosure.sync_is_waiting == 0);
    const pending = before / GroupClosure.sync_one_pending;
    if (pending > 0) reset_event.waitUncancelable();

    const free_head: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
    var free_cursor: ?*std.SinglyLinkedList.Node = free_head;
    while (free_cursor) |node| {
        // Read the link before the closure that contains it is freed.
        free_cursor = node.next;
        const gc: *GroupClosure = @fieldParentPtr("node", node);
        gc.deinit(gpa);
    }
}
856
/// `Io.VTable.await` implementation: blocks until the task behind
/// `any_future` has finished, copies its result into `result`, and frees the
/// future. `result_alignment` is not needed here.
fn await(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    _ = result_alignment;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    // Futures handed out by `async` and `concurrent` are `AsyncClosure`
    // pointers in disguise.
    const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
    ac.waitAndDeinit(t, result);
}
868
/// `Io.VTable.cancel` implementation: requests cancellation of the task
/// behind `any_future`, then behaves like `await` (waits for completion,
/// copies the result into `result`, frees the future).
fn cancel(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    _ = result_alignment;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const future: *AsyncClosure = @ptrCast(@alignCast(any_future));
    future.closure.requestCancel();
    future.waitAndDeinit(t, result);
}
881
/// `Io.VTable.cancelRequested` implementation: reports whether the task
/// currently running on this thread has been asked to cancel. Returns `false`
/// when the thread is not running a task of this implementation.
fn cancelRequested(userdata: ?*anyopaque) bool {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    if (current_closure) |running| {
        const state = @atomicLoad(CancelId, &running.cancel_tid, .acquire);
        return state == .canceling;
    }
    return false;
}
888
/// Returns `error.Canceled` if the task currently running on this thread has
/// been asked to cancel; otherwise does nothing.
fn checkCancel(t: *Threaded) error{Canceled}!void {
    const canceled = cancelRequested(t);
    if (canceled) return error.Canceled;
}
892
/// `Io.VTable.mutexLock` implementation: the slow path of locking an
/// `Io.Mutex`, taken after the caller observed `prev_state`.
///
/// Marks the mutex contended and waits on its state word until the exchange
/// observes `unlocked`. Any error from `futexWait` is propagated, leaving the
/// mutex unacquired by this caller.
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
    if (builtin.single_threaded) unreachable; // Interface should have prevented this.
    if (native_os == .netbsd) @panic("TODO");
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const contended = @intFromEnum(Io.Mutex.State.contended);

    // Already marked contended by someone: wait before trying the exchange.
    if (prev_state == .contended) {
        try futexWait(t, @ptrCast(&mutex.state), contended);
    }
    while (true) {
        const observed = @atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire);
        if (observed == .unlocked) return;
        try futexWait(t, @ptrCast(&mutex.state), contended);
    }
}
904
/// `Io.VTable.mutexLockUncancelable` implementation: same algorithm as
/// `mutexLock`, but waits with `futexWaitUncancelable` and therefore always
/// returns with the mutex held.
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    if (builtin.single_threaded) unreachable; // Interface should have prevented this.
    if (native_os == .netbsd) @panic("TODO");
    _ = userdata;
    const contended = @intFromEnum(Io.Mutex.State.contended);

    // Already marked contended by someone: wait before trying the exchange.
    if (prev_state == .contended) {
        futexWaitUncancelable(@ptrCast(&mutex.state), contended);
    }
    while (true) {
        const observed = @atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire);
        if (observed == .unlocked) return;
        futexWaitUncancelable(@ptrCast(&mutex.state), contended);
    }
}
916
/// `Io.VTable.mutexUnlock` implementation: releases the mutex and, if it had
/// been marked contended, wakes one waiter. `prev_state` is not needed here.
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    if (builtin.single_threaded) unreachable; // Interface should have prevented this.
    if (native_os == .netbsd) @panic("TODO");
    _ = userdata;
    _ = prev_state;
    const released_from = @atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release);
    if (released_from != .contended) return;
    futexWake(@ptrCast(&mutex.state), 1);
}
926
/// `Io.VTable.conditionWaitUncancelable` implementation.
///
/// Registers the caller as a waiter, releases `mutex`, and sleeps on the
/// condition's epoch word until a signal can be consumed. `mutex` is
/// re-acquired before returning.
///
/// The 64-bit condition state is viewed as two 32-bit atomics: the first
/// holds the waiter count in its low 16 bits and the signal count in its high
/// 16 bits; the second is the epoch that waiters sleep on.
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
    if (builtin.single_threaded) unreachable; // Deadlock.
    if (native_os == .netbsd) @panic("TODO");
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);
    comptime assert(@TypeOf(cond.state) == u64);
    const words: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &words[0];
    const cond_epoch = &words[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;

    // The epoch is observed before the waiter is registered in the state.
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;

    mutex.unlock(t_io);
    defer mutex.lockUncancelable(t_io);

    while (true) {
        futexWaitUncancelable(cond_epoch, epoch);
        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);

        // Consume one signal and deregister this waiter in a single update;
        // on failure, retry with the freshly observed value.
        while (state & signal_mask != 0) {
            const consumed = state - one_waiter - one_signal;
            if (cond_state.cmpxchgWeak(state, consumed, .acquire, .monotonic)) |actual| {
                state = actual;
            } else {
                return;
            }
        }
    }
}
958
/// Cancelable variant of `conditionWaitUncancelable`: blocks on `cond` until a
/// signal is consumed, or returns the error produced by `futexWait`.
/// `mutex` is unlocked before sleeping and is re-acquired with
/// `lockUncancelable` on every exit path, including the error path.
///
/// The 64-bit `cond.state` is viewed as two 32-bit atomics: `ints[0]` packs
/// the waiter count (low 16 bits) and the pending signal count (high 16
/// bits); `ints[1]` is the epoch word passed to the futex wait.
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    if (builtin.single_threaded) unreachable; // Deadlock.
    if (native_os == .netbsd) @panic("TODO");
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    // Observe the epoch, then check the state again to see if we should wake up.
    // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
    //
    // - T1: s = LOAD(&state)
    // - T2: UPDATE(&s, signal)
    // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
    // - T1: e = LOAD(&epoch) (was reordered after the state load)
    // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
    //
    // Acquire barrier to ensure the epoch load happens before the state load.
    var epoch = cond_epoch.load(.acquire);
    var state = cond_state.fetchAdd(one_waiter, .monotonic);
    // The waiter count must not already be saturated.
    assert(state & waiter_mask != waiter_mask);
    state += one_waiter;

    mutex.unlock(t_io);
    defer mutex.lockUncancelable(t_io);

    while (true) {
        // NOTE(review): when `futexWait` returns an error, this function
        // returns without removing the waiter registered above, so the waiter
        // count in `cond_state` stays incremented. Confirm this is intended.
        try futexWait(t, cond_epoch, epoch);

        epoch = cond_epoch.load(.acquire);
        state = cond_state.load(.monotonic);

        // Try to wake up by consuming a signal and decrementing the waiter we
        // added previously. Acquire barrier ensures code before the wake()
        // which added the signal happens before we decrement it and return.
        while (state & signal_mask != 0) {
            const new_state = state - one_waiter - one_signal;
            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
        }
    }
}
1005
/// Posts one signal (`.one`) or one signal per not-yet-signaled waiter
/// (`.all`) to `cond`, then bumps the epoch word and issues a futex wake for
/// the reserved number of waiters. Returns without touching the epoch when
/// every current waiter already has a signal reserved.
///
/// The 64-bit `cond.state` is viewed as two 32-bit atomics: `ints[0]` packs
/// the waiter count (low 16 bits) and the pending signal count (high 16
/// bits); `ints[1]` is the epoch word that waiters sleep on.
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    if (builtin.single_threaded) unreachable; // Nothing to wake up.
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    comptime assert(@TypeOf(cond.state) == u64);
    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
    const cond_state = &ints[0];
    const cond_epoch = &ints[1];
    const one_waiter = 1;
    const waiter_mask = 0xffff;
    const one_signal = 1 << 16;
    const signal_mask = 0xffff << 16;
    var state = cond_state.load(.monotonic);
    while (true) {
        const waiters = (state & waiter_mask) / one_waiter;
        const signals = (state & signal_mask) / one_signal;

        // Reserves which waiters to wake up by incrementing the signals count.
        // Therefore, the signals count is always less than or equal to the
        // waiters count. We don't need to Futex.wake if there's nothing to
        // wake up or if other wake() threads have reserved to wake up the
        // current waiters.
        const wakeable = waiters - signals;
        if (wakeable == 0) {
            return;
        }

        const to_wake = switch (wake) {
            .one => 1,
            .all => wakeable,
        };

        // Reserve the amount of waiters to wake by incrementing the signals
        // count. Release barrier ensures code before the wake() happens before
        // the signal is posted and consumed by the wait() threads.
        const new_state = state + (one_signal * to_wake);
        state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
            // Wake up the waiting threads we reserved above by changing the epoch value.
            //
            // A waiting thread could miss a wake up if *exactly* ((1<<32)-1)
            // wake()s happen between it observing the epoch and sleeping on
            // it. This is very unlikely given the precise number of
            // Futex.wake() calls that would have to occur during the waiting
            // thread's potential preemption.
            //
            // Release barrier ensures the signal being added to the state
            // happens before the epoch is changed. If not, the waiting thread
            // could potentially deadlock from missing both the state and epoch
            // change:
            //
            // - T2: UPDATE(&epoch, 1) (reordered before the state change)
            // - T1: e = LOAD(&epoch)
            // - T1: s = LOAD(&state)
            // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
            _ = cond_epoch.fetchAdd(1, .release);
            // Reached on NetBSD only after signals have already been reserved
            // in `cond_state` above.
            if (native_os == .netbsd) @panic("TODO");
            futexWake(cond_epoch, to_wake);
            return;
        };
    }
}
1068
/// Implementation used to create a single directory, chosen at compile time
/// from the target operating system.
const dirMake = switch (native_os) {
    .windows => dirMakeWindows,
    .wasi => dirMakeWasi,
    else => dirMakePosix,
};
1074
/// Creates one directory named `sub_path` relative to `dir` using `mkdirat`
/// with permission bits `mode`.
///
/// A cancellation check precedes every attempt, and the call is retried when
/// it is interrupted (`INTR`). Errno values are translated to
/// `Io.Dir.MakeError`.
fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var buffer: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buffer);

    while (true) {
        try t.checkCancel();
        const rc = posix.system.mkdirat(dir.handle, path_z, mode);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .BADF, .FAULT => |err| return errnoBug(err),

            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .DQUOT => return error.DiskQuota,
            .EXIST => return error.PathAlreadyExists,
            .LOOP => return error.SymLinkLoop,
            .MLINK => return error.LinkQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            // NOTCONN (dragonfly): when dir_fd is unlinked from filesystem.
            .NOENT, .NOTCONN => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1109
/// Creates one directory named `sub_path` relative to `dir` on WASI.
///
/// When libc is linked this defers to `dirMakePosix`; otherwise it calls
/// `path_create_directory` directly (in which case `mode` is not consulted),
/// checking for cancellation before each attempt and retrying on `INTR`.
fn dirMakeWasi(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    if (builtin.link_libc) return dirMakePosix(userdata, dir, sub_path, mode);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_create_directory(dir.handle, sub_path.ptr, sub_path.len);
        switch (status) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .BADF, .FAULT => |err| return errnoBug(err),

            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .DQUOT => return error.DiskQuota,
            .EXIST => return error.PathAlreadyExists,
            .LOOP => return error.SymLinkLoop,
            .MLINK => return error.LinkQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1140
/// Creates one directory named `sub_path` relative to `dir` on Windows by
/// opening it with `FILE_CREATE` and immediately closing the resulting handle.
///
/// `mode` is ignored by this implementation. Several `OpenFile` errors that
/// are not part of the directory-creation contract are reported as
/// `error.Unexpected`.
fn dirMakeWindows(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    const path_space = try windows.sliceToPrefixedFileW(dir.handle, sub_path);
    _ = mode;

    if (windows.OpenFile(path_space.span(), .{
        .dir = dir.handle,
        .access_mask = windows.GENERIC_READ | windows.SYNCHRONIZE,
        .creation = windows.FILE_CREATE,
        .filter = .dir_only,
    })) |created_handle| {
        // Only the creation side effect is wanted; the handle itself is not.
        windows.CloseHandle(created_handle);
    } else |err| switch (err) {
        error.IsDir,
        error.PipeBusy,
        error.NoDevice,
        error.WouldBlock,
        error.AntivirusInterference,
        => return error.Unexpected,
        else => |e| return e,
    }
}
1162
/// Implementation used to create a directory together with its missing
/// parents, chosen at compile time from the target operating system.
const dirMakePath = switch (native_os) {
    .windows => dirMakePathWindows,
    else => dirMakePathPosix,
};
1167
/// Placeholder for the POSIX "make path" operation.
///
/// Not implemented: every call panics after discarding its arguments.
fn dirMakePathPosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    _ = mode;
    _ = sub_path;
    _ = dir;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    @panic("TODO implement dirMakePathPosix");
}
1176
/// Placeholder for the Windows "make path" operation.
///
/// Not implemented: every call panics after discarding its arguments.
fn dirMakePathWindows(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
    _ = mode;
    _ = sub_path;
    _ = dir;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    _ = t;
    @panic("TODO implement dirMakePathWindows");
}
1185
/// Implementation used to open a directory, creating it (and any missing
/// parents) when it does not exist; chosen at compile time from the target
/// operating system.
const dirMakeOpenPath = switch (native_os) {
    .windows => dirMakeOpenPathWindows,
    .wasi => dirMakeOpenPathWasi,
    else => dirMakeOpenPathPosix,
};
1191
/// Opens the directory at `sub_path` relative to `dir`, creating the path
/// first when the initial open reports `error.FileNotFound`.
///
/// Any other error from the first open attempt is returned unchanged; errors
/// from `makePath` or from the second open attempt are propagated as well.
fn dirMakeOpenPathPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.MakeOpenPathError!Io.Dir {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);

    // Fast path: the directory already exists.
    if (dirOpenDirPosix(t, dir, sub_path, options)) |opened| {
        return opened;
    } else |err| switch (err) {
        error.FileNotFound => {},
        else => |e| return e,
    }

    // Slow path: create the missing path, then open it.
    try dir.makePath(t_io, sub_path);
    return dirOpenDirPosix(t, dir, sub_path, options);
}
1208
/// Opens the directory at `sub_path` relative to `dir` on Windows, creating
/// any missing path components along the way, and returns the handle of the
/// final component.
///
/// The walk starts at the last path component. When a component's parent is
/// missing the iterator steps backwards (`it.previous()`); after a component
/// has been created or found to exist it steps forwards (`it.next()`). The
/// final component is opened with `FILE_OPEN_IF`, intermediate ones with
/// `FILE_CREATE`. Cancellation is checked before each `NtCreateFile` call.
fn dirMakeOpenPathWindows(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.MakeOpenPathError!Io.Dir {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const w = windows;
    // Directory listing rights are requested only when the caller wants to iterate.
    const access_mask = w.STANDARD_RIGHTS_READ | w.FILE_READ_ATTRIBUTES | w.FILE_READ_EA |
        w.SYNCHRONIZE | w.FILE_TRAVERSE |
        (if (options.iterate) w.FILE_LIST_DIRECTORY else @as(u32, 0));

    var it = std.fs.path.componentIterator(sub_path);
    // If there are no components in the path, then create a dummy component with the full path.
    var component: std.fs.path.NativeComponentIterator.Component = it.last() orelse .{
        .name = "",
        .path = sub_path,
    };

    while (true) {
        try t.checkCancel();

        const sub_path_w_array = try w.sliceToPrefixedFileW(dir.handle, component.path);
        const sub_path_w = sub_path_w_array.span();
        // The component currently being processed is the final one when the
        // iterator has nothing after it.
        const is_last = it.peekNext() == null;
        const create_disposition: u32 = if (is_last) w.FILE_OPEN_IF else w.FILE_CREATE;

        var result: Io.Dir = .{ .handle = undefined };

        // UNICODE_STRING lengths are expressed in bytes, hence the factor of 2
        // over the UTF-16 code unit count.
        const path_len_bytes: u16 = @intCast(sub_path_w.len * 2);
        var nt_name: w.UNICODE_STRING = .{
            .Length = path_len_bytes,
            .MaximumLength = path_len_bytes,
            .Buffer = @constCast(sub_path_w.ptr),
        };
        var attr: w.OBJECT_ATTRIBUTES = .{
            .Length = @sizeOf(w.OBJECT_ATTRIBUTES),
            .RootDirectory = if (std.fs.path.isAbsoluteWindowsWtf16(sub_path_w)) null else dir.handle,
            .Attributes = 0, // Note we do not use OBJ_CASE_INSENSITIVE here.
            .ObjectName = &nt_name,
            .SecurityDescriptor = null,
            .SecurityQualityOfService = null,
        };
        const open_reparse_point: w.DWORD = if (!options.follow_symlinks) w.FILE_OPEN_REPARSE_POINT else 0x0;
        var io_status_block: w.IO_STATUS_BLOCK = undefined;
        const rc = w.ntdll.NtCreateFile(
            &result.handle,
            access_mask,
            &attr,
            &io_status_block,
            null,
            w.FILE_ATTRIBUTE_NORMAL,
            w.FILE_SHARE_READ | w.FILE_SHARE_WRITE | w.FILE_SHARE_DELETE,
            create_disposition,
            w.FILE_DIRECTORY_FILE | w.FILE_SYNCHRONOUS_IO_NONALERT | w.FILE_OPEN_FOR_BACKUP_INTENT | open_reparse_point,
            null,
            0,
        );

        switch (rc) {
            .SUCCESS => {
                // Return the handle of the final component; handles of
                // intermediate components are closed before moving forward.
                component = it.next() orelse return result;
                w.CloseHandle(result.handle);
                continue;
            },
            .OBJECT_NAME_INVALID => return error.BadPathName,
            .OBJECT_NAME_COLLISION => {
                // Only FILE_CREATE (intermediate components) can collide.
                assert(!is_last);
                // stat the file and return an error if it's not a directory
                // this is important because otherwise a dangling symlink
                // could cause an infinite loop
                check_dir: {
                    // workaround for windows, see https://github.com/ziglang/zig/issues/16738
                    const fstat = dirStatPathWindows(t, dir, component.path, .{
                        .follow_symlinks = options.follow_symlinks,
                    }) catch |stat_err| switch (stat_err) {
                        error.IsDir => break :check_dir,
                        else => |e| return e,
                    };
                    if (fstat.kind != .directory) return error.NotDir;
                }

                component = it.next().?;
                continue;
            },

            .OBJECT_NAME_NOT_FOUND,
            .OBJECT_PATH_NOT_FOUND,
            => {
                // A parent is missing: step back to create it first.
                component = it.previous() orelse return error.FileNotFound;
                continue;
            },

            .NOT_A_DIRECTORY => return error.NotDir,
            // This can happen if the directory has 'List folder contents' permission set to 'Deny'
            // and the directory is trying to be opened for iteration.
            .ACCESS_DENIED => return error.AccessDenied,
            .INVALID_PARAMETER => |err| return w.statusBug(err),
            else => return w.unexpectedStatus(rc),
        }
    }
}
1311
/// Opens the directory at `sub_path` relative to `dir` on WASI, creating the
/// path first when the initial open reports `error.FileNotFound`.
///
/// Any other error from the first open attempt is returned unchanged; errors
/// from `makePath` or from the second open attempt are propagated as well.
fn dirMakeOpenPathWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.MakeOpenPathError!Io.Dir {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);

    // Fast path: the directory already exists.
    if (dirOpenDirWasi(t, dir, sub_path, options)) |opened| {
        return opened;
    } else |err| switch (err) {
        error.FileNotFound => {},
        else => |e| return e,
    }

    // Slow path: create the missing path, then open it.
    try dir.makePath(t_io, sub_path);
    return dirOpenDirWasi(t, dir, sub_path, options);
}
1328
/// Placeholder for querying metadata of an open directory.
///
/// Not implemented: after honoring a pending cancellation request, every
/// call panics.
fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat {
    _ = dir;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();
    @panic("TODO implement dirStat");
}
1336
/// Implementation used to query metadata of a path relative to a directory,
/// chosen at compile time from the target operating system.
const dirStatPath = switch (native_os) {
    .linux => dirStatPathLinux,
    .windows => dirStatPathWindows,
    .wasi => dirStatPathWasi,
    else => dirStatPathPosix,
};
1343
/// Queries metadata for `sub_path` relative to `dir` on Linux using `statx`.
///
/// `AT.NO_AUTOMOUNT` is always passed; `AT.SYMLINK_NOFOLLOW` is added when
/// `options.follow_symlinks` is false. A cancellation check precedes every
/// attempt, and the call is retried when interrupted (`INTR`).
fn dirStatPathLinux(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;

    var buffer: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buffer);

    const nofollow: u32 = if (options.follow_symlinks) 0 else linux.AT.SYMLINK_NOFOLLOW;
    const at_flags: u32 = linux.AT.NO_AUTOMOUNT | nofollow;

    // Fields requested from the kernel.
    const wanted = linux.STATX_INO | linux.STATX_SIZE | linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME;

    while (true) {
        try t.checkCancel();
        var result = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(dir.handle, path_z, at_flags, wanted, &result);
        switch (linux.errno(rc)) {
            .SUCCESS => return statFromLinux(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            // NAMETOOLONG: handled by pathToPosix() above.
            .BADF, .FAULT, .INVAL, .NAMETOOLONG => |err| return errnoBug(err),

            .ACCES => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1387
/// Queries metadata for `sub_path` relative to `dir` using `fstatat`.
///
/// `AT.SYMLINK_NOFOLLOW` is passed when `options.follow_symlinks` is false.
/// A cancellation check precedes every attempt, and the call is retried when
/// interrupted (`INTR`). Both `NOENT` and `NOTDIR` are reported as
/// `error.FileNotFound`.
fn dirStatPathPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var buffer: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buffer);

    const at_flags: u32 = if (options.follow_symlinks) 0 else posix.AT.SYMLINK_NOFOLLOW;

    while (true) {
        try t.checkCancel();
        var result = std.mem.zeroes(posix.Stat);
        const rc = fstatat_sym(dir.handle, path_z, &result, at_flags);
        switch (posix.errno(rc)) {
            .SUCCESS => return statFromPosix(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .INVAL, .BADF, .FAULT => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .NAMETOOLONG => return error.NameTooLong,
            .LOOP => return error.SymLinkLoop,
            .NOENT, .NOTDIR => return error.FileNotFound,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1424
/// Queries metadata for `sub_path` relative to `dir` on Windows by opening
/// the path, querying the resulting handle with `fileStatWindows`, and
/// closing the handle again before returning.
fn dirStatPathWindows(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const opened = try dirOpenFileWindows(t, dir, sub_path, .{
        .follow_symlinks = options.follow_symlinks,
    });
    // The handle is only needed for the duration of the query.
    defer windows.CloseHandle(opened.handle);
    return fileStatWindows(t, opened);
}
1438
/// Queries metadata for `sub_path` relative to `dir` on WASI.
///
/// When libc is linked this defers to `dirStatPathPosix`; otherwise it calls
/// `path_filestat_get` directly, checking for cancellation before each
/// attempt and retrying on `INTR`.
fn dirStatPathWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
    if (builtin.link_libc) return dirStatPathPosix(userdata, dir, sub_path, options);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;
    const lookup: wasi.lookupflags_t = .{
        .SYMLINK_FOLLOW = options.follow_symlinks,
    };
    var result: wasi.filestat_t = undefined;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_filestat_get(dir.handle, lookup, sub_path.ptr, sub_path.len, &result);
        switch (status) {
            .SUCCESS => return statFromWasi(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .INVAL, .BADF, .FAULT => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT, .NOTDIR => return error.FileNotFound,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1473
/// Implementation used to query metadata of an open file, chosen at compile
/// time from the target operating system.
const fileStat = switch (native_os) {
    .linux => fileStatLinux,
    .windows => fileStatWindows,
    .wasi => fileStatWasi,
    else => fileStatPosix,
};
1480
/// Queries metadata for the open `file` using `fstat`.
///
/// Returns `error.Streaming` on targets where `posix.Stat` is `void`.
/// A cancellation check precedes every attempt, and the call is retried when
/// interrupted (`INTR`).
fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (posix.Stat == void) return error.Streaming;

    while (true) {
        try t.checkCancel();
        var result = std.mem.zeroes(posix.Stat);
        const rc = fstat_sym(file.handle, &result);
        switch (posix.errno(rc)) {
            .SUCCESS => return statFromPosix(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .INVAL, .BADF => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            .ACCES => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1502
/// Queries metadata for the open `file` on Linux using `statx` with an empty
/// path and `AT.EMPTY_PATH`, so the descriptor itself is inspected.
///
/// A cancellation check precedes every attempt, and the call is retried when
/// interrupted (`INTR`). Path-related errno values are treated as programmer
/// bugs because no path is supplied.
fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const linux = std.os.linux;

    // Fields requested from the kernel.
    const wanted = linux.STATX_INO | linux.STATX_SIZE | linux.STATX_TYPE | linux.STATX_MODE | linux.STATX_ATIME | linux.STATX_MTIME | linux.STATX_CTIME;

    while (true) {
        try t.checkCancel();
        var result = std.mem.zeroes(linux.Statx);
        const rc = linux.statx(file.handle, "", linux.AT.EMPTY_PATH, wanted, &result);
        switch (linux.errno(rc)) {
            .SUCCESS => return statFromLinux(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .ACCES,
            .BADF,
            .FAULT,
            .INVAL,
            .LOOP,
            .NAMETOOLONG,
            .NOENT,
            .NOTDIR,
            => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1534
/// Queries metadata for the open `file` on Windows.
///
/// Issues `NtQueryInformationFile(FileAllInformation)` and, for reparse
/// points, a second `FileAttributeTagInformation` query to classify the
/// entry: name-surrogate reparse tags are reported as `.sym_link`, all other
/// reparse points as `.unknown`. `mode` is always reported as 0.
///
/// Returns `error.AccessDenied` when either query is denied, and
/// `error.Unexpected` (via `windows.unexpectedStatus`) for any other failing
/// status.
fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    var io_status_block: windows.IO_STATUS_BLOCK = undefined;
    var info: windows.FILE_ALL_INFORMATION = undefined;
    const rc = windows.ntdll.NtQueryInformationFile(file.handle, &io_status_block, &info, @sizeOf(windows.FILE_ALL_INFORMATION), .FileAllInformation);
    switch (rc) {
        .SUCCESS => {},
        // Buffer overflow here indicates that there is more information available than was able to be stored in the buffer
        // size provided. This is treated as success because the type of variable-length information that this would be relevant for
        // (name, volume name, etc) we don't care about.
        .BUFFER_OVERFLOW => {},
        .INVALID_PARAMETER => unreachable,
        .ACCESS_DENIED => return error.AccessDenied,
        else => return windows.unexpectedStatus(rc),
    }
    return .{
        .inode = info.InternalInformation.IndexNumber,
        .size = @as(u64, @bitCast(info.StandardInformation.EndOfFile)),
        .mode = 0,
        .kind = if (info.BasicInformation.FileAttributes & windows.FILE_ATTRIBUTE_REPARSE_POINT != 0) reparse_point: {
            var tag_info: windows.FILE_ATTRIBUTE_TAG_INFO = undefined;
            const tag_rc = windows.ntdll.NtQueryInformationFile(file.handle, &io_status_block, &tag_info, @sizeOf(windows.FILE_ATTRIBUTE_TAG_INFO), .FileAttributeTagInformation);
            switch (tag_rc) {
                .SUCCESS => {},
                // INFO_LENGTH_MISMATCH and ACCESS_DENIED are the only documented possible errors
                // https://learn.microsoft.com/en-us/openspecs/windows_protocols/ms-fscc/d295752f-ce89-4b98-8553-266d37c84f0e
                .INFO_LENGTH_MISMATCH => unreachable,
                .ACCESS_DENIED => return error.AccessDenied,
                // Report the status of *this* query; `rc` belongs to the first
                // query, which has already succeeded at this point.
                else => return windows.unexpectedStatus(tag_rc),
            }
            if (tag_info.ReparseTag & windows.reparse_tag_name_surrogate_bit != 0) {
                break :reparse_point .sym_link;
            }
            // Unknown reparse point
            break :reparse_point .unknown;
        } else if (info.BasicInformation.FileAttributes & windows.FILE_ATTRIBUTE_DIRECTORY != 0)
            .directory
        else
            .file,
        .atime = windows.fromSysTime(info.BasicInformation.LastAccessTime),
        .mtime = windows.fromSysTime(info.BasicInformation.LastWriteTime),
        .ctime = windows.fromSysTime(info.BasicInformation.ChangeTime),
    };
}
1581
/// Queries metadata for the open `file` on WASI.
///
/// When libc is linked this defers to `fileStatPosix`; otherwise it calls
/// `fd_filestat_get` directly, checking for cancellation before each attempt
/// and retrying on `INTR`.
fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
    if (builtin.link_libc) return fileStatPosix(userdata, file);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;
    while (true) {
        try t.checkCancel();
        var result: wasi.filestat_t = undefined;
        const status = wasi.fd_filestat_get(file.handle, &result);
        switch (status) {
            .SUCCESS => return statFromWasi(&result),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .INVAL, .BADF => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1602
/// Implementation used to check accessibility of a path relative to a
/// directory, chosen at compile time from the target operating system.
const dirAccess = switch (native_os) {
    .windows => dirAccessWindows,
    .wasi => dirAccessWasi,
    else => dirAccessPosix,
};
1608
/// Checks whether `sub_path` relative to `dir` is accessible using
/// `faccessat`.
///
/// The requested mode is the union of `R_OK`, `W_OK` and `X_OK` according to
/// `options.read`, `options.write` and `options.execute`.
/// `AT.SYMLINK_NOFOLLOW` is passed when `options.follow_symlinks` is false.
/// A cancellation check precedes every attempt, and the call is retried when
/// interrupted (`INTR`).
fn dirAccessPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.AccessOptions,
) Io.Dir.AccessError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var buffer: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &buffer);

    const at_flags: u32 = if (options.follow_symlinks) 0 else posix.AT.SYMLINK_NOFOLLOW;

    var access_mode: u32 = 0;
    if (options.read) access_mode |= posix.R_OK;
    if (options.write) access_mode |= posix.W_OK;
    if (options.execute) access_mode |= posix.X_OK;

    while (true) {
        try t.checkCancel();
        const rc = posix.system.faccessat(dir.handle, path_z, access_mode, at_flags);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .INVAL, .FAULT => |err| return errnoBug(err),

            .ACCES => return error.AccessDenied,
            .PERM => return error.PermissionDenied,
            .ROFS => return error.ReadOnlyFileSystem,
            .LOOP => return error.SymLinkLoop,
            .TXTBSY => return error.FileBusy,
            .NOTDIR, .NOENT => return error.FileNotFound,
            .NAMETOOLONG => return error.NameTooLong,
            .IO => return error.InputOutput,
            .NOMEM => return error.SystemResources,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
1651
/// Checks whether `sub_path` relative to `dir` is accessible on WASI.
///
/// When libc is linked this defers to `dirAccessPosix`. Otherwise the path is
/// first stat'ed with `path_filestat_get` to prove that it exists; if any of
/// read/write/execute was requested, the rights that `dir` lets its children
/// inherit are then compared against the rights the request needs
/// (`FD_READDIR` for reading a directory, `FD_READ` for reading anything
/// else, `FD_WRITE` for writing). Execute permission is not validated.
fn dirAccessWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.AccessOptions,
) Io.Dir.AccessError!void {
    if (builtin.link_libc) return dirAccessPosix(userdata, dir, sub_path, options);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;
    const lookup: wasi.lookupflags_t = .{
        .SYMLINK_FOLLOW = options.follow_symlinks,
    };
    var stat: wasi.filestat_t = undefined;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_filestat_get(dir.handle, lookup, sub_path.ptr, sub_path.len, &stat);
        switch (status) {
            .SUCCESS => break,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .INVAL, .BADF, .FAULT => |err| return errnoBug(err),

            .NOMEM => return error.SystemResources,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .NAMETOOLONG => return error.NameTooLong,
            .NOENT, .NOTDIR => return error.FileNotFound,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    }

    // A pure existence check is satisfied by the successful stat above.
    const wants_any = options.read or options.write or options.execute;
    if (!wants_any) return;

    var dir_fdstat: wasi.fdstat_t = undefined;
    switch (wasi.fd_fdstat_get(dir.handle, &dir_fdstat)) {
        .SUCCESS => {},
        else => return error.AccessDenied,
    }

    const is_directory = stat.filetype == .DIRECTORY;
    // No validation for execution.
    const needed: wasi.rights_t = .{
        .FD_READDIR = options.read and is_directory,
        .FD_READ = options.read and !is_directory,
        .FD_WRITE = options.write,
    };

    // https://github.com/ziglang/zig/issues/18882
    const needed_bits: u64 = @bitCast(needed);
    const inheriting_bits: u64 = @bitCast(dir_fdstat.fs_rights_inheriting);
    // Deny when any needed right is missing from the inheritable set.
    if (needed_bits & ~inheriting_bits != 0)
        return error.AccessDenied;
}
1712
/// Checks whether `sub_path` relative to `dir` exists on Windows by querying
/// its attributes with `NtQueryAttributesFile`.
///
/// `options` is currently ignored. Paths that convert to exactly "." or ".."
/// succeed without querying the system.
fn dirAccessWindows(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.AccessOptions,
) Io.Dir.AccessError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    _ = options; // TODO

    const path_space = try windows.sliceToPrefixedFileW(dir.handle, sub_path);
    const path_w = path_space.span();

    // "." and ".." are accepted without a query.
    if (path_w[0] == '.') {
        if (path_w[1] == 0) return;
        if (path_w[1] == '.' and path_w[2] == 0) return;
    }

    // UNICODE_STRING lengths are expressed in bytes.
    const byte_len = std.math.cast(u16, std.mem.sliceTo(path_w, 0).len * 2) orelse
        return error.NameTooLong;
    var object_name: windows.UNICODE_STRING = .{
        .Length = byte_len,
        .MaximumLength = byte_len,
        .Buffer = @constCast(path_w.ptr),
    };
    var object_attributes = windows.OBJECT_ATTRIBUTES{
        .Length = @sizeOf(windows.OBJECT_ATTRIBUTES),
        .RootDirectory = if (std.fs.path.isAbsoluteWindowsWtf16(path_w)) null else dir.handle,
        .Attributes = 0, // Note we do not use OBJ_CASE_INSENSITIVE here.
        .ObjectName = &object_name,
        .SecurityDescriptor = null,
        .SecurityQualityOfService = null,
    };
    var basic_info: windows.FILE_BASIC_INFORMATION = undefined;
    const status = windows.ntdll.NtQueryAttributesFile(&object_attributes, &basic_info);
    switch (status) {
        .SUCCESS => return,
        .OBJECT_NAME_NOT_FOUND, .OBJECT_PATH_NOT_FOUND => return error.FileNotFound,
        .OBJECT_NAME_INVALID,
        .INVALID_PARAMETER,
        .OBJECT_PATH_SYNTAX_BAD,
        => |err| return windows.statusBug(err),
        .ACCESS_DENIED => return error.AccessDenied,
        else => |rc| return windows.unexpectedStatus(rc),
    }
}
1757
/// Implementation used to create (or open for writing) a file relative to a
/// directory, chosen at compile time from the target operating system.
const dirCreateFile = switch (native_os) {
    .windows => dirCreateFileWindows,
    .wasi => dirCreateFileWasi,
    else => dirCreateFilePosix,
};
1763
/// Creates or opens the file at `sub_path`, relative to `dir`, via `openat`,
/// optionally acquiring an advisory lock on it.
///
/// The file is opened write-only, or read-write when `flags.read` is set, and
/// always with `CREAT`. `flags.truncate` and `flags.exclusive` select `TRUNC`
/// and `EXCL`; `flags.mode` is passed as the creation mode.
///
/// Cancellation is checked before every system call, and calls that fail with
/// `INTR` are retried. If anything fails after the descriptor was obtained,
/// the descriptor is closed before the error is returned.
///
/// Locking (`flags.lock`):
/// * With `have_flock_open_flags`, the lock is requested through open flags so
///   it is acquired atomically with the open. `NONBLOCK` is passed to
///   `openat` when `flags.lock_nonblocking` is set and is cleared from the
///   descriptor again afterwards.
/// * Otherwise, with `have_flock`, the lock is taken with a separate `flock`
///   call after the open succeeds.
fn dirCreateFilePosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);

    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    if (have_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };

    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode);
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);

    // No open-time lock flags on this OS: take the lock with a separate
    // flock() call now that the descriptor exists.
    if (have_flock and !have_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .CANCELED => return error.Canceled,

                .BADF => |err| return errnoBug(err), // File descriptor used after closed.
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    // NONBLOCK was only wanted so that acquiring the lock inside openat()
    // would not block. Clear it again so that later reads and writes on the
    // returned file are not unexpectedly non-blocking.
    if (have_flock_open_flags and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try t.checkCancel();
            const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                .CANCELED => return error.Canceled,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .CANCELED => return error.Canceled,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    return .{ .handle = fd };
}
1886
/// Windows implementation of file creation relative to `dir`.
///
/// Opens the file through `windows.OpenFile` with write access (plus read
/// access when `flags.read` is set). The creation disposition is
/// `FILE_CREATE` for `flags.exclusive`, otherwise `FILE_OVERWRITE_IF` for
/// `flags.truncate`, otherwise `FILE_OPEN_IF`. When `flags.lock` is not
/// `.none`, a one-byte range at offset 0 is locked with `windows.LockFile`;
/// the handle is closed if that fails.
fn dirCreateFileWindows(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const w = windows;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    const prefixed_path = try w.sliceToPrefixedFileW(dir.handle, sub_path);
    const path_w = prefixed_path.span();

    const creation = if (flags.exclusive)
        @as(u32, w.FILE_CREATE)
    else if (flags.truncate)
        @as(u32, w.FILE_OVERWRITE_IF)
    else
        @as(u32, w.FILE_OPEN_IF);
    const maybe_read = if (flags.read) @as(u32, w.GENERIC_READ) else 0;

    const file_handle = try w.OpenFile(path_w, .{
        .dir = dir.handle,
        .access_mask = w.SYNCHRONIZE | w.GENERIC_WRITE | maybe_read,
        .creation = creation,
    });
    errdefer w.CloseHandle(file_handle);

    // Without a lock request there is nothing left to do.
    const want_exclusive = switch (flags.lock) {
        .none => return .{ .handle = file_handle },
        .shared => false,
        .exclusive => true,
    };

    var iosb: w.IO_STATUS_BLOCK = undefined;
    const lock_offset: w.LARGE_INTEGER = 0;
    const lock_length: w.LARGE_INTEGER = 1;
    try w.LockFile(
        file_handle,
        null,
        null,
        null,
        &iosb,
        &lock_offset,
        &lock_length,
        null,
        @intFromBool(flags.lock_nonblocking),
        @intFromBool(want_exclusive),
    );
    return .{ .handle = file_handle };
}
1934
/// WASI implementation of file creation relative to `dir`, using
/// `path_open` directly.
///
/// The file is opened with `CREAT`; `flags.truncate` and `flags.exclusive`
/// select `TRUNC` and `EXCL`. Write-related rights are always requested and
/// `FD_READ` only when `flags.read` is set. Cancellation is checked before
/// each attempt and `INTR` results are retried.
fn dirCreateFileWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const wasi = std.os.wasi;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const open_flags: wasi.oflags_t = .{
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    const rights: wasi.rights_t = .{
        .FD_WRITE = true,
        .FD_READ = flags.read,
        .FD_SEEK = true,
        .FD_TELL = true,
        .FD_SYNC = true,
        .FD_DATASYNC = true,
        .FD_ADVISE = true,
        .FD_ALLOCATE = true,
        .FD_FDSTAT_SET_FLAGS = true,
        .FD_FILESTAT_GET = true,
        .FD_FILESTAT_SET_SIZE = true,
        .FD_FILESTAT_SET_TIMES = true,
        // POLL_FD_READWRITE only grants extra rights if the corresponding FD_READ and/or
        // FD_WRITE is also set.
        .POLL_FD_READWRITE = true,
    };
    const inherited_rights: wasi.rights_t = .{};
    const lookup: wasi.lookupflags_t = .{};
    const fd_flags: wasi.fdflags_t = .{};

    var new_fd: posix.fd_t = undefined;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_open(
            dir.handle,
            lookup,
            sub_path.ptr,
            sub_path.len,
            open_flags,
            rights,
            inherited_rights,
            fd_flags,
            &new_fd,
        );
        switch (status) {
            .SUCCESS => return .{ .handle = new_fd },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // FAULT: bad pointer. BADF: file descriptor used after closed.
            .FAULT, .BADF => |err| return errnoBug(err),
            .INVAL, .ILSEQ => return error.BadPathName,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .FBIG, .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2001
/// Implementation used to open an existing file relative to a directory,
/// chosen at compile time from `native_os`.
const dirOpenFile = switch (native_os) {
    .wasi => dirOpenFileWasi,
    .windows => dirOpenFileWindows,
    else => dirOpenFilePosix,
};
2007
/// Opens the existing file at `sub_path`, relative to `dir`, via `openat`,
/// optionally acquiring an advisory lock on it.
///
/// `flags.mode` selects the access mode. Where the target's `posix.O` has the
/// fields, `CLOEXEC` and `LARGEFILE` are always set and `NOCTTY` is set unless
/// `flags.allow_ctty` is true.
///
/// Cancellation is checked before every system call, and calls that fail with
/// `INTR` are retried. If anything fails after the descriptor was obtained,
/// the descriptor is closed before the error is returned.
///
/// Locking (`flags.lock`):
/// * With `have_flock_open_flags`, the lock is requested through open flags so
///   it is acquired atomically with the open. `NONBLOCK` is passed to
///   `openat` when `flags.lock_nonblocking` is set and is cleared from the
///   descriptor again afterwards.
/// * Otherwise, with `have_flock`, the lock is taken with a separate `flock`
///   call after the open succeeds.
fn dirOpenFilePosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var path_buffer: [posix.PATH_MAX]u8 = undefined;
    const sub_path_posix = try pathToPosix(sub_path, &path_buffer);

    var os_flags: posix.O = switch (native_os) {
        .wasi => .{
            .read = flags.mode != .write_only,
            .write = flags.mode != .read_only,
        },
        else => .{
            .ACCMODE = switch (flags.mode) {
                .read_only => .RDONLY,
                .write_only => .WRONLY,
                .read_write => .RDWR,
            },
        },
    };
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    if (have_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };

    const fd: posix.fd_t = while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
        switch (posix.errno(rc)) {
            .SUCCESS => break @intCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .FAULT => |err| return errnoBug(err),
            .INVAL => return error.BadPathName,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .ACCES => return error.AccessDenied,
            .FBIG => return error.FileTooBig,
            .OVERFLOW => return error.FileTooBig,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .SRCH => return error.ProcessNotFound,
            .NOMEM => return error.SystemResources,
            .NOSPC => return error.NoSpaceLeft,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .EXIST => return error.PathAlreadyExists,
            .BUSY => return error.DeviceBusy,
            .OPNOTSUPP => return error.FileLocksNotSupported,
            .AGAIN => return error.WouldBlock,
            .TXTBSY => return error.FileBusy,
            .NXIO => return error.NoDevice,
            .ILSEQ => return error.BadPathName,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(fd);

    // No open-time lock flags on this OS: take the lock with a separate
    // flock() call now that the descriptor exists.
    if (have_flock and !have_flock_open_flags and flags.lock != .none) {
        const lock_nonblocking: i32 = if (flags.lock_nonblocking) posix.LOCK.NB else 0;
        const lock_flags = switch (flags.lock) {
            .none => unreachable,
            .shared => posix.LOCK.SH | lock_nonblocking,
            .exclusive => posix.LOCK.EX | lock_nonblocking,
        };
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.flock(fd, lock_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .CANCELED => return error.Canceled,

                .BADF => |err| return errnoBug(err), // File descriptor used after closed.
                .INVAL => |err| return errnoBug(err), // invalid parameters
                .NOLCK => return error.SystemResources,
                .AGAIN => return error.WouldBlock,
                .OPNOTSUPP => return error.FileLocksNotSupported,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    // NONBLOCK was only wanted so that acquiring the lock inside openat()
    // would not block. Clear it again so that later reads and writes on the
    // returned file are not unexpectedly non-blocking.
    if (have_flock_open_flags and flags.lock_nonblocking) {
        var fl_flags: usize = while (true) {
            try t.checkCancel();
            const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
            switch (posix.errno(rc)) {
                .SUCCESS => break @intCast(rc),
                .INTR => continue,
                .CANCELED => return error.Canceled,
                else => |err| return posix.unexpectedErrno(err),
            }
        };
        fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
        while (true) {
            try t.checkCancel();
            switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                .SUCCESS => break,
                .INTR => continue,
                .CANCELED => return error.Canceled,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    return .{ .handle = fd };
}
2138
/// Windows implementation of opening a file relative to `dir`.
///
/// Converts `sub_path` with `windows.sliceToPrefixedFileW` and forwards to
/// `dirOpenFileWtf16`. When the converted path is absolute, no root directory
/// handle is passed along.
fn dirOpenFileWindows(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const prefixed_path = try windows.sliceToPrefixedFileW(dir.handle, sub_path);
    const path_w = prefixed_path.span();
    if (std.fs.path.isAbsoluteWindowsWtf16(path_w)) {
        return dirOpenFileWtf16(t, null, path_w, flags);
    }
    return dirOpenFileWtf16(t, dir.handle, path_w, flags);
}
2151
/// Opens an existing, non-directory file named by the WTF-16 path
/// `sub_path_w` using `NtCreateFile`, optionally locking it.
///
/// `dir_handle` is used as the `RootDirectory` of the object attributes;
/// `null` is passed through unchanged. The paths "." and ".." are rejected
/// with `error.IsDir`, and a path whose length in bytes does not fit in a
/// `u16` is rejected with `error.NameTooLong`.
///
/// `SHARING_VIOLATION` and `DELETE_PENDING` results are retried with a
/// growing sleep between attempts; after `max_attempts` retries
/// `error.SharingViolation` is returned. Cancellation is checked before each
/// attempt.
///
/// When `flags.lock` is not `.none`, a one-byte range at offset 0 is locked
/// with `windows.LockFile`; the handle is closed if that fails.
pub fn dirOpenFileWtf16(
    t: *Threaded,
    dir_handle: ?windows.HANDLE,
    sub_path_w: [:0]const u16,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    if (std.mem.eql(u16, sub_path_w, &.{'.'})) return error.IsDir;
    if (std.mem.eql(u16, sub_path_w, &.{ '.', '.' })) return error.IsDir;
    // UNICODE_STRING stores its lengths in bytes, hence the `* 2` for u16 code units.
    const path_len_bytes = std.math.cast(u16, sub_path_w.len * 2) orelse return error.NameTooLong;

    const w = windows;

    var nt_name: w.UNICODE_STRING = .{
        .Length = path_len_bytes,
        .MaximumLength = path_len_bytes,
        .Buffer = @constCast(sub_path_w.ptr),
    };
    var attr: w.OBJECT_ATTRIBUTES = .{
        .Length = @sizeOf(w.OBJECT_ATTRIBUTES),
        .RootDirectory = dir_handle,
        .Attributes = 0,
        .ObjectName = &nt_name,
        .SecurityDescriptor = null,
        .SecurityQualityOfService = null,
    };
    var io_status_block: w.IO_STATUS_BLOCK = undefined;
    const blocking_flag: w.ULONG = w.FILE_SYNCHRONOUS_IO_NONALERT;
    const file_or_dir_flag: w.ULONG = w.FILE_NON_DIRECTORY_FILE;
    // If we're not following symlinks, we need to ensure we don't pass in any
    // synchronization flags such as FILE_SYNCHRONOUS_IO_NONALERT.
    const create_file_flags: w.ULONG = file_or_dir_flag |
        if (flags.follow_symlinks) blocking_flag else w.FILE_OPEN_REPARSE_POINT;

    // There are multiple kernel bugs being worked around with retries.
    const max_attempts = 13;
    // Shared by both retry paths below; also used as the shift amount for the sleep.
    var attempt: u5 = 0;

    const handle = while (true) {
        try t.checkCancel();

        var result: w.HANDLE = undefined;
        const rc = w.ntdll.NtCreateFile(
            &result,
            w.SYNCHRONIZE |
                (if (flags.isRead()) @as(u32, w.GENERIC_READ) else 0) |
                (if (flags.isWrite()) @as(u32, w.GENERIC_WRITE) else 0),
            &attr,
            &io_status_block,
            null,
            w.FILE_ATTRIBUTE_NORMAL,
            w.FILE_SHARE_WRITE | w.FILE_SHARE_READ | w.FILE_SHARE_DELETE,
            w.FILE_OPEN,
            create_file_flags,
            null,
            0,
        );
        switch (rc) {
            .SUCCESS => break result,
            .OBJECT_NAME_INVALID => return error.BadPathName,
            .OBJECT_NAME_NOT_FOUND => return error.FileNotFound,
            .OBJECT_PATH_NOT_FOUND => return error.FileNotFound,
            .BAD_NETWORK_PATH => return error.NetworkNotFound, // \\server was not found
            .BAD_NETWORK_NAME => return error.NetworkNotFound, // \\server was found but \\server\share wasn't
            .NO_MEDIA_IN_DEVICE => return error.NoDevice,
            .INVALID_PARAMETER => |err| return w.statusBug(err),
            .SHARING_VIOLATION => {
                // This occurs if the file attempting to be opened is a running
                // executable. However, there's a kernel bug: the error may be
                // incorrectly returned for an indeterminate amount of time
                // after an executable file is closed. Here we work around the
                // kernel bug with retry attempts.
                if (max_attempts - attempt == 0) return error.SharingViolation;
                // Sleep argument per attempt: 0, 1, 2, 4, ... (doubling).
                _ = w.kernel32.SleepEx((@as(u32, 1) << attempt) >> 1, w.TRUE);
                attempt += 1;
                continue;
            },
            .ACCESS_DENIED => return error.AccessDenied,
            .PIPE_BUSY => return error.PipeBusy,
            .PIPE_NOT_AVAILABLE => return error.NoDevice,
            .OBJECT_PATH_SYNTAX_BAD => |err| return w.statusBug(err),
            .OBJECT_NAME_COLLISION => return error.PathAlreadyExists,
            .FILE_IS_A_DIRECTORY => return error.IsDir,
            .NOT_A_DIRECTORY => return error.NotDir,
            .USER_MAPPED_FILE => return error.AccessDenied,
            .INVALID_HANDLE => |err| return w.statusBug(err),
            .DELETE_PENDING => {
                // This error means that there *was* a file in this location on
                // the file system, but it was deleted. However, the OS is not
                // finished with the deletion operation, and so this CreateFile
                // call has failed. Here, we simulate the kernel bug being
                // fixed by sleeping and retrying until the error goes away.
                if (max_attempts - attempt == 0) return error.SharingViolation;
                // Same backoff schedule as the SHARING_VIOLATION case.
                _ = w.kernel32.SleepEx((@as(u32, 1) << attempt) >> 1, w.TRUE);
                attempt += 1;
                continue;
            },
            .VIRUS_INFECTED, .VIRUS_DELETED => return error.AntivirusInterference,
            else => return w.unexpectedStatus(rc),
        }
    };
    errdefer w.CloseHandle(handle);

    const range_off: w.LARGE_INTEGER = 0;
    const range_len: w.LARGE_INTEGER = 1;
    // With no lock requested the freshly opened handle is returned as-is.
    const exclusive = switch (flags.lock) {
        .none => return .{ .handle = handle },
        .shared => false,
        .exclusive => true,
    };
    try w.LockFile(
        handle,
        null,
        null,
        null,
        &io_status_block,
        &range_off,
        &range_len,
        null,
        @intFromBool(flags.lock_nonblocking),
        @intFromBool(exclusive),
    );
    return .{ .handle = handle };
}
2275
/// WASI implementation of opening an existing file relative to `dir`.
///
/// When libc is linked this defers to `dirOpenFilePosix`. Otherwise it calls
/// `path_open` directly, requesting rights according to whether `flags` asks
/// for reading and/or writing. Cancellation is checked before each attempt
/// and `INTR` results are retried.
fn dirOpenFileWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    if (builtin.link_libc) return dirOpenFilePosix(userdata, dir, sub_path, flags);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;

    const for_read = flags.isRead();
    const for_write = flags.isWrite();
    const for_either = for_read or for_write;
    const rights: std.os.wasi.rights_t = .{
        // Requested for readers only.
        .FD_READ = for_read,
        .FD_FILESTAT_GET = for_read,
        // Requested for writers only.
        .FD_WRITE = for_write,
        .FD_DATASYNC = for_write,
        .FD_FDSTAT_SET_FLAGS = for_write,
        .FD_SYNC = for_write,
        .FD_ALLOCATE = for_write,
        .FD_ADVISE = for_write,
        .FD_FILESTAT_SET_TIMES = for_write,
        .FD_FILESTAT_SET_SIZE = for_write,
        // Requested for both.
        .FD_TELL = for_either,
        .FD_SEEK = for_either,
        // POLL_FD_READWRITE only grants extra rights if the corresponding FD_READ and/or FD_WRITE
        // is also set.
        .POLL_FD_READWRITE = for_either,
    };
    const inherited_rights: wasi.rights_t = .{};
    const lookup: wasi.lookupflags_t = .{};
    const open_flags: wasi.oflags_t = .{};
    const fd_flags: wasi.fdflags_t = .{};

    var new_fd: posix.fd_t = undefined;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_open(
            dir.handle,
            lookup,
            sub_path.ptr,
            sub_path.len,
            open_flags,
            rights,
            inherited_rights,
            fd_flags,
            &new_fd,
        );
        switch (status) {
            .SUCCESS => return .{ .handle = new_fd },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // FAULT: bad pointer. BADF: file descriptor used after closed.
            .FAULT, .BADF => |err| return errnoBug(err),
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .FBIG, .OVERFLOW => return error.FileTooBig,
            .INVAL, .ILSEQ => return error.BadPathName,
            .ISDIR => return error.IsDir,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .BUSY => return error.DeviceBusy,
            .NAMETOOLONG => return error.NameTooLong,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2343
/// Implementation used to open a directory relative to another directory,
/// chosen at compile time from `native_os`. Targets without a dedicated
/// prong, Windows included, go through `dirOpenDirPosix`.
const dirOpenDir = switch (native_os) {
    .haiku => dirOpenDirHaiku,
    .wasi => dirOpenDirWasi,
    else => dirOpenDirPosix,
};
2349
/// Opens the directory at `sub_path`, relative to `dir`, via `openat`.
///
/// This function is also used for WASI when libc is linked. On Windows it
/// only converts the path and forwards to `dirOpenDirWindows`.
///
/// Symlinks are not followed when `options.follow_symlinks` is false. On
/// targets whose `posix.O` has a `PATH` field, that flag is set unless
/// `options.iterate` is requested. Cancellation is checked before each
/// attempt and `INTR` results are retried.
fn dirOpenDirPosix(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.OpenError!Io.Dir {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (is_windows) {
        const prefixed_path = try windows.sliceToPrefixedFileW(dir.handle, sub_path);
        return dirOpenDirWindows(t, dir, prefixed_path.span(), options);
    }

    var path_storage: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &path_storage);

    const no_follow = !options.follow_symlinks;
    var open_flags: posix.O = switch (native_os) {
        .wasi => .{
            .read = true,
            .NOFOLLOW = no_follow,
            .DIRECTORY = true,
        },
        else => .{
            .ACCMODE = .RDONLY,
            .NOFOLLOW = no_follow,
            .DIRECTORY = true,
            .CLOEXEC = true,
        },
    };
    if (@hasField(posix.O, "PATH")) {
        if (!options.iterate) open_flags.PATH = true;
    }

    while (true) {
        try t.checkCancel();
        const rc = openat_sym(dir.handle, path_z, open_flags, @as(usize, 0));
        switch (posix.errno(rc)) {
            .SUCCESS => return .{ .handle = @intCast(rc) },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // FAULT: bad pointer. BADF: file descriptor used after closed.
            .FAULT, .BADF => |err| return errnoBug(err),
            .INVAL, .ILSEQ => return error.BadPathName,
            .NODEV, .NXIO => return error.NoDevice,
            .ACCES => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .BUSY => return error.DeviceBusy,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2412
/// Haiku implementation of opening a directory relative to `dir`, using
/// `_kern_open_dir`.
///
/// `options` is ignored. A non-negative result is the new handle; a negative
/// result is interpreted as an error code. Cancellation is checked before
/// each attempt and `INTR` results are retried.
fn dirOpenDirHaiku(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.OpenError!Io.Dir {
    _ = options;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var path_storage: [posix.PATH_MAX]u8 = undefined;
    const path_z = try pathToPosix(sub_path, &path_storage);

    while (true) {
        try t.checkCancel();
        const rc = posix.system._kern_open_dir(dir.handle, path_z);
        if (rc >= 0) return .{ .handle = rc };

        const code: posix.E = @enumFromInt(rc);
        switch (code) {
            .INTR => continue,
            .CANCELED => return error.Canceled,
            // BADF: file descriptor used after closed.
            .FAULT, .INVAL, .BADF => |err| return errnoBug(err),
            .ACCES => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .BUSY => return error.DeviceBusy,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2451
/// Opens the directory named by the WTF-16 path `sub_path_w` using
/// `NtCreateFile`.
///
/// `dir.handle` is used as the root directory unless `sub_path_w` is an
/// absolute path. `FILE_LIST_DIRECTORY` access is requested only when
/// `options.iterate` is set, and `FILE_OPEN_REPARSE_POINT` is passed when
/// `options.follow_symlinks` is false.
///
/// Returns `error.NameTooLong` when the path length in bytes does not fit in
/// the `u16` length fields of `UNICODE_STRING`. Cancellation is checked once,
/// immediately before the system call.
pub fn dirOpenDirWindows(
    t: *Io.Threaded,
    dir: Io.Dir,
    sub_path_w: [:0]const u16,
    options: Io.Dir.OpenOptions,
) Io.Dir.OpenError!Io.Dir {
    const w = windows;
    // TODO remove some of these flags if options.access_sub_paths is false
    const base_flags = w.STANDARD_RIGHTS_READ | w.FILE_READ_ATTRIBUTES | w.FILE_READ_EA |
        w.SYNCHRONIZE | w.FILE_TRAVERSE;
    const access_mask: u32 = if (options.iterate) base_flags | w.FILE_LIST_DIRECTORY else base_flags;

    // This function is public and may be handed arbitrary paths, so the
    // narrowing to u16 is checked rather than asserted, matching
    // `dirOpenFileWtf16`.
    const path_len_bytes = std.math.cast(u16, sub_path_w.len * 2) orelse return error.NameTooLong;
    var nt_name: w.UNICODE_STRING = .{
        .Length = path_len_bytes,
        .MaximumLength = path_len_bytes,
        .Buffer = @constCast(sub_path_w.ptr),
    };
    var attr: w.OBJECT_ATTRIBUTES = .{
        .Length = @sizeOf(w.OBJECT_ATTRIBUTES),
        .RootDirectory = if (std.fs.path.isAbsoluteWindowsWtf16(sub_path_w)) null else dir.handle,
        .Attributes = 0, // Note we do not use OBJ_CASE_INSENSITIVE here.
        .ObjectName = &nt_name,
        .SecurityDescriptor = null,
        .SecurityQualityOfService = null,
    };
    const open_reparse_point: w.DWORD = if (!options.follow_symlinks) w.FILE_OPEN_REPARSE_POINT else 0x0;
    var io_status_block: w.IO_STATUS_BLOCK = undefined;
    var result: Io.Dir = .{ .handle = undefined };
    try t.checkCancel();
    const rc = w.ntdll.NtCreateFile(
        &result.handle,
        access_mask,
        &attr,
        &io_status_block,
        null,
        w.FILE_ATTRIBUTE_NORMAL,
        w.FILE_SHARE_READ | w.FILE_SHARE_WRITE | w.FILE_SHARE_DELETE,
        w.FILE_OPEN,
        w.FILE_DIRECTORY_FILE | w.FILE_SYNCHRONOUS_IO_NONALERT | w.FILE_OPEN_FOR_BACKUP_INTENT | open_reparse_point,
        null,
        0,
    );

    switch (rc) {
        .SUCCESS => return result,
        .OBJECT_NAME_INVALID => return error.BadPathName,
        .OBJECT_NAME_NOT_FOUND => return error.FileNotFound,
        .OBJECT_NAME_COLLISION => |err| return w.statusBug(err),
        .OBJECT_PATH_NOT_FOUND => return error.FileNotFound,
        .NOT_A_DIRECTORY => return error.NotDir,
        // This can happen if the directory has 'List folder contents' permission set to 'Deny'
        // and the directory is trying to be opened for iteration.
        .ACCESS_DENIED => return error.AccessDenied,
        .INVALID_PARAMETER => |err| return w.statusBug(err),
        else => return w.unexpectedStatus(rc),
    }
}
2510
/// Option bundle with a symlink-following switch and a creation disposition.
/// NOTE(review): nothing in the surrounding functions references this type;
/// presumably it is consumed by a Windows directory-opening helper elsewhere
/// in the file, or it is a leftover. Confirm before relying on it.
const MakeOpenDirAccessMaskWOptions = struct {
    /// Presumably: when true, do not follow symlinks. TODO confirm against the user of this type.
    no_follow: bool,
    /// Presumably an NT create disposition value such as `FILE_OPEN`. TODO confirm.
    create_disposition: u32,
};
2515
/// Closes the directory handle with `posix.close`. The `Threaded` instance is
/// not otherwise used.
fn dirClose(userdata: ?*anyopaque, dir: Io.Dir) void {
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));
    posix.close(dir.handle);
}
2521
/// WASI implementation of opening a directory relative to `dir`.
///
/// When libc is linked this defers to `dirOpenDirPosix`. Otherwise it calls
/// `path_open` directly with the `DIRECTORY` open flag. A small set of rights
/// is always requested; the directory-reading and `PATH_*` rights are added
/// only when `options.access_sub_paths` is set. The same rights are passed as
/// both base and inheriting rights. Cancellation is checked before each
/// attempt and `INTR` results are retried.
fn dirOpenDirWasi(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    options: Io.Dir.OpenOptions,
) Io.Dir.OpenError!Io.Dir {
    if (builtin.link_libc) return dirOpenDirPosix(userdata, dir, sub_path, options);
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const wasi = std.os.wasi;

    const with_sub_paths = options.access_sub_paths;
    const rights: std.os.wasi.rights_t = .{
        // Always requested.
        .FD_FILESTAT_GET = true,
        .FD_FDSTAT_SET_FLAGS = true,
        .FD_FILESTAT_SET_TIMES = true,
        // Requested only when sub paths are to be accessed.
        .FD_READDIR = with_sub_paths,
        .PATH_CREATE_DIRECTORY = with_sub_paths,
        .PATH_CREATE_FILE = with_sub_paths,
        .PATH_LINK_SOURCE = with_sub_paths,
        .PATH_LINK_TARGET = with_sub_paths,
        .PATH_OPEN = with_sub_paths,
        .PATH_READLINK = with_sub_paths,
        .PATH_RENAME_SOURCE = with_sub_paths,
        .PATH_RENAME_TARGET = with_sub_paths,
        .PATH_FILESTAT_GET = with_sub_paths,
        .PATH_FILESTAT_SET_SIZE = with_sub_paths,
        .PATH_FILESTAT_SET_TIMES = with_sub_paths,
        .PATH_SYMLINK = with_sub_paths,
        .PATH_REMOVE_DIRECTORY = with_sub_paths,
        .PATH_UNLINK_FILE = with_sub_paths,
    };

    const lookup: wasi.lookupflags_t = .{ .SYMLINK_FOLLOW = options.follow_symlinks };
    const open_flags: wasi.oflags_t = .{ .DIRECTORY = true };
    const fd_flags: wasi.fdflags_t = .{};

    var new_fd: posix.fd_t = undefined;
    while (true) {
        try t.checkCancel();
        const status = wasi.path_open(
            dir.handle,
            lookup,
            sub_path.ptr,
            sub_path.len,
            open_flags,
            rights,
            rights,
            fd_flags,
            &new_fd,
        );
        switch (status) {
            .SUCCESS => return .{ .handle = new_fd },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // FAULT: bad pointer. BADF: file descriptor used after closed.
            .FAULT, .BADF => |err| return errnoBug(err),
            .INVAL, .ILSEQ => return error.BadPathName,
            .ACCES, .NOTCAPABLE => return error.AccessDenied,
            .LOOP => return error.SymLinkLoop,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NAMETOOLONG => return error.NameTooLong,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NODEV => return error.NoDevice,
            .NOENT => return error.FileNotFound,
            .NOMEM => return error.SystemResources,
            .NOTDIR => return error.NotDir,
            .PERM => return error.PermissionDenied,
            .BUSY => return error.DeviceBusy,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2587
/// Closes the file handle with `posix.close`. The `Threaded` instance is not
/// otherwise used.
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));
    posix.close(file.handle);
}
2593
/// Implementation used for streaming (non-positional) file reads: the Windows
/// variant on Windows, the POSIX variant everywhere else.
const fileReadStreaming = if (native_os == .windows)
    fileReadStreamingWindows
else
    fileReadStreamingPosix;
2598
/// Reads from `file` at its current position into the buffers of `data` and
/// returns the number of bytes read.
///
/// Empty buffers are skipped and at most `max_iovecs_len` buffers are used.
/// At least one non-empty buffer is expected; this is asserted. On WASI
/// without libc `fd_read` is used, otherwise `readv`. Cancellation is checked
/// before each attempt and `INTR` results are retried.
fn fileReadStreamingPosix(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.Reader.Error!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // Gather the non-empty buffers into an iovec array.
    var iovecs: [max_iovecs_len]posix.iovec = undefined;
    var used: usize = 0;
    for (data) |buf| {
        if (used == iovecs.len) break;
        if (buf.len == 0) continue;
        iovecs[used] = .{ .base = buf.ptr, .len = buf.len };
        used += 1;
    }
    const dest = iovecs[0..used];
    assert(dest[0].len > 0);

    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var nread: usize = undefined;
        const status = std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread);
        switch (status) {
            .SUCCESS => return nread,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .INVAL, .FAULT => |err| return errnoBug(err),
            .BADF => return error.NotOpenForReading, // File operation on directory.
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NOTCAPABLE => return error.AccessDenied,
            else => |err| return posix.unexpectedErrno(err),
        }
    };

    while (true) {
        try t.checkCancel();
        const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .INVAL, .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => |err| {
                if (native_os == .wasi) return error.NotOpenForReading; // File operation on directory.
                return errnoBug(err); // File descriptor used after closed.
            },
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2664
/// Reads from `file` with `ReadFile` into the first non-empty buffer of
/// `data` and returns the number of bytes read.
///
/// Only that one buffer is filled per call, and the request is capped at the
/// largest value a `DWORD` can hold. `BROKEN_PIPE` and `HANDLE_EOF` are
/// reported as a read of 0 bytes. Cancellation is checked before each attempt
/// and `OPERATION_ABORTED` results are retried.
fn fileReadStreamingWindows(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.Reader.Error!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const DWORD = windows.DWORD;

    // Skip leading empty buffers.
    var first: usize = 0;
    while (data[first].len == 0) first += 1;
    const target = data[first];
    const request_len: DWORD = @min(std.math.maxInt(DWORD), target.len);

    while (true) {
        try t.checkCancel();
        var bytes_read: DWORD = undefined;
        const ok = windows.kernel32.ReadFile(file.handle, target.ptr, request_len, &bytes_read, null);
        if (ok != 0) return bytes_read;

        switch (windows.GetLastError()) {
            .OPERATION_ABORTED => continue,
            .BROKEN_PIPE, .HANDLE_EOF => return 0,
            .IO_PENDING => |err| return windows.errorBug(err),
            .NETNAME_DELETED => return error.ConnectionResetByPeer,
            .LOCK_VIOLATION => return error.LockViolation,
            .ACCESS_DENIED => return error.AccessDenied,
            .INVALID_HANDLE => return error.NotOpenForReading,
            else => |err| return windows.unexpectedError(err),
        }
    }
}
2692
/// Reads from `file` at byte `offset` into the non-empty buffers of `data`
/// using one vectored positional read, returning the number of bytes read.
///
/// At most `max_iovecs_len` buffers are submitted per call and empty buffers
/// are skipped. The read is reissued on `INTR` after a fresh cancellation
/// check.
fn fileReadPositionalPosix(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (!have_preadv) @compileError("TODO");

    // Gather the non-empty buffers into an iovec array, stopping once it is full.
    var iovecs: [max_iovecs_len]posix.iovec = undefined;
    var count: usize = 0;
    for (data) |chunk| {
        if (count == iovecs.len) break;
        if (chunk.len == 0) continue;
        iovecs[count] = .{ .base = chunk.ptr, .len = chunk.len };
        count += 1;
    }
    const dest = iovecs[0..count];
    assert(dest[0].len > 0);

    // WASI without libc: call the `fd_pread` syscall directly.
    if (native_os == .wasi and !builtin.link_libc) {
        while (true) {
            try t.checkCancel();
            var bytes_read: usize = undefined;
            switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &bytes_read)) {
                .SUCCESS => return bytes_read,
                .INTR => continue,
                .CANCELED => return error.Canceled,

                .INVAL, .FAULT, .AGAIN => |err| return errnoBug(err),
                .BADF => return error.NotOpenForReading, // File operation on directory.
                .IO => return error.InputOutput,
                .ISDIR => return error.IsDir,
                .NOBUFS, .NOMEM => return error.SystemResources,
                .NOTCONN => return error.SocketUnconnected,
                .CONNRESET => return error.ConnectionResetByPeer,
                .TIMEDOUT => return error.Timeout,
                .NXIO, .SPIPE, .OVERFLOW => return error.Unseekable,
                .NOTCAPABLE => return error.AccessDenied,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    while (true) {
        try t.checkCancel();
        const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
        switch (posix.errno(rc)) {
            .SUCCESS => return @bitCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .INVAL, .FAULT => |err| return errnoBug(err),
            .SRCH => return error.ProcessNotFound,
            .AGAIN => return error.WouldBlock,
            .BADF => |err| {
                if (native_os == .wasi) return error.NotOpenForReading; // File operation on directory.
                return errnoBug(err); // File descriptor used after closed.
            },
            .IO => return error.InputOutput,
            .ISDIR => return error.IsDir,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .NXIO, .SPIPE, .OVERFLOW => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2767
/// Positional-read implementation for the target OS: the Windows variant on
/// Windows, the POSIX variant everywhere else.
const fileReadPositional = if (native_os == .windows)
    fileReadPositionalWindows
else
    fileReadPositionalPosix;
2772
/// Reads from `file` at byte `offset` into the first non-empty buffer of
/// `data` with a single `ReadFile` call and returns the number of bytes read.
///
/// The offset is passed through an `OVERLAPPED` structure, split into its low
/// and high 32-bit halves. `BROKEN_PIPE` and `HANDLE_EOF` are reported as a
/// read of zero bytes.
fn fileReadPositionalWindows(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // Skip leading empty buffers.
    // NOTE(review): presumes the caller passes at least one non-empty buffer — confirm.
    var first_nonempty: usize = 0;
    while (data[first_nonempty].len == 0) first_nonempty += 1;
    const dest = data[first_nonempty];
    // `ReadFile` takes a DWORD length, so oversized buffers are clamped.
    const request_len: windows.DWORD = @min(std.math.maxInt(windows.DWORD), dest.len);

    var overlapped: windows.OVERLAPPED = .{
        .Internal = 0,
        .InternalHigh = 0,
        .DUMMYUNIONNAME = .{
            .DUMMYSTRUCTNAME = .{
                .Offset = @truncate(offset),
                .OffsetHigh = @truncate(offset >> 32),
            },
        },
        .hEvent = null,
    };

    while (true) {
        try t.checkCancel();
        var bytes_read: windows.DWORD = undefined;
        const ok = windows.kernel32.ReadFile(file.handle, dest.ptr, request_len, &bytes_read, &overlapped);
        if (ok != 0) return bytes_read;
        switch (windows.GetLastError()) {
            .OPERATION_ABORTED => continue,
            .BROKEN_PIPE, .HANDLE_EOF => return 0,
            .NETNAME_DELETED => return error.ConnectionResetByPeer,
            .LOCK_VIOLATION => return error.LockViolation,
            .ACCESS_DENIED => return error.AccessDenied,
            .INVALID_HANDLE => return error.NotOpenForReading,
            .IO_PENDING => |err| return windows.errorBug(err),
            else => |err| return windows.unexpectedError(err),
        }
    }
}
2813
/// Relative seek. Not implemented yet: once the cancellation check passes,
/// this panics.
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
    _ = file;
    _ = offset;

    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    @panic("TODO implement fileSeekBy");
}
2822
/// Moves the position of `file` to the absolute byte `offset`.
///
/// Exactly one of the code paths below applies to a given target; each one
/// checks for cancellation before issuing the seek and retries on `INTR`
/// where the underlying call can report it.
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const handle = file.handle;

    // 32-bit Linux without libc: use `llseek`, which takes a 64-bit offset.
    if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) {
        while (true) {
            try t.checkCancel();
            var resulting_offset: u64 = undefined;
            const rc = posix.system.llseek(handle, offset, &resulting_offset, posix.SEEK.SET);
            switch (posix.errno(rc)) {
                .SUCCESS => return,
                .INTR => continue,
                .CANCELED => return error.Canceled,

                .BADF => |err| return errnoBug(err), // File descriptor used after closed.
                .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    if (native_os == .windows) {
        try t.checkCancel();
        return windows.SetFilePointerEx_BEGIN(handle, offset);
    }

    // WASI without libc: call the `fd_seek` syscall directly.
    if (native_os == .wasi and !builtin.link_libc) {
        while (true) {
            try t.checkCancel();
            var resulting_offset: std.os.wasi.filesize_t = undefined;
            switch (std.os.wasi.fd_seek(handle, @bitCast(offset), .SET, &resulting_offset)) {
                .SUCCESS => return,
                .INTR => continue,
                .CANCELED => return error.Canceled,

                .BADF => |err| return errnoBug(err), // File descriptor used after closed.
                .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
                .NOTCAPABLE => return error.AccessDenied,
                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    // Targets that define no SEEK constants cannot seek at all.
    if (posix.SEEK == void) return error.Unseekable;

    while (true) {
        try t.checkCancel();
        const rc = lseek_sym(handle, @bitCast(offset), posix.SEEK.SET);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .INVAL, .OVERFLOW, .SPIPE, .NXIO => return error.Unseekable,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
2885
/// Opens the currently running executable with the given `flags`.
///
/// Implemented for Linux and Serenity (via `/proc/self/exe`) and for Windows
/// (via the image path stored in the PEB). Other targets panic.
fn openSelfExe(userdata: ?*anyopaque, flags: Io.File.OpenFlags) Io.File.OpenSelfExeError!Io.File {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    switch (native_os) {
        .linux, .serenity => {
            const cwd: Io.Dir = .{ .handle = posix.AT.FDCWD };
            return dirOpenFilePosix(t, cwd, "/proc/self/exe", flags);
        },
        .windows => {
            // If ImagePathName is a symlink, then it will contain the path of the symlink,
            // not the path that the symlink points to. However, because we are opening
            // the file, we can let the openFileW call follow the symlink for us.
            const image_path = &windows.peb().ProcessParameters.ImagePathName;
            // `Length` is halved to obtain the number of 16-bit code units in `Buffer`.
            const unit_count = image_path.Length / 2;
            const image_path_w = image_path.Buffer.?[0..unit_count :0];
            const prefixed = try windows.wToPrefixedFileW(null, image_path_w);
            return dirOpenFileWtf16(t, null, prefixed.span(), flags);
        },
        else => @panic("TODO implement openSelfExe"),
    }
}
2902
/// Positional write. Not implemented yet: once the cancellation check passes,
/// this panics.
fn fileWritePositional(
    userdata: ?*anyopaque,
    file: Io.File,
    buffer: [][]const u8,
    offset: u64,
) Io.File.WritePositionalError!usize {
    _ = file;
    _ = buffer;
    _ = offset;

    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    @panic("TODO implement fileWritePositional");
}
2918
/// Streaming write. Not implemented yet: once the cancellation check passes,
/// this panics.
fn fileWriteStreaming(userdata: ?*anyopaque, file: Io.File, buffer: [][]const u8) Io.File.WriteStreamingError!usize {
    _ = file;
    _ = buffer;

    const t: *Threaded = @ptrCast(@alignCast(userdata));
    try t.checkCancel();

    @panic("TODO implement fileWriteStreaming");
}
2928
/// Reads the current time of `clock` through `clock_gettime`.
///
/// `INVAL` from the system call is reported as `error.UnsupportedClock`.
fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    // The instance is not needed here; the cast is kept only for its safety checks.
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));

    const id: posix.clockid_t = clockToPosix(clock);
    var ts: posix.timespec = undefined;
    const rc = posix.system.clock_gettime(id, &ts);
    switch (posix.errno(rc)) {
        .SUCCESS => return timestampFromPosix(&ts),
        .INVAL => return error.UnsupportedClock,
        else => |err| return posix.unexpectedErrno(err),
    }
}
2940
/// Clock-reading implementation for the target OS; anything that is neither
/// WASI nor Windows uses the POSIX variant.
const now = switch (native_os) {
    .wasi => nowWasi,
    .windows => nowWindows,
    else => nowPosix,
};
2946
/// Reads the current time of `clock` on Windows.
///
/// `.real` is derived from `RtlGetSystemTimePrecise`, `.awake` and `.boot`
/// from the performance counter. The CPU-time clocks are reported as
/// `error.UnsupportedClock`.
fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    // The instance is not needed here; the cast is kept only for its safety checks.
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));

    switch (clock) {
        .real => {
            // RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
            // and uses the NTFS/Windows epoch, which is 1601-01-01.
            const epoch_ns = std.time.epoch.windows * std.time.ns_per_s;
            const ticks: i96 = windows.ntdll.RtlGetSystemTimePrecise();
            return .{ .nanoseconds = ticks * 100 + epoch_ns };
        },
        .awake, .boot => {
            // QPC on windows doesn't fail on >= XP/2000 and includes time suspended.
            const counter = windows.QueryPerformanceCounter();
            // We don't need to cache QPF as it's internally just a memory read to KUSER_SHARED_DATA
            // (a read-only page of info updated and mapped by the kernel to all processes):
            // https://docs.microsoft.com/en-us/windows-hardware/drivers/ddi/ntddk/ns-ntddk-kuser_shared_data
            // https://www.geoffchappell.com/studies/windows/km/ntoskrnl/inc/api/ntexapi_x/kuser_shared_data/index.htm
            const frequency = windows.QueryPerformanceFrequency();

            // 10Mhz (1 qpc tick every 100ns) is a common enough QPF value that we can optimize on it.
            // https://github.com/microsoft/STL/blob/785143a0c73f030238ef618890fd4d6ae2b3a3a0/stl/inc/chrono#L694-L701
            const common_frequency = 10_000_000;
            if (frequency == common_frequency) {
                return .{ .nanoseconds = counter * (std.time.ns_per_s / common_frequency) };
            }

            // Convert to ns using fixed point.
            const scale = @as(u64, std.time.ns_per_s << 32) / @as(u32, @intCast(frequency));
            const scaled = (@as(u96, counter) * scale) >> 32;
            return .{ .nanoseconds = @intCast(scaled) };
        },
        .cpu_process, .cpu_thread => return error.UnsupportedClock,
    }
}
2981
/// Reads the current time of `clock` through WASI `clock_time_get`, requesting
/// a precision of 1. Any failure is reported as `error.Unexpected`.
fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    // The instance is not needed here; the cast is kept only for its safety checks.
    _ = @as(*Threaded, @ptrCast(@alignCast(userdata)));

    const w = std.os.wasi;
    var nanoseconds: w.timestamp_t = undefined;
    switch (w.clock_time_get(clockToWasi(clock), 1, &nanoseconds)) {
        .SUCCESS => return .fromNanoseconds(nanoseconds),
        else => return error.Unexpected,
    }
}
2990
/// Sleep implementation for the target OS; Linux, WASI and Windows each have
/// a dedicated variant and everything else uses the POSIX one.
const sleep = switch (native_os) {
    .linux => sleepLinux,
    .wasi => sleepWasi,
    .windows => sleepWindows,
    else => sleepPosix,
};
2997
/// Sleeps according to `timeout` using `clock_nanosleep`.
///
/// A `.deadline` timeout sleeps in absolute mode (`ABSTIME`) on the deadline's
/// clock; `.duration` and `.none` sleep in relative mode, with `.none` using
/// the `.awake` clock and the largest representable nanosecond count.
///
/// The call is reissued on `INTR` after a fresh cancellation check. `INVAL`
/// is reported as `error.UnsupportedClock`.
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
        .none => .awake,
        .duration => |d| d.clock,
        .deadline => |d| d.clock,
    });
    const deadline_nanoseconds: i96 = switch (timeout) {
        .none => std.math.maxInt(i96),
        .duration => |duration| duration.raw.nanoseconds,
        .deadline => |deadline| deadline.raw.nanoseconds,
    };
    var timespec: posix.timespec = timestampToPosix(deadline_nanoseconds);
    while (true) {
        try t.checkCancel();
        // `timespec` is passed as both the request and the remainder, so a
        // relative sleep interrupted by a signal resumes with the time left.
        switch (std.os.linux.errno(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) {
            .none, .duration => false,
            .deadline => true,
        } }, &timespec, &timespec))) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,
            .INVAL => return error.UnsupportedClock,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3025
/// Sleeps according to `timeout` using a single non-alertable `SleepEx` call.
///
/// The remaining duration is converted to milliseconds and lossily cast to a
/// DWORD; a timeout with no duration sleeps for the maximum DWORD value.
/// Cancellation is only checked once, before the sleep starts.
fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);
    try t.checkCancel();

    const millis: windows.DWORD = if (try timeout.toDurationFromNow(t_io)) |remaining|
        std.math.lossyCast(windows.DWORD, remaining.raw.toMilliseconds())
    else
        std.math.maxInt(windows.DWORD);

    // TODO: alertable true with checkCancel in a loop plus deadline
    _ = windows.kernel32.SleepEx(millis, windows.FALSE);
}
3038
/// Sleeps according to `timeout` by polling a single WASI clock subscription.
///
/// A timeout with no duration subscribes to the monotonic clock with the
/// maximum `u64` timeout. The result of `poll_oneoff` is discarded, and
/// cancellation is only checked once, before the poll.
fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);
    try t.checkCancel();

    const w = std.os.wasi;

    const maybe_duration = try timeout.toDurationFromNow(t_io);
    const clock_subscription: w.subscription_clock_t = if (maybe_duration) |remaining| .{
        .id = clockToWasi(remaining.clock),
        .timeout = std.math.lossyCast(u64, remaining.raw.nanoseconds),
        .precision = 0,
        .flags = 0,
    } else .{
        .id = .MONOTONIC,
        .timeout = std.math.maxInt(u64),
        .precision = 0,
        .flags = 0,
    };

    const subscription: w.subscription_t = .{
        .userdata = 0,
        .u = .{
            .tag = .CLOCK,
            .u = .{ .clock = clock_subscription },
        },
    };
    var event: w.event_t = undefined;
    var event_count: usize = undefined;
    _ = w.poll_oneoff(&subscription, &event, 1, &event_count);
}
3068
/// Sleeps according to `timeout` using `nanosleep`.
///
/// The timeout is first converted to a duration from now; a timeout with no
/// duration sleeps for the largest value the `timespec` fields can hold.
///
/// The call is reissued on `INTR` after a fresh cancellation check. Any other
/// result, including unexpected errors, ends the sleep without an error.
fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = ioBasic(t);
    const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
    const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type;

    var timespec: posix.timespec = t: {
        const d = (try timeout.toDurationFromNow(t_io)) orelse break :t .{
            .sec = std.math.maxInt(sec_type),
            .nsec = std.math.maxInt(nsec_type),
        };
        break :t timestampToPosix(d.raw.toNanoseconds());
    };
    while (true) {
        try t.checkCancel();
        // `timespec` is passed as both the request and the remainder, so an
        // interrupted sleep resumes with the time left.
        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
            .INTR => continue,
            .CANCELED => return error.Canceled,
            else => return, // This prong handles success as well as unexpected errors.
        }
    }
}
3091
/// Waits until at least one of `futures` is done and returns the index of the
/// lowest-indexed one observed as done.
///
/// A stack-allocated event is published to each future's `select_condition`.
/// Before returning an index, every registration made so far is swapped back
/// to `null`, and any future found done during that swap is waited on so that
/// nothing still refers to the stack-allocated event.
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var wake_event: ResetEvent = .unset;

    // Registration pass: publish `wake_event` to every future in turn.
    for (futures, 0..) |future, done_index| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        const previous = @atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &wake_event, .seq_cst);
        if (previous != AsyncClosure.done_reset_event) continue;

        // This future was already done: undo the earlier registrations and report it.
        for (futures[0..done_index]) |registered_future| {
            const registered: *AsyncClosure = @ptrCast(@alignCast(registered_future));
            const old = @atomicRmw(?*ResetEvent, &registered.select_condition, .Xchg, null, .seq_cst);
            if (old == AsyncClosure.done_reset_event) {
                registered.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
            }
        }
        return done_index;
    }

    // NOTE(review): if this wait returns an error, the futures still hold a
    // pointer to `wake_event`, which goes out of scope — confirm this is safe.
    try wake_event.wait(t);

    // Unregistration pass: clear every registration and remember the first done future.
    var first_done: ?usize = null;
    for (futures, 0..) |future, index| {
        const closure: *AsyncClosure = @ptrCast(@alignCast(future));
        const old = @atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst);
        if (old != AsyncClosure.done_reset_event) continue;
        closure.reset_event.waitUncancelable(); // Ensure no reference to our stack-allocated reset_event.
        if (first_done == null) first_done = index; // In case multiple are ready, return first.
    }
    return first_done.?;
}
3122
/// Opens a socket, binds it to `address`, starts listening, and returns a
/// server whose address is the one reported by `getsockname`.
///
/// With `options.reuse_address`, `SO_REUSEADDR` is set, and `SO_REUSEPORT` as
/// well where the target declares it. The socket is closed if any step after
/// opening it fails.
fn netListenIpPosix(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_fd = try openSocketPosix(t, posixAddressFamily(&address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);

    if (options.reuse_address) {
        try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
        if (@hasDecl(posix.SO, "REUSEPORT")) {
            try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
        }
    }

    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(&address, &storage);
    try posixBind(t, socket_fd, &storage.any, addr_len);

    // `listen` is issued once: every outcome either proceeds or returns.
    try t.checkCancel();
    switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
        .SUCCESS => {},
        .ADDRINUSE => return error.AddressInUse,
        .BADF => |err| return errnoBug(err), // File descriptor used after closed.
        else => |err| return posix.unexpectedErrno(err),
    }

    // Read back the bound address into `storage`.
    try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
    return .{ .socket = .{
        .handle = socket_fd,
        .address = addressFromPosix(&storage),
    } };
}
3165
/// Opens a Winsock socket, binds it to `address`, starts listening, and
/// returns a server whose address is the one reported by `getsockname`.
///
/// `bind` and `listen` are each reissued on `EINTR`, and after Winsock has
/// been initialized in response to `NOTINITIALISED`. The socket is closed if
/// any step after opening it fails.
fn netListenIpWindows(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_handle = try openSocketWsa(t, posixAddressFamily(&address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer closeSocketWindows(socket_handle);

    if (options.reuse_address) {
        try setSocketOptionWsa(t, socket_handle, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
    }

    var storage: WsaAddress = undefined;
    var addr_len = addressToWsa(&address, &storage);

    // Bind.
    while (true) {
        try t.checkCancel();
        if (ws2_32.bind(socket_handle, &storage.any, addr_len) != ws2_32.SOCKET_ERROR) break;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .EADDRINUSE => return error.AddressInUse,
            .EADDRNOTAVAIL => return error.AddressUnavailable,
            .ENOBUFS => return error.SystemResources,
            .ENETDOWN => return error.NetworkDown,
            .ENOTSOCK, .EFAULT, .EINVAL => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    // Listen.
    while (true) {
        try t.checkCancel();
        if (ws2_32.listen(socket_handle, options.kernel_backlog) != ws2_32.SOCKET_ERROR) break;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .ENETDOWN => return error.NetworkDown,
            .EADDRINUSE => return error.AddressInUse,
            .EMFILE, .ENOBUFS => return error.SystemResources,
            .EISCONN, .EINVAL, .ENOTSOCK, .EOPNOTSUPP, .EINPROGRESS => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    // Read back the bound address into `storage`.
    try wsaGetSockName(t, socket_handle, &storage.any, &addr_len);

    return .{ .socket = .{
        .handle = socket_handle,
        .address = addressFromWsa(&storage),
    } };
}
3240
/// Stand-in IP listen implementation that ignores its arguments and always
/// fails with `error.NetworkDown`.
fn netListenIpUnavailable(
    userdata: ?*anyopaque,
    address: IpAddress,
    options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
    _ = options;
    _ = address;
    _ = userdata;
    return error.NetworkDown;
}
3251
/// Opens a stream-mode Unix domain socket, binds it to `address`, starts
/// listening, and returns the socket handle.
///
/// Socket-creation failures that indicate missing protocol or mode support
/// are reported as `error.AddressFamilyUnsupported`. The socket is closed if
/// any step after opening it fails.
fn netListenUnixPosix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.ProtocolUnsupportedBySystem,
        error.ProtocolUnsupportedByAddressFamily,
        error.SocketModeUnsupported,
        => return error.AddressFamilyUnsupported,
        error.OptionUnsupported => return error.Unexpected,
        else => |e| return e,
    };
    errdefer posix.close(socket_fd);

    var storage: UnixAddress = undefined;
    const addr_len = addressUnixToPosix(address, &storage);
    try posixBindUnix(t, socket_fd, &storage.any, addr_len);

    // `listen` is issued once: every outcome either proceeds or returns.
    try t.checkCancel();
    switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
        .SUCCESS => {},
        .ADDRINUSE => return error.AddressInUse,
        .BADF => |err| return errnoBug(err), // File descriptor used after closed.
        else => |err| return posix.unexpectedErrno(err),
    }

    return socket_fd;
}
3284
/// Opens a stream-mode Unix domain socket through Winsock, binds it to
/// `address`, starts listening, and returns the socket handle.
///
/// `bind` and `listen` are each reissued on `EINTR`, and after Winsock has
/// been initialized in response to `NOTINITIALISED`. The socket is closed if
/// any step after opening it fails.
fn netListenUnixWindows(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_handle = openSocketWsa(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
        error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported,
        else => |e| return e,
    };
    errdefer closeSocketWindows(socket_handle);

    var storage: WsaAddress = undefined;
    const addr_len = addressUnixToWsa(address, &storage);

    // Bind.
    while (true) {
        try t.checkCancel();
        if (ws2_32.bind(socket_handle, &storage.any, addr_len) != ws2_32.SOCKET_ERROR) break;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .EADDRINUSE => return error.AddressInUse,
            .EADDRNOTAVAIL => return error.AddressUnavailable,
            .ENOBUFS => return error.SystemResources,
            .ENETDOWN => return error.NetworkDown,
            .ENOTSOCK, .EFAULT, .EINVAL => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    // Listen.
    while (true) {
        try t.checkCancel();
        if (ws2_32.listen(socket_handle, options.kernel_backlog) != ws2_32.SOCKET_ERROR) break;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .ENETDOWN => return error.NetworkDown,
            .EADDRINUSE => return error.AddressInUse,
            .EMFILE, .ENOBUFS => return error.SystemResources,
            .EISCONN, .EINVAL, .ENOTSOCK, .EOPNOTSUPP, .EINPROGRESS => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    return socket_handle;
}
3349
/// Stand-in Unix-socket listen implementation that ignores its arguments and
/// always fails with `error.AddressFamilyUnsupported`.
fn netListenUnixUnavailable(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
    options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
    _ = options;
    _ = address;
    _ = userdata;
    return error.AddressFamilyUnsupported;
}
3360
/// Binds the Unix domain socket `fd` to `addr`, retrying on `INTR` after a
/// fresh cancellation check.
///
/// In addition to the generic bind failures, path-related errnos (`LOOP`,
/// `NOENT`, `NOTDIR`, `ROFS`, `PERM`) are mapped to file-system errors.
fn posixBindUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.bind(fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ACCES => return error.AccessDenied,
            .ADDRINUSE => return error.AddressInUse,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .NOMEM => return error.SystemResources,

            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,

            // BADF: file descriptor used after closed. INVAL: invalid parameters.
            // NOTSOCK: invalid `sockfd`. FAULT: invalid `addr` pointer.
            .BADF, .INVAL, .NOTSOCK, .FAULT, .NAMETOOLONG => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3390
/// Binds `socket_fd` to `addr`, retrying on `INTR` after a fresh cancellation
/// check.
fn posixBind(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.bind(socket_fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ADDRINUSE => return error.AddressInUse,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .NOMEM => return error.SystemResources,

            // BADF: file descriptor used after closed. INVAL: invalid parameters.
            // NOTSOCK: invalid `sockfd`. FAULT: invalid `addr` pointer.
            .BADF, .INVAL, .NOTSOCK, .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3411
/// Connects `socket_fd` to `addr`, retrying on `INTR` after a fresh
/// cancellation check.
///
/// `AGAIN` and `INPROGRESS` are both reported as `error.WouldBlock`.
fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.connect(socket_fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ALREADY => return error.ConnectionPending,
            .CONNREFUSED => return error.ConnectionRefused,
            .CONNRESET => return error.ConnectionResetByPeer,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .TIMEDOUT => return error.Timeout,
            .ACCES => return error.AccessDenied,
            .NETDOWN => return error.NetworkDown,

            // BADF: file descriptor used after closed.
            .BADF,
            .FAULT,
            .ISCONN,
            .NOTSOCK,
            .PROTOTYPE,
            .CONNABORTED,
            .PERM,
            .NOENT,
            => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3443
/// Connects the Unix domain socket `fd` to `addr`, retrying on `INTR` after a
/// fresh cancellation check.
///
/// Path-related errnos (`LOOP`, `NOENT`, `NOTDIR`, `ROFS`, `PERM`) are mapped
/// to file-system errors.
fn posixConnectUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.connect(fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .AGAIN, .INPROGRESS => return error.WouldBlock,
            .ACCES => return error.AccessDenied,

            .LOOP => return error.SymLinkLoop,
            .NOENT => return error.FileNotFound,
            .NOTDIR => return error.NotDir,
            .ROFS => return error.ReadOnlyFileSystem,
            .PERM => return error.PermissionDenied,

            // BADF: file descriptor used after closed.
            .BADF,
            .CONNABORTED,
            .FAULT,
            .ISCONN,
            .NOTSOCK,
            .PROTOTYPE,
            => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3473
/// Queries the local address of `socket_fd` via `getsockname`, writing it to
/// `addr` and `addr_len`. Retries on `INTR` after a fresh cancellation check.
fn posixGetSockName(t: *Threaded, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
    while (true) {
        try t.checkCancel();
        const rc = posix.system.getsockname(socket_fd, addr, addr_len);
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .NOBUFS => return error.SystemResources,

            // BADF: file descriptor used after closed. INVAL: invalid parameters.
            // NOTSOCK: always a race condition.
            .BADF, .FAULT, .INVAL, .NOTSOCK => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3491
/// Queries the local address of the Winsock socket `handle` via
/// `getsockname`, writing it to `addr` and `addr_len`.
///
/// The call is reissued on `EINTR`, and after Winsock has been initialized in
/// response to `NOTINITIALISED`.
fn wsaGetSockName(t: *Threaded, handle: ws2_32.SOCKET, addr: *ws2_32.sockaddr, addr_len: *i32) !void {
    while (true) {
        try t.checkCancel();
        if (ws2_32.getsockname(handle, addr, addr_len) != ws2_32.SOCKET_ERROR) return;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .ENETDOWN => return error.NetworkDown,
            .EFAULT, .ENOTSOCK, .EINVAL => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }
}
3512
/// Sets a socket option on `fd` whose value is the 32-bit integer `option`.
/// Retries on `INTR` after a fresh cancellation check.
fn setSocketOption(t: *Threaded, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
    // View the option value as raw bytes for the system call.
    const value_bytes: []const u8 = @ptrCast(&option);
    while (true) {
        try t.checkCancel();
        const rc = posix.system.setsockopt(fd, level, opt_name, value_bytes.ptr, @intCast(value_bytes.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // BADF: file descriptor used after closed.
            .BADF, .NOTSOCK, .INVAL, .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
3530
/// Sets a socket option on the Winsock `socket` whose value is the 32-bit
/// integer `option`.
///
/// The `setsockopt` call is reissued on `EINTR`, and after Winsock has been
/// initialized in response to `NOTINITIALISED`. Cancellation is checked
/// before every attempt.
fn setSocketOptionWsa(t: *Threaded, socket: Io.net.Socket.Handle, level: i32, opt_name: u32, option: u32) !void {
    // View the option value as raw bytes for the system call.
    const o: []const u8 = @ptrCast(&option);
    while (true) {
        try t.checkCancel();
        // The call must happen inside the loop: each `continue` below is a
        // retry, and testing a result computed once before the loop would
        // spin forever without ever reissuing `setsockopt`.
        const rc = ws2_32.setsockopt(socket, level, @bitCast(opt_name), o.ptr, @intCast(o.len));
        if (rc != ws2_32.SOCKET_ERROR) return;
        switch (ws2_32.WSAGetLastError()) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },
            .ENETDOWN => return error.NetworkDown,
            .EFAULT => |err| return wsaErrorBug(err),
            .ENOTSOCK => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }
}
3551
/// Opens a socket, connects it to `address`, and returns a stream whose
/// address is the one reported by `getsockname` after connecting.
///
/// Timeouts are not implemented: any `options.timeout` other than `.none`
/// panics. The socket is closed if connecting or the address query fails.
fn netConnectIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    if (!have_networking) return error.NetworkDown;
    if (options.timeout != .none) @panic("TODO implement netConnectIpPosix with timeout");
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const socket_fd = try openSocketPosix(t, posixAddressFamily(address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);

    var storage: PosixAddress = undefined;
    var addr_len = addressToPosix(address, &storage);
    try posixConnect(t, socket_fd, &storage.any, addr_len);

    // Reuse `storage` to read back the socket's own address.
    try posixGetSockName(t, socket_fd, &storage.any, &addr_len);

    return .{
        .socket = .{
            .handle = socket_fd,
            .address = addressFromPosix(&storage),
        },
    };
}
3575
/// Opens a Winsock socket for the address family of `address`, connects it to
/// `address`, and returns the stream together with the address produced by
/// `wsaGetSockName`.
///
/// Timeouts are not implemented yet: any `options.timeout` other than `.none`
/// panics. The socket is closed again if a later step fails.
fn netConnectIpWindows(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    if (!have_networking) return error.NetworkDown;
    if (options.timeout != .none) @panic("TODO implement netConnectIpWindows with timeout");
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const sock = try openSocketWsa(t, posixAddressFamily(address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer closeSocketWindows(sock);

    var sockaddr: WsaAddress = undefined;
    var sockaddr_len = addressToWsa(address, &sockaddr);

    // Every prong either returns or asks for another attempt, so the loop
    // ends only once `connect` stops reporting `SOCKET_ERROR`.
    while (ws2_32.connect(sock, &sockaddr.any, sockaddr_len) == ws2_32.SOCKET_ERROR) {
        switch (ws2_32.WSAGetLastError()) {
            // Retry.
            .EINTR => continue,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,

            // Failures reported to the caller.
            .EACCES => return error.AccessDenied,
            .EADDRNOTAVAIL => return error.AddressUnavailable,
            .EAFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ECONNREFUSED => return error.ConnectionRefused,
            .ECONNRESET => return error.ConnectionResetByPeer,
            .EHOSTUNREACH => return error.HostUnreachable,
            .ENETUNREACH => return error.NetworkUnreachable,
            .ENOBUFS => return error.SystemResources,
            .ETIMEDOUT => return error.Timeout,
            .EWOULDBLOCK => return error.WouldBlock,

            // Codes routed through `wsaErrorBug`.
            .EFAULT => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            .EISCONN => |err| return wsaErrorBug(err),
            .ENOTSOCK => |err| return wsaErrorBug(err),

            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    try wsaGetSockName(t, sock, &sockaddr.any, &sockaddr_len);

    const socket_address = addressFromWsa(&sockaddr);
    return .{ .socket = .{ .handle = sock, .address = socket_address } };
}
3630
/// Stand-in for IP connect that ignores every argument and always fails with
/// `error.NetworkDown`.
fn netConnectIpUnavailable(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = options;
    _ = address;
    _ = userdata;
    return error.NetworkDown;
}
3641
/// Opens a stream-mode `AF.UNIX` socket, connects it to `address`, and
/// returns the connected descriptor.
///
/// Fails with `error.AddressFamilyUnsupported` when `net.has_unix_sockets` is
/// false. The descriptor is closed again if connecting fails.
fn netConnectUnixPosix(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // `OptionUnsupported` has no counterpart in this function's error set, so
    // it is reported as `Unexpected`; every other error passes through.
    const fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |open_err| {
        switch (open_err) {
            error.OptionUnsupported => return error.Unexpected,
            else => |passthrough| return passthrough,
        }
    };
    errdefer posix.close(fd);

    var sockaddr: UnixAddress = undefined;
    const sockaddr_len = addressUnixToPosix(address, &sockaddr);
    try posixConnectUnix(t, fd, &sockaddr.any, sockaddr_len);
    return fd;
}
3658
/// Opens a stream-mode `AF.UNIX` Winsock socket, connects it to `address`,
/// and returns the connected handle.
///
/// Fails with `error.AddressFamilyUnsupported` when `net.has_unix_sockets` is
/// false. The socket is closed again if connecting fails.
fn netConnectUnixWindows(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const sock = try openSocketWsa(t, posix.AF.UNIX, .{ .mode = .stream });
    errdefer closeSocketWindows(sock);

    var sockaddr: WsaAddress = undefined;
    const sockaddr_len = addressUnixToWsa(address, &sockaddr);

    // Every prong either returns or asks for another attempt, so the loop
    // ends only once `connect` stops reporting `SOCKET_ERROR`.
    while (ws2_32.connect(sock, &sockaddr.any, sockaddr_len) == ws2_32.SOCKET_ERROR) {
        switch (ws2_32.WSAGetLastError()) {
            // Retry.
            .EINTR => continue,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,

            // Failures reported to the caller.
            .EACCES => return error.AccessDenied,
            .EAFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ECONNREFUSED => return error.FileNotFound,
            .ENOBUFS => return error.SystemResources,
            .EWOULDBLOCK => return error.WouldBlock,

            // Codes routed through `wsaErrorBug`.
            .EFAULT => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            .EISCONN => |err| return wsaErrorBug(err),
            .ENOTSOCK => |err| return wsaErrorBug(err),

            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    return sock;
}
3697
/// Stand-in for Unix-socket connect that ignores every argument and always
/// fails with `error.AddressFamilyUnsupported`.
fn netConnectUnixUnavailable(
    userdata: ?*anyopaque,
    address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = address;
    _ = userdata;
    return error.AddressFamilyUnsupported;
}
3706
/// Opens a socket for the address family of `address`, binds it to `address`,
/// and returns the socket together with the address produced by
/// `posixGetSockName`.
///
/// `options` is forwarded unchanged to `openSocketPosix`. The descriptor is
/// closed again if a later step fails.
fn netBindIpPosix(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const fd = try openSocketPosix(t, posixAddressFamily(address), options);
    errdefer posix.close(fd);

    // `sockaddr` first carries the requested address into `posixBind` and is
    // then handed to `posixGetSockName`; whatever it holds afterwards becomes
    // the address of the returned socket.
    var sockaddr: PosixAddress = undefined;
    var sockaddr_len = addressToPosix(address, &sockaddr);
    try posixBind(t, fd, &sockaddr.any, sockaddr_len);
    try posixGetSockName(t, fd, &sockaddr.any, &sockaddr_len);

    const bound_address = addressFromPosix(&sockaddr);
    return .{ .handle = fd, .address = bound_address };
}
3726
/// Opens a Winsock socket for the address family of `address`, binds it to
/// `address`, and returns the socket together with the address produced by
/// `wsaGetSockName`.
///
/// Only `options.mode` and `options.protocol` are forwarded to
/// `openSocketWsa`. The socket is closed again if a later step fails.
fn netBindIpWindows(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const sock = try openSocketWsa(t, posixAddressFamily(address), .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer closeSocketWindows(sock);

    var sockaddr: WsaAddress = undefined;
    var sockaddr_len = addressToWsa(address, &sockaddr);

    while (true) {
        // Cancellation is checked before every attempt, including retries.
        try t.checkCancel();
        if (ws2_32.bind(sock, &sockaddr.any, sockaddr_len) != ws2_32.SOCKET_ERROR) break;
        switch (ws2_32.WSAGetLastError()) {
            // Retry.
            .EINTR => continue,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,

            // Failures reported to the caller.
            .EADDRINUSE => return error.AddressInUse,
            .EADDRNOTAVAIL => return error.AddressUnavailable,
            .ENETDOWN => return error.NetworkDown,
            .ENOBUFS => return error.SystemResources,

            // Codes routed through `wsaErrorBug`.
            .EFAULT => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            .ENOTSOCK => |err| return wsaErrorBug(err),

            else => |err| return windows.unexpectedWSAError(err),
        }
    }

    try wsaGetSockName(t, sock, &sockaddr.any, &sockaddr_len);

    const bound_address = addressFromWsa(&sockaddr);
    return .{ .handle = sock, .address = bound_address };
}
3773
/// Stand-in for IP bind that ignores every argument and always fails with
/// `error.NetworkDown`.
fn netBindIpUnavailable(
    userdata: ?*anyopaque,
    address: *const IpAddress,
    options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = options;
    _ = address;
    _ = userdata;
    return error.NetworkDown;
}
3784
/// Creates a socket of the given `family` using the mode and protocol from
/// `options`, marks it close-on-exec, and applies `options.ip6_only`.
///
/// Close-on-exec is requested through `SOCK.CLOEXEC` at creation time, or,
/// when `socket_flags_unsupported` is set, through a follow-up
/// `fcntl(F.SETFD, FD_CLOEXEC)`.
///
/// When `options.ip6_only` is true, `IPV6_V6ONLY` is enabled on the socket;
/// targets where `posix.IPV6` is `void` fail with `error.OptionUnsupported`.
///
/// The descriptor is closed again if any step after creation fails.
fn openSocketPosix(
    t: *Threaded,
    family: posix.sa_family_t,
    options: IpAddress.BindOptions,
) error{
    AddressFamilyUnsupported,
    ProtocolUnsupportedBySystem,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    ProtocolUnsupportedByAddressFamily,
    SocketModeUnsupported,
    OptionUnsupported,
    Unexpected,
    Canceled,
}!posix.socket_t {
    const mode = posixSocketMode(options.mode);
    const protocol = posixProtocol(options.protocol);
    // Loop-invariant, so computed once up front.
    const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
    const socket_fd = while (true) {
        try t.checkCancel();
        const socket_rc = posix.system.socket(family, flags, protocol);
        switch (posix.errno(socket_rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(socket_rc);
                errdefer posix.close(fd);
                // Fallback for targets that cannot pass `SOCK.CLOEXEC` to
                // `socket`: set the flag with a separate `fcntl` call.
                if (socket_flags_unsupported) while (true) {
                    try t.checkCancel();
                    switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                        .SUCCESS => break,
                        .INTR => continue,
                        .CANCELED => return error.Canceled,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break fd;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .INVAL => return error.ProtocolUnsupportedBySystem,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
            .PROTOTYPE => return error.SocketModeUnsupported,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(socket_fd);

    if (options.ip6_only) {
        if (posix.IPV6 == void) return error.OptionUnsupported;
        // A nonzero value turns the restriction on; zero would turn it off,
        // which is the opposite of what `ip6_only` requests.
        try setSocketOption(t, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 1);
    }

    return socket_fd;
}
3845
/// Creates a Winsock socket of the given `family` with `WSASocketW`, using
/// the mode and protocol from `options`.
///
/// The socket is created with `WSA_FLAG_OVERLAPPED` and
/// `WSA_FLAG_NO_HANDLE_INHERIT`. Only `options.mode` and `options.protocol`
/// are read here. The call is repeated after `EINTR` and after
/// `initializeWsa` has run in response to `NOTINITIALISED`.
fn openSocketWsa(t: *Threaded, family: posix.sa_family_t, options: IpAddress.BindOptions) !ws2_32.SOCKET {
    const socket_type = posixSocketMode(options.mode);
    const socket_protocol = posixProtocol(options.protocol);
    const creation_flags: u32 = ws2_32.WSA_FLAG_OVERLAPPED | ws2_32.WSA_FLAG_NO_HANDLE_INHERIT;
    while (true) {
        // Cancellation is checked before every attempt, including retries.
        try t.checkCancel();
        const sock = ws2_32.WSASocketW(family, @bitCast(socket_type), @bitCast(socket_protocol), null, 0, creation_flags);
        if (sock != ws2_32.INVALID_SOCKET) return sock;
        switch (ws2_32.WSAGetLastError()) {
            // Retry.
            .EINTR => continue,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,

            // Failures reported to the caller.
            .EAFNOSUPPORT => return error.AddressFamilyUnsupported,
            .EMFILE => return error.ProcessFdQuotaExceeded,
            .ENOBUFS => return error.SystemResources,
            .EPROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,

            else => |err| return windows.unexpectedWSAError(err),
        }
    }
}
3869
/// Accepts one connection on `listen_fd` and returns it as a stream whose
/// address is the peer address filled in by the accept call.
///
/// The new descriptor is marked close-on-exec: through `accept4` with
/// `SOCK.CLOEXEC` when `have_accept4` is set, otherwise through a follow-up
/// `fcntl(F.SETFD, FD_CLOEXEC)`. In the fallback path the descriptor is
/// closed again if setting the flag fails.
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var peer: PosixAddress = undefined;
    var peer_len: posix.socklen_t = @sizeOf(PosixAddress);

    const stream_fd = while (true) {
        // Cancellation is checked before every attempt, including retries.
        try t.checkCancel();
        const accept_rc = if (have_accept4)
            posix.system.accept4(listen_fd, &peer.any, &peer_len, posix.SOCK.CLOEXEC)
        else
            posix.system.accept(listen_fd, &peer.any, &peer_len);
        switch (posix.errno(accept_rc)) {
            .SUCCESS => {
                const accepted: posix.fd_t = @intCast(accept_rc);
                errdefer posix.close(accepted);
                // Without `accept4` the close-on-exec flag has to be set in a
                // separate step.
                if (!have_accept4) while (true) {
                    try t.checkCancel();
                    const fcntl_rc = posix.system.fcntl(accepted, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC));
                    switch (posix.errno(fcntl_rc)) {
                        .SUCCESS => break,
                        .INTR => continue,
                        .CANCELED => return error.Canceled,
                        else => |err| return posix.unexpectedErrno(err),
                    }
                };
                break accepted;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // Failures reported to the caller.
            .CONNABORTED => return error.ConnectionAborted,
            .INVAL => return error.SocketNotListening,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .PERM => return error.BlockedByFirewall,
            .PROTO => return error.ProtocolFailure,

            // Codes routed through `errnoBug`.
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .FAULT => |err| return errnoBug(err),
            .NOTSOCK => |err| return errnoBug(err),
            .OPNOTSUPP => |err| return errnoBug(err),

            else => |err| return posix.unexpectedErrno(err),
        }
    };

    const peer_address = addressFromPosix(&peer);
    return .{ .socket = .{ .handle = stream_fd, .address = peer_address } };
}
3920
/// Accepts one connection on `listen_handle` and returns it as a stream whose
/// address is the peer address filled in by `accept`.
///
/// The call is repeated after `EINTR` and after `initializeWsa` has run in
/// response to `NOTINITIALISED`.
fn netAcceptWindows(userdata: ?*anyopaque, listen_handle: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var peer: WsaAddress = undefined;
    var peer_len: i32 = @sizeOf(WsaAddress);

    while (true) {
        // Cancellation is checked before every attempt, including retries.
        try t.checkCancel();
        const accepted = ws2_32.accept(listen_handle, &peer.any, &peer_len);
        // Every prong either returns or restarts the loop, so control only
        // falls through to the success path when `accept` produced a socket.
        if (accepted == ws2_32.INVALID_SOCKET) switch (ws2_32.WSAGetLastError()) {
            // Retry.
            .EINTR => continue,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,

            // Failures reported to the caller.
            .ECONNRESET => return error.ConnectionAborted,
            .EMFILE => return error.ProcessFdQuotaExceeded,
            .ENETDOWN => return error.NetworkDown,
            .ENOBUFS => return error.SystemResources,

            // Codes routed through `wsaErrorBug`.
            .EFAULT => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            .ENOTSOCK => |err| return wsaErrorBug(err),
            .EOPNOTSUPP => |err| return wsaErrorBug(err),

            else => |err| return windows.unexpectedWSAError(err),
        };

        return .{ .socket = .{
            .handle = accepted,
            .address = addressFromWsa(&peer),
        } };
    }
}
3952
/// Stand-in for accept that ignores every argument and always fails with
/// `error.NetworkDown`.
fn netAcceptUnavailable(userdata: ?*anyopaque, listen_handle: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = listen_handle;
    _ = userdata;
    return error.NetworkDown;
}
3958
/// Performs one vectored read from socket `fd` into the buffers of `data` and
/// returns the number of bytes read.
///
/// Empty buffers are skipped and at most `max_iovecs_len` buffers are used.
/// The first usable buffer is asserted to be non-empty, so `data` must
/// contain at least one non-empty buffer.
///
/// On WASI without libc the read goes through `fd_read`; everywhere else it
/// goes through `readv`.
fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // Gather the non-empty buffers into a fixed-size iovec array.
    var iovecs: [max_iovecs_len]posix.iovec = undefined;
    var iovec_count: usize = 0;
    for (data) |buf| {
        if (iovec_count == iovecs.len) break;
        if (buf.len == 0) continue;
        iovecs[iovec_count] = .{ .base = buf.ptr, .len = buf.len };
        iovec_count += 1;
    }
    const dest = iovecs[0..iovec_count];
    assert(dest[0].len > 0);

    if (native_os == .wasi and !builtin.link_libc) while (true) {
        try t.checkCancel();
        var bytes_read: usize = undefined;
        switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &bytes_read)) {
            .SUCCESS => return bytes_read,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // Failures reported to the caller.
            .CONNRESET => return error.ConnectionResetByPeer,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCAPABLE => return error.AccessDenied,
            .NOTCONN => return error.SocketUnconnected,
            .TIMEDOUT => return error.Timeout,

            // Codes routed through `errnoBug`.
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),

            else => |err| return posix.unexpectedErrno(err),
        }
    };

    while (true) {
        try t.checkCancel();
        const readv_rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
        switch (posix.errno(readv_rc)) {
            .SUCCESS => return @intCast(readv_rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            // Failures reported to the caller.
            .CONNRESET => return error.ConnectionResetByPeer,
            .NETDOWN => return error.NetworkDown,
            .NOBUFS, .NOMEM => return error.SystemResources,
            .NOTCONN, .PIPE => return error.SocketUnconnected,
            .TIMEDOUT => return error.Timeout,

            // Codes routed through `errnoBug`.
            .AGAIN => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),

            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
4020
/// Performs one vectored receive from socket `handle` into the buffers of
/// `data` using overlapped `WSARecv`, and returns the number of bytes read.
///
/// Empty buffers are skipped and at most `max_iovecs_len` buffers are used.
/// A buffer whose length does not fit in a `u32` is clamped to
/// `maxInt(u32)` and ends the list. The first usable buffer is asserted to be
/// non-empty, so `data` must contain at least one non-empty buffer.
///
/// When `WSARecv` reports `IO_PENDING`, the function waits for completion
/// with `WSAGetOverlappedResult`.
fn netReadWindows(userdata: ?*anyopaque, handle: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    // Declared at function scope so the `bufs` slice below keeps pointing at
    // live storage for every `WSARecv` call in the retry loop.
    var iovec_buffer: [max_iovecs_len]ws2_32.WSABUF = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (iovec_buffer.len - i == 0) break;
        if (buf.len == 0) continue;
        if (std.math.cast(u32, buf.len)) |len| {
            iovec_buffer[i] = .{ .buf = buf.ptr, .len = len };
            i += 1;
            continue;
        }
        // The length does not fit in a `u32`: expose only the leading
        // `maxInt(u32)` bytes and stop, since later buffers would no longer
        // be contiguous with what this entry covers.
        iovec_buffer[i] = .{ .buf = buf.ptr, .len = std.math.maxInt(u32) };
        i += 1;
        break;
    }
    const bufs = iovec_buffer[0..i];
    assert(bufs[0].len != 0);

    while (true) {
        try t.checkCancel();

        var flags: u32 = 0;
        var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
        var n: u32 = undefined;
        const rc = ws2_32.WSARecv(handle, bufs.ptr, @intCast(bufs.len), &n, &flags, &overlapped, null);
        if (rc != ws2_32.SOCKET_ERROR) return n;
        const wsa_error: ws2_32.WinsockError = switch (ws2_32.WSAGetLastError()) {
            .IO_PENDING => e: {
                // The operation was queued; block until it completes and
                // pick up either the byte count or the real error.
                var result_flags: u32 = undefined;
                const overlapped_rc = ws2_32.WSAGetOverlappedResult(
                    handle,
                    &overlapped,
                    &n,
                    windows.TRUE,
                    &result_flags,
                );
                if (overlapped_rc == windows.FALSE) {
                    break :e ws2_32.WSAGetLastError();
                } else {
                    return n;
                }
            },
            else => |err| err,
        };
        switch (wsa_error) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECONNRESET => return error.ConnectionResetByPeer,
            // A pointer is not completely contained in user address space.
            // Routed through `wsaErrorBug` like the other caller-bug codes
            // rather than being declared `unreachable`.
            .EFAULT => |err| return wsaErrorBug(err),
            .EINVAL => |err| return wsaErrorBug(err),
            .EMSGSIZE => |err| return wsaErrorBug(err),
            .ENETDOWN => return error.NetworkDown,
            .ENETRESET => return error.ConnectionResetByPeer,
            .ENOTCONN => return error.SocketUnconnected,
            else => |err| return windows.unexpectedWSAError(err),
        }
    }
}
4095
/// Stand-in for socket read that ignores every argument and always fails
/// with `error.NetworkDown`.
fn netReadUnavailable(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = data;
    _ = fd;
    _ = userdata;
    return error.NetworkDown;
}
4102
/// Sends `messages` on socket `handle`, in order, and returns a tuple of an
/// optional error and the number of messages sent before it occurred.
///
/// When `have_sendmmsg` is set, messages are handed to `netSendMany` in
/// batches; otherwise they are sent one at a time with `netSendOne`.
/// `MSG.NOSIGNAL` is always passed; each entry of `flags` is translated only
/// if the target declares the corresponding `MSG` constant.
fn netSendPosix(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    if (!have_networking) return .{ error.NetworkDown, 0 };
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    const posix_flags: u32 = posix.MSG.NOSIGNAL |
        @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
        @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
        @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
        @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0);

    var sent: usize = 0;
    while (sent != messages.len) {
        if (have_sendmmsg) {
            // Batch path: `netSendMany` reports how many messages it handled.
            const batch = netSendMany(t, handle, messages[sent..], posix_flags) catch |err| return .{ err, sent };
            sent += batch;
        } else {
            netSendOne(t, handle, &messages[sent], posix_flags) catch |err| return .{ err, sent };
            sent += 1;
        }
    }
    return .{ null, sent };
}
4131
/// Placeholder for the Winsock send path: not implemented yet.
///
/// Returns `error.NetworkDown` with zero messages sent when `have_networking`
/// is false; otherwise panics with a TODO message.
fn netSendWindows(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    if (!have_networking) return .{ error.NetworkDown, 0 };
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    // Nothing is used yet; the discards keep the compiler satisfied until the
    // implementation lands.
    _ = flags;
    _ = messages;
    _ = handle;
    _ = t;
    @panic("TODO netSendWindows");
}
4146
/// Stand-in for socket send that ignores every argument and always reports
/// `error.NetworkDown` with zero messages sent.
fn netSendUnavailable(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
    // Nothing is inspected; the parameters exist only to match the signature.
    _ = flags;
    _ = messages;
    _ = handle;
    _ = userdata;
    return .{ error.NetworkDown, 0 };
}
4159
/// Sends a single message with `sendmsg` and stores the number of bytes the
/// call reported back into `message.data_len`.
///
/// `flags` is passed to `sendmsg` unchanged. The call is repeated when
/// interrupted, and cancellation is checked before every attempt.
///
/// The Windows branch decodes failures with `WSAGetLastError`; all other
/// targets decode them with `posix.errno`. Both branches report an
/// unreachable host as `error.HostUnreachable`.
fn netSendOne(
    t: *Threaded,
    handle: net.Socket.Handle,
    message: *net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!void {
    var addr: PosixAddress = undefined;
    var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
    const msg: posix.msghdr_const = .{
        .name = &addr.any,
        .namelen = addressToPosix(message.address, &addr),
        .iov = (&iovec)[0..1],
        .iovlen = 1,
        // OS returns EINVAL if this pointer is invalid even if controllen is zero.
        .control = if (message.control.len == 0) null else @constCast(message.control.ptr),
        .controllen = @intCast(message.control.len),
        .flags = 0,
    };
    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmsg(handle, &msg, flags);
        if (is_windows) {
            if (rc == ws2_32.SOCKET_ERROR) {
                switch (ws2_32.WSAGetLastError()) {
                    .EINTR => continue,
                    .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
                    .NOTINITIALISED => {
                        try initializeWsa(t);
                        continue;
                    },
                    .EACCES => return error.AccessDenied,
                    .EADDRNOTAVAIL => return error.AddressUnavailable,
                    .ECONNRESET => return error.ConnectionResetByPeer,
                    .EMSGSIZE => return error.MessageOversize,
                    .ENOBUFS => return error.SystemResources,
                    .ENOTSOCK => return error.FileDescriptorNotASocket,
                    .EAFNOSUPPORT => return error.AddressFamilyUnsupported,
                    // The three codes below are caller bugs. They go through
                    // `wsaErrorBug`, like `ESHUTDOWN`, instead of being
                    // declared `unreachable` for a code the OS can return.
                    .EDESTADDRREQ => |err| return wsaErrorBug(err), // A destination address is required.
                    .EFAULT => |err| return wsaErrorBug(err), // The lpBuffers, lpTo, lpOverlapped, lpNumberOfBytesSent, or lpCompletionRoutine parameters are not part of the user address space, or the lpTo parameter is too small.
                    .EINVAL => |err| return wsaErrorBug(err),
                    // Kept distinct from `ENETUNREACH`, matching the POSIX
                    // branch below.
                    .EHOSTUNREACH => return error.HostUnreachable,
                    .ENETDOWN => return error.NetworkDown,
                    .ENETRESET => return error.ConnectionResetByPeer,
                    .ENETUNREACH => return error.NetworkUnreachable,
                    .ENOTCONN => return error.SocketUnconnected,
                    .ESHUTDOWN => |err| return wsaErrorBug(err),
                    else => |err| return windows.unexpectedWSAError(err),
                }
            } else {
                message.data_len = @intCast(rc);
                return;
            }
        }
        switch (posix.errno(rc)) {
            .SUCCESS => {
                message.data_len = @intCast(rc);
                return;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ACCES => return error.AccessDenied,
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err),
            .OPNOTSUPP => |err| return errnoBug(err),
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
4244
/// Sends up to 64 of `messages` with a single `sendmmsg` call and returns how
/// many of them the call reported as sent.
///
/// For each sent message, `data_len` is overwritten with the byte count that
/// `sendmmsg` stored for it. Messages beyond the first 64 are left untouched
/// for the caller to submit in a later call.
///
/// The call is repeated when interrupted, and cancellation is checked before
/// every attempt.
fn netSendMany(
    t: *Threaded,
    handle: net.Socket.Handle,
    messages: []net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!usize {
    var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
    var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
    var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
    const min_len: usize = @min(messages.len, msg_buffer.len);
    const clamped_messages = messages[0..min_len];
    const clamped_msgs = (&msg_buffer)[0..min_len];
    const clamped_addrs = (&addr_buffer)[0..min_len];
    const clamped_iovecs = (&iovecs_buffer)[0..min_len];

    // Each message gets its own header, address storage, and single-entry
    // iovec at the same index.
    for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| {
        iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
        msg.* = .{
            .hdr = .{
                .name = &addr.any,
                .namelen = addressToPosix(message.address, addr),
                .iov = iovec[0..1],
                .iovlen = 1,
                // OS returns EINVAL if this pointer is invalid even if
                // controllen is zero, and an empty slice makes no promise
                // about its pointer, so pass null for empty control data.
                .control = if (message.control.len == 0) null else @constCast(message.control.ptr),
                .controllen = message.control.len,
                .flags = 0,
            },
            .len = undefined, // Populated by calling sendmmsg below.
        };
    }

    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                const n: usize = @intCast(rc);
                for (clamped_messages[0..n], clamped_msgs[0..n]) |*message, *msg| {
                    message.data_len = msg.len;
                }
                return n;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .AGAIN => |err| return errnoBug(err),
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
4313
/// Receives messages from socket `handle` into `message_buffer`, packing
/// their payloads back to back into `data_buffer`.
///
/// Returns a tuple of an optional error and the number of entries of
/// `message_buffer` that were filled in.
///
/// Receiving uses non-blocking `recvmsg` calls. If no message is available
/// and nothing has been received yet, the function waits with `poll` until
/// data arrives or `timeout` expires. Once at least one message has been
/// received, it returns as soon as the socket runs dry rather than waiting
/// for more.
///
/// Each stored payload is clamped to the space left in `data_buffer`; the
/// `trunc` flag of the message reflects what the OS reported in `msg.flags`.
fn netReceivePosix(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    if (!have_networking) return .{ error.NetworkDown, 0 };
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    const t_io = io(t);

    // recvmmsg is useless, here's why:
    // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
    // * it wants iovecs for each message but we have a better API: one data
    //   buffer to handle all the messages. The better API cannot be lowered to
    //   the split vectors though because reducing the buffer size might make
    //   some messages unreceivable.

    // So the strategy instead is to use non-blocking recvmsg calls, calling
    // poll() with timeout if the first one returns EAGAIN.
    const posix_flags: u32 =
        @as(u32, if (flags.oob) posix.MSG.OOB else 0) |
        @as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
        @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
        posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;

    var poll_fds: [1]posix.pollfd = .{
        .{
            .fd = handle,
            .events = posix.POLL.IN,
            .revents = undefined,
        },
    };
    // Number of entries of `message_buffer` filled so far.
    var message_i: usize = 0;
    // Number of bytes of `data_buffer` consumed so far.
    var data_i: usize = 0;

    const deadline = timeout.toDeadline(t_io) catch |err| return .{ err, message_i };

    recv: while (true) {
        t.checkCancel() catch |err| return .{ err, message_i };

        if (message_buffer.len - message_i == 0) return .{ null, message_i };
        const message = &message_buffer[message_i];
        const remaining_data_buffer = data_buffer[data_i..];
        var storage: PosixAddress = undefined;
        var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
        var msg: posix.msghdr = .{
            .name = &storage.any,
            .namelen = @sizeOf(PosixAddress),
            .iov = (&iov)[0..1],
            .iovlen = 1,
            .control = message.control.ptr,
            .controllen = @intCast(message.control.len),
            .flags = undefined,
        };

        const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
        switch (posix.errno(recv_rc)) {
            .SUCCESS => {
                // When `MSG.TRUNC` is requested, `recvmsg` can report the
                // full datagram length even if it did not fit, so the
                // reported length must be clamped to what the buffer holds
                // before slicing.
                const reported_len: usize = @intCast(recv_rc);
                const data = remaining_data_buffer[0..@min(reported_len, remaining_data_buffer.len)];
                data_i += data.len;
                message.* = .{
                    .from = addressFromPosix(&storage),
                    .data = data,
                    .control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
                    .flags = .{
                        .eor = (msg.flags & posix.MSG.EOR) != 0,
                        .trunc = (msg.flags & posix.MSG.TRUNC) != 0,
                        .ctrunc = (msg.flags & posix.MSG.CTRUNC) != 0,
                        .oob = (msg.flags & posix.MSG.OOB) != 0,
                        .errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false,
                    },
                };
                message_i += 1;
                continue;
            },
            // Nothing to read right now. The unlabeled `continue`s inside
            // this inner loop retry the poll; `continue :recv` goes back to
            // `recvmsg`.
            .AGAIN => while (true) {
                t.checkCancel() catch |err| return .{ err, message_i };
                // Only wait when nothing has been received yet.
                if (message_i != 0) return .{ null, message_i };

                const max_poll_ms = std.math.maxInt(u31);
                const timeout_ms: u31 = if (deadline) |d| t: {
                    const duration = d.durationFromNow(t_io) catch |err| return .{ err, message_i };
                    if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
                    break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
                } else max_poll_ms;

                const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
                switch (posix.errno(poll_rc)) {
                    .SUCCESS => {
                        if (poll_rc == 0) {
                            // Although spurious timeouts are OK, when no deadline
                            // is passed we must not return `error.Timeout`.
                            if (deadline == null) continue;
                            return .{ error.Timeout, message_i };
                        }
                        continue :recv;
                    },
                    .INTR => continue,
                    .CANCELED => return .{ error.Canceled, message_i },

                    .FAULT => |err| return .{ errnoBug(err), message_i },
                    .INVAL => |err| return .{ errnoBug(err), message_i },
                    .NOMEM => return .{ error.SystemResources, message_i },
                    else => |err| return .{ posix.unexpectedErrno(err), message_i },
                }
            },
            .INTR => continue,
            .CANCELED => return .{ error.Canceled, message_i },

            .BADF => |err| return .{ errnoBug(err), message_i },
            .NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
            .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
            .FAULT => |err| return .{ errnoBug(err), message_i },
            .INVAL => |err| return .{ errnoBug(err), message_i },
            .NOBUFS => return .{ error.SystemResources, message_i },
            .NOMEM => return .{ error.SystemResources, message_i },
            .NOTCONN => return .{ error.SocketUnconnected, message_i },
            .NOTSOCK => |err| return .{ errnoBug(err), message_i },
            .MSGSIZE => return .{ error.MessageOversize, message_i },
            .PIPE => return .{ error.SocketUnconnected, message_i },
            .OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
            .CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
            .NETDOWN => return .{ error.NetworkDown, message_i },
            else => |err| return .{ posix.unexpectedErrno(err), message_i },
        }
    }
}
4443
/// Windows backend for receiving socket messages.
///
/// Not implemented yet: reports `error.NetworkDown` with zero messages when
/// networking support is compiled out, and panics otherwise.
fn netReceiveWindows(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    if (!have_networking) return .{ error.NetworkDown, 0 };
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));

    // Nothing is consumed until the implementation lands.
    _ = timeout;
    _ = flags;
    _ = data_buffer;
    _ = message_buffer;
    _ = handle;
    _ = threaded;
    @panic("TODO implement netReceiveWindows");
}
4462
/// Receive implementation for configurations without networking: ignores every
/// argument and reports `error.NetworkDown` with zero messages received.
fn netReceiveUnavailable(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    _ = timeout;
    _ = flags;
    _ = data_buffer;
    _ = message_buffer;
    _ = handle;
    _ = userdata;
    const received: usize = 0;
    return .{ error.NetworkDown, received };
}
4479
/// Sends `header`, then every element of `data` except the last, then the last
/// element of `data` repeated `splat` times, using a single `sendmsg` call on
/// `fd`.
///
/// At most `max_iovecs_len` buffers are submitted; whatever does not fit is
/// simply left out of this call. Returns the byte count reported by
/// `sendmsg`.
///
/// `data` must contain at least one element (the splat pattern).
fn netWritePosix(
    userdata: ?*anyopaque,
    fd: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
    // Scratch space for expanding a one-byte splat pattern. It lives at
    // function scope because `iovecs` keeps pointing at it until `sendmsg`
    // has run.
    var splat_backing: [splat_buffer_size]u8 = undefined;
    var msg: posix.msghdr_const = .{
        .name = null,
        .namelen = 0,
        .iov = &iovecs,
        .iovlen = 0,
        .control = null,
        .controllen = 0,
        .flags = 0,
    };
    addBuf(&iovecs, &msg.iovlen, header);
    for (data[0 .. data.len - 1]) |bytes| addBuf(&iovecs, &msg.iovlen, bytes);
    const pattern = data[data.len - 1];
    if (iovecs.len - msg.iovlen != 0) switch (splat) {
        0 => {},
        1 => addBuf(&iovecs, &msg.iovlen, pattern),
        else => switch (pattern.len) {
            0 => {},
            1 => {
                const splat_buffer = &splat_backing;
                const memset_len = @min(splat_buffer.len, splat);
                const buf = splat_buffer[0..memset_len];
                @memset(buf, pattern[0]);
                addBuf(&iovecs, &msg.iovlen, buf);
                var remaining_splat = splat - buf.len;
                while (remaining_splat > splat_buffer.len and iovecs.len - msg.iovlen != 0) {
                    assert(buf.len == splat_buffer.len);
                    addBuf(&iovecs, &msg.iovlen, splat_buffer);
                    remaining_splat -= splat_buffer.len;
                }
                // The loop also stops when the vector array is full, in which
                // case `remaining_splat` may still exceed the scratch buffer;
                // clamp so the slice stays in bounds (`addBuf` then drops it).
                const tail_len = @min(remaining_splat, splat_buffer.len);
                addBuf(&iovecs, &msg.iovlen, splat_buffer[0..tail_len]);
            },
            else => for (0..@min(splat, iovecs.len - msg.iovlen)) |_| {
                addBuf(&iovecs, &msg.iovlen, pattern);
            },
        },
    };
    const flags = posix.MSG.NOSIGNAL;
    while (true) {
        try t.checkCancel();
        const rc = posix.system.sendmsg(fd, &msg, flags);
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ACCES => |err| return errnoBug(err),
            .AGAIN => |err| return errnoBug(err),
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err), // The socket is not connection-mode, and no peer address is set.
            .FAULT => |err| return errnoBug(err), // An invalid user space address was specified for an argument.
            .INVAL => |err| return errnoBug(err), // Invalid argument passed.
            .ISCONN => |err| return errnoBug(err), // connection-mode socket was connected already but a recipient was specified
            .MSGSIZE => |err| return errnoBug(err),
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err), // The file descriptor sockfd does not refer to a socket.
            .OPNOTSUPP => |err| return errnoBug(err), // Some bit in the flags argument is inappropriate for the socket type.
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}
4561
/// Sends `header`, then every element of `data` except the last, then the last
/// element of `data` repeated `splat` times, using one `WSASend` call on
/// `handle`.
///
/// At most `max_iovecs_len` buffers are submitted; whatever does not fit is
/// left out of this call. When the send is reported as pending, this waits
/// for completion with `WSAGetOverlappedResult`. Returns the byte count
/// reported by Winsock.
///
/// `data` must contain at least one element (the splat pattern).
fn netWriteWindows(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    if (!have_networking) return error.NetworkDown;
    const t: *Threaded = @ptrCast(@alignCast(userdata));
    comptime assert(native_os == .windows);

    var iovecs: [max_iovecs_len]ws2_32.WSABUF = undefined;
    // Scratch space for expanding a one-byte splat pattern. It lives at
    // function scope because `iovecs` keeps pointing at it until `WSASend`
    // has run.
    var splat_backing: [64]u8 = undefined;
    var len: u32 = 0;
    addWsaBuf(&iovecs, &len, header);
    for (data[0 .. data.len - 1]) |bytes| addWsaBuf(&iovecs, &len, bytes);
    const pattern = data[data.len - 1];
    if (iovecs.len - len != 0) switch (splat) {
        0 => {},
        1 => addWsaBuf(&iovecs, &len, pattern),
        else => switch (pattern.len) {
            0 => {},
            1 => {
                const splat_buffer = &splat_backing;
                const memset_len = @min(splat_buffer.len, splat);
                const buf = splat_buffer[0..memset_len];
                @memset(buf, pattern[0]);
                addWsaBuf(&iovecs, &len, buf);
                var remaining_splat = splat - buf.len;
                while (remaining_splat > splat_buffer.len and len < iovecs.len) {
                    addWsaBuf(&iovecs, &len, splat_buffer);
                    remaining_splat -= splat_buffer.len;
                }
                // The loop also stops when the vector array is full, in which
                // case `remaining_splat` may still exceed the scratch buffer;
                // clamp so the slice stays in bounds (`addWsaBuf` then drops it).
                const tail_len = @min(remaining_splat, splat_buffer.len);
                addWsaBuf(&iovecs, &len, splat_buffer[0..tail_len]);
            },
            else => for (0..@min(splat, iovecs.len - len)) |_| {
                addWsaBuf(&iovecs, &len, pattern);
            },
        },
    };

    while (true) {
        try t.checkCancel();

        var n: u32 = undefined;
        var overlapped: windows.OVERLAPPED = std.mem.zeroes(windows.OVERLAPPED);
        const rc = ws2_32.WSASend(handle, &iovecs, len, &n, 0, &overlapped, null);
        if (rc != ws2_32.SOCKET_ERROR) return n;
        const wsa_error: ws2_32.WinsockError = switch (ws2_32.WSAGetLastError()) {
            .IO_PENDING => e: {
                // The send was queued; block until it completes.
                var result_flags: u32 = undefined;
                const overlapped_rc = ws2_32.WSAGetOverlappedResult(
                    handle,
                    &overlapped,
                    &n,
                    windows.TRUE,
                    &result_flags,
                );
                if (overlapped_rc == windows.FALSE) {
                    break :e ws2_32.WSAGetLastError();
                } else {
                    return n;
                }
            },
            else => |err| err,
        };
        switch (wsa_error) {
            .EINTR => continue,
            .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
            .NOTINITIALISED => {
                try initializeWsa(t);
                continue;
            },

            .ECONNABORTED => return error.ConnectionResetByPeer,
            .ECONNRESET => return error.ConnectionResetByPeer,
            .EINVAL => return error.SocketUnconnected,
            .ENETDOWN => return error.NetworkDown,
            .ENETRESET => return error.ConnectionResetByPeer,
            .ENOBUFS => return error.SystemResources,
            .ENOTCONN => return error.SocketUnconnected,
            .ENOTSOCK => |err| return wsaErrorBug(err),
            .EOPNOTSUPP => |err| return wsaErrorBug(err),
            .ESHUTDOWN => |err| return wsaErrorBug(err),
            else => |err| return windows.unexpectedWSAError(err),
        }
    }
}
4649
/// Appends `bytes` to the `WSABUF` array `v` at index `i.*`, advancing `i.*`
/// for every entry written.
///
/// A `WSABUF` length is a `u32`, so longer inputs are split into several
/// entries. Stops silently as soon as `v` has no free entry left.
fn addWsaBuf(v: []ws2_32.WSABUF, i: *u32, bytes: []const u8) void {
    const max_chunk = std.math.maxInt(u32);
    var rest = bytes;
    while (true) {
        if (rest.len <= max_chunk) {
            @branchHint(.likely);
            // Final (usually only) entry.
            if (v.len - i.* == 0) return;
            v[i.*] = .{ .buf = @constCast(rest.ptr), .len = @intCast(rest.len) };
            i.* += 1;
            return;
        }
        if (v.len - i.* == 0) return;
        v[i.*] = .{ .buf = @constCast(rest.ptr), .len = max_chunk };
        i.* += 1;
        rest = rest[max_chunk..];
    }
}
4665
/// Write implementation for configurations without networking: ignores every
/// argument and fails with `error.NetworkDown`.
fn netWriteUnavailable(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    header: []const u8,
    data: []const []const u8,
    splat: usize,
) net.Stream.Writer.Error!usize {
    _ = splat;
    _ = data;
    _ = header;
    _ = handle;
    _ = userdata;
    return error.NetworkDown;
}
4680
/// Appends `bytes` to the iovec array `v` at index `i.*` and advances `i.*`.
///
/// Empty inputs are skipped, and nothing happens once `v` is full.
fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"), bytes: []const u8) void {
    // OS checks ptr addr before length so zero length vectors must be omitted.
    if (bytes.len == 0 or v.len - i.* == 0) return;
    const slot = &v[i.*];
    slot.* = .{ .base = bytes.ptr, .len = bytes.len };
    i.* += 1;
}
4688
/// Closes the socket `handle`, using `closeSocketWindows` on Windows and
/// `posix.close` everywhere else.
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    _ = threaded;
    if (native_os == .windows) {
        closeSocketWindows(handle);
    } else {
        posix.close(handle);
    }
}
4697
/// Close implementation for configurations without networking. Reaching it is
/// treated as impossible, since no socket could have been opened.
fn netCloseUnavailable(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    _ = handle;
    _ = userdata;
    unreachable; // How you gonna close something that was impossible to open?
}
4703
/// Resolves a network interface name to a `net.Interface` (its index).
///
/// * Linux: issues `SIOCGIFINDEX` on a temporary `AF.UNIX` datagram socket.
/// * Windows: not implemented yet (panics).
/// * Other targets linking libc: `if_nametoindex`.
///
/// Returns `error.InterfaceNotFound` when networking is compiled out or the
/// name is unknown to the system.
fn netInterfaceNameResolve(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    if (!have_networking) return error.InterfaceNotFound;
    const t: *Threaded = @ptrCast(@alignCast(userdata));

    if (native_os == .linux) {
        const sock_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
            error.ProcessFdQuotaExceeded,
            error.SystemFdQuotaExceeded,
            => return error.SystemResources,

            error.AddressFamilyUnsupported,
            error.ProtocolUnsupportedBySystem,
            error.ProtocolUnsupportedByAddressFamily,
            error.SocketModeUnsupported,
            error.OptionUnsupported,
            => return error.Unexpected,

            else => |e| return e,
        };
        defer posix.close(sock_fd);

        var request: posix.ifreq = .{
            .ifrn = .{ .name = @bitCast(name.bytes) },
            .ifru = undefined,
        };

        while (true) {
            try t.checkCancel();
            const rc = posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&request));
            switch (posix.errno(rc)) {
                .SUCCESS => return .{ .index = @bitCast(request.ifru.ivalue) },
                .INTR => continue,
                .CANCELED => return error.Canceled,
                .NODEV => return error.InterfaceNotFound,

                // Bad parameters, bad pointer, or a descriptor that is closed
                // or not usable for this request: all programmer errors.
                .INVAL, .NOTTY, .NXIO, .BADF, .FAULT, .IO => |err| return errnoBug(err),

                else => |err| return posix.unexpectedErrno(err),
            }
        }
    }

    if (native_os == .windows) {
        try t.checkCancel();
        @panic("TODO implement netInterfaceNameResolve for Windows");
    }

    if (builtin.link_libc) {
        try t.checkCancel();
        const index = std.c.if_nametoindex(&name.bytes);
        // libc signals "no such interface" with index zero.
        return if (index == 0) error.InterfaceNotFound else .{ .index = @bitCast(index) };
    }

    @panic("unimplemented");
}
4762
/// Interface-name resolution for configurations without networking: always
/// fails with `error.InterfaceNotFound`.
fn netInterfaceNameResolveUnavailable(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    _ = name;
    _ = userdata;
    return error.InterfaceNotFound;
}
4771
/// Looks up the name of a network interface.
///
/// Not implemented on any target yet: after the cancellation check, every
/// path ends in a panic.
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    try threaded.checkCancel();

    switch (native_os) {
        .linux => {
            _ = interface;
            @panic("TODO implement netInterfaceName for linux");
        },
        .windows => @panic("TODO implement netInterfaceName for windows"),
        else => {},
    }

    if (builtin.link_libc) @panic("TODO implement netInterfaceName for libc");

    @panic("unimplemented");
}
4791
/// Interface-name lookup for configurations without networking: always fails
/// with `error.Unexpected`.
fn netInterfaceNameUnavailable(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    _ = interface;
    _ = userdata;
    return error.Unexpected;
}
4797
/// Runs a host name lookup and then pushes the terminating `.end` result,
/// carrying the outcome of `netLookupFallible`, onto `resolved`.
fn netLookup(
    userdata: ?*anyopaque,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) void {
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    const threaded_io = io(threaded);
    const outcome = netLookupFallible(threaded, host_name, resolved, options);
    resolved.putOneUncancelable(threaded_io, .{ .end = outcome });
}
4808
/// Host name lookup for configurations without networking: immediately pushes
/// an `.end` result carrying `error.NetworkDown` onto `resolved`.
fn netLookupUnavailable(
    userdata: ?*anyopaque,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) void {
    _ = options;
    _ = host_name;
    const threaded: *Threaded = @ptrCast(@alignCast(userdata));
    resolved.putOneUncancelable(ioBasic(threaded), .{ .end = error.NetworkDown });
}
4821
/// Resolves `host_name`, pushing every address found (and, where one is
/// determined, a canonical name) onto `resolved`.
///
/// The strategy depends on the target:
/// * Windows: `GetAddrInfoExW`.
/// * Linux: numeric address literals, then `lookupHosts`, then the RFC 6761
///   localhost special case, then `lookupDnsSearch`.
/// * Other targets linking libc: blocking `getaddrinfo`.
/// * Anything else: `error.OptionUnsupported`.
///
/// The terminating `.end` result is not pushed here; `netLookup` pushes it
/// using this function's return value.
fn netLookupFallible(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    if (!have_networking) return error.NetworkDown;
    const t_io = io(t);
    const name = host_name.bytes;
    assert(name.len <= HostName.max_len);

    if (is_windows) {
        var name_buffer: [HostName.max_len + 1]u16 = undefined;
        const name_len = std.unicode.wtf8ToWtf16Le(&name_buffer, host_name.bytes) catch
            unreachable; // HostName is prevalidated.
        name_buffer[name_len] = 0;
        const name_w = name_buffer[0..name_len :0];

        var port_buffer: [8]u8 = undefined;
        var port_buffer_wide: [8]u16 = undefined;
        const port = std.fmt.bufPrint(&port_buffer, "{d}", .{options.port}) catch
            unreachable; // `port_buffer` is big enough for decimal u16.
        for (port, port_buffer_wide[0..port.len]) |byte, *wide|
            wide.* = std.mem.nativeToLittle(u16, byte);
        port_buffer_wide[port.len] = 0;
        const port_w = port_buffer_wide[0..port.len :0];

        const hints: ws2_32.ADDRINFOEXW = .{
            .flags = .{ .NUMERICSERV = true },
            .family = if (options.family) |f| switch (f) {
                .ip4 => posix.AF.INET,
                .ip6 => posix.AF.INET6,
            } else posix.AF.UNSPEC,
            .socktype = posix.SOCK.STREAM,
            .protocol = posix.IPPROTO.TCP,
            .canonname = null,
            .addr = null,
            .addrlen = 0,
            .blob = null,
            .bloblen = 0,
            .provider = null,
            .next = null,
        };
        const cancel_handle: ?*windows.HANDLE = null;
        var res: *ws2_32.ADDRINFOEXW = undefined;
        const timeout: ?*ws2_32.timeval = null;
        while (true) {
            try t.checkCancel(); // TODO make requestCancel call GetAddrInfoExCancel
            // TODO make this append to the queue eagerly rather than blocking until
            // the whole thing finishes
            const rc: ws2_32.WinsockError = @enumFromInt(ws2_32.GetAddrInfoExW(name_w, port_w, .DNS, null, &hints, &res, timeout, null, null, cancel_handle));
            switch (rc) {
                @as(ws2_32.WinsockError, @enumFromInt(0)) => break,
                .EINTR => continue,
                .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => return error.Canceled,
                .NOTINITIALISED => {
                    try initializeWsa(t);
                    continue;
                },
                .TRY_AGAIN => return error.NameServerFailure,
                .EINVAL => |err| return wsaErrorBug(err),
                .NO_RECOVERY => return error.NameServerFailure,
                .EAFNOSUPPORT => return error.AddressFamilyUnsupported,
                .NOT_ENOUGH_MEMORY => return error.SystemResources,
                .HOST_NOT_FOUND => return error.UnknownHostName,
                .TYPE_NOT_FOUND => return error.ProtocolUnsupportedByAddressFamily,
                .ESOCKTNOSUPPORT => return error.ProtocolUnsupportedBySystem,
                else => |err| return windows.unexpectedWSAError(err),
            }
        }
        defer ws2_32.FreeAddrInfoExW(res);

        var it: ?*ws2_32.ADDRINFOEXW = res;
        var canon_name: ?[*:0]const u16 = null;
        while (it) |info| : (it = info.next) {
            const addr = info.addr orelse continue;
            // Copy `addrlen` bytes rather than `addr.*`: the generic sockaddr
            // type is smaller than an IPv6 socket address, so a plain struct
            // copy would drop the tail of the address and the scope id.
            var storage: WsaAddress = std.mem.zeroes(WsaAddress);
            const copy_len: usize = @min(info.addrlen, @sizeOf(WsaAddress));
            const addr_bytes: [*]const u8 = @ptrCast(addr);
            @memcpy(std.mem.asBytes(&storage)[0..copy_len], addr_bytes[0..copy_len]);
            try resolved.putOne(t_io, .{ .address = addressFromWsa(&storage) });

            // Remember the first canonical name reported.
            if (info.canonname) |n| {
                if (canon_name == null) {
                    canon_name = n;
                }
            }
        }
        if (canon_name) |n| {
            const len = std.unicode.wtf16LeToWtf8(options.canonical_name_buffer, std.mem.sliceTo(n, 0));
            try resolved.putOne(t_io, .{ .canonical_name = .{
                .bytes = options.canonical_name_buffer[0..len],
            } });
        }
        return;
    }

    // On Linux, glibc provides getaddrinfo_a which is capable of supporting our semantics.
    // However, musl's POSIX-compliant getaddrinfo is not, so we bypass it.

    if (builtin.target.isGnuLibC()) {
        // TODO use getaddrinfo_a / gai_cancel
    }

    if (native_os == .linux) {
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }

        if (options.family != .ip6) {
            if (IpAddress.parseIp4(name, options.port)) |addr| {
                try resolved.putAll(t_io, &.{
                    .{ .address = addr },
                    .{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
                });
                return;
            } else |_| {}
        }

        // `error.UnknownHostName` is the "not found here" signal, so a
        // successful return is taken to mean the results are already queued.
        // Stop then, instead of also running the localhost and DNS paths,
        // which would queue more results and rewrite the canonical name
        // buffer.
        if (lookupHosts(t, host_name, resolved, options)) |_| {
            return;
        } else |err| switch (err) {
            error.UnknownHostName => {},
            else => |e| return e,
        }

        // RFC 6761 Section 6.3.3
        // Name resolution APIs and libraries SHOULD recognize
        // localhost names as special and SHOULD always return the IP
        // loopback address for address queries and negative responses
        // for all other query types.

        // Check for equal to "localhost(.)" or ends in ".localhost(.)"
        const localhost = if (name[name.len - 1] == '.') "localhost." else "localhost";
        if (std.mem.endsWith(u8, name, localhost) and
            // The label separator sits immediately before the suffix.
            (name.len == localhost.len or name[name.len - localhost.len - 1] == '.'))
        {
            var results_buffer: [3]HostName.LookupResult = undefined;
            var results_index: usize = 0;
            if (options.family != .ip4) {
                results_buffer[results_index] = .{ .address = .{ .ip6 = .loopback(options.port) } };
                results_index += 1;
            }
            if (options.family != .ip6) {
                results_buffer[results_index] = .{ .address = .{ .ip4 = .loopback(options.port) } };
                results_index += 1;
            }
            const canon_name = "localhost";
            const canon_name_dest = options.canonical_name_buffer[0..canon_name.len];
            canon_name_dest.* = canon_name.*;
            results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } };
            results_index += 1;
            try resolved.putAll(t_io, results_buffer[0..results_index]);
            return;
        }

        return lookupDnsSearch(t, host_name, resolved, options);
    }

    if (native_os == .openbsd) {
        // TODO use getaddrinfo_async / asr_abort
    }

    if (native_os == .freebsd) {
        // TODO use dnsres_getaddrinfo
    }

    if (native_os.isDarwin()) {
        // TODO use CFHostStartInfoResolution / CFHostCancelInfoResolution
    }

    if (builtin.link_libc) {
        // This operating system lacks a way to resolve asynchronously. We are
        // stuck with getaddrinfo.
        var name_buffer: [HostName.max_len + 1]u8 = undefined;
        @memcpy(name_buffer[0..host_name.bytes.len], host_name.bytes);
        name_buffer[host_name.bytes.len] = 0;
        const name_c = name_buffer[0..host_name.bytes.len :0];

        var port_buffer: [8]u8 = undefined;
        const port_c = std.fmt.bufPrintZ(&port_buffer, "{d}", .{options.port}) catch unreachable;

        const hints: posix.addrinfo = .{
            .flags = .{ .NUMERICSERV = true },
            // Honor the requested address family, as the Windows path does.
            .family = if (options.family) |f| switch (f) {
                .ip4 => posix.AF.INET,
                .ip6 => posix.AF.INET6,
            } else posix.AF.UNSPEC,
            .socktype = posix.SOCK.STREAM,
            .protocol = posix.IPPROTO.TCP,
            .canonname = null,
            .addr = null,
            .addrlen = 0,
            .next = null,
        };
        var res: ?*posix.addrinfo = null;
        while (true) {
            try t.checkCancel();
            switch (posix.system.getaddrinfo(name_c.ptr, port_c.ptr, &hints, &res)) {
                @as(posix.system.EAI, @enumFromInt(0)) => break,
                .ADDRFAMILY => return error.AddressFamilyUnsupported,
                .AGAIN => return error.NameServerFailure,
                .FAIL => return error.NameServerFailure,
                .FAMILY => return error.AddressFamilyUnsupported,
                .MEMORY => return error.SystemResources,
                .NODATA => return error.UnknownHostName,
                .NONAME => return error.UnknownHostName,
                .SYSTEM => switch (posix.errno(-1)) {
                    .INTR => continue,
                    .CANCELED => return error.Canceled,
                    else => |e| return posix.unexpectedErrno(e),
                },
                else => return error.Unexpected,
            }
        }
        defer if (res) |some| posix.system.freeaddrinfo(some);

        var it = res;
        var canon_name: ?[*:0]const u8 = null;
        while (it) |info| : (it = info.next) {
            const addr = info.addr orelse continue;
            // Copy `addrlen` bytes rather than `addr.*`: the generic sockaddr
            // type is smaller than an IPv6 socket address, so a plain struct
            // copy would drop the tail of the address and the scope id.
            var storage: PosixAddress = std.mem.zeroes(PosixAddress);
            const copy_len: usize = @min(info.addrlen, @sizeOf(PosixAddress));
            const addr_bytes: [*]const u8 = @ptrCast(addr);
            @memcpy(std.mem.asBytes(&storage)[0..copy_len], addr_bytes[0..copy_len]);
            try resolved.putOne(t_io, .{ .address = addressFromPosix(&storage) });

            // Remember the first canonical name reported.
            if (info.canonname) |n| {
                if (canon_name == null) {
                    canon_name = n;
                }
            }
        }
        if (canon_name) |n| {
            try resolved.putOne(t_io, .{
                .canonical_name = copyCanon(options.canonical_name_buffer, std.mem.sliceTo(n, 0)),
            });
        }
        return;
    }

    return error.OptionUnsupported;
}
5060
/// Storage for a POSIX IP socket address, viewable as the generic `sockaddr`
/// or as the IPv4 / IPv6 specific layouts.
pub const PosixAddress = extern union {
    /// Generic view.
    any: posix.sockaddr,
    /// IPv4 view.
    in: posix.sockaddr.in,
    /// IPv6 view.
    in6: posix.sockaddr.in6,
};
5066
/// Storage for a POSIX Unix domain socket address, viewable as the generic
/// `sockaddr` or as `sockaddr.un`.
const UnixAddress = extern union {
    /// Generic view.
    any: posix.sockaddr,
    /// Unix domain socket view.
    un: posix.sockaddr.un,
};
5071
/// Storage for a Winsock socket address, viewable as the generic `sockaddr`
/// or as the IPv4, IPv6, or Unix domain socket layouts.
const WsaAddress = extern union {
    /// Generic view.
    any: ws2_32.sockaddr,
    /// IPv4 view.
    in: ws2_32.sockaddr.in,
    /// IPv6 view.
    in6: ws2_32.sockaddr.in6,
    /// Unix domain socket view.
    un: ws2_32.sockaddr.un,
};
5078
/// Returns the POSIX address family constant matching the active variant of
/// `a`.
pub fn posixAddressFamily(a: *const IpAddress) posix.sa_family_t {
    switch (a.*) {
        .ip4 => return posix.AF.INET,
        .ip6 => return posix.AF.INET6,
    }
}
5085
/// Converts a POSIX socket address into an `IpAddress`, dispatching on the
/// stored family. Any family other than `AF.INET` / `AF.INET6` yields the
/// IPv4 loopback address with port 0.
pub fn addressFromPosix(posix_address: *const PosixAddress) IpAddress {
    const family = posix_address.any.family;
    switch (family) {
        posix.AF.INET => return .{ .ip4 = address4FromPosix(&posix_address.in) },
        posix.AF.INET6 => return .{ .ip6 = address6FromPosix(&posix_address.in6) },
        else => return .{ .ip4 = .loopback(0) },
    }
}
5093
/// Converts a Winsock socket address into an `IpAddress`, dispatching on the
/// stored family. Any family other than `AF.INET` / `AF.INET6` yields the
/// IPv4 loopback address with port 0.
fn addressFromWsa(wsa_address: *const WsaAddress) IpAddress {
    const family = wsa_address.any.family;
    switch (family) {
        posix.AF.INET => return .{ .ip4 = address4FromWsa(&wsa_address.in) },
        posix.AF.INET6 => return .{ .ip6 = address6FromWsa(&wsa_address.in6) },
        else => return .{ .ip4 = .loopback(0) },
    }
}
5101
/// Writes the POSIX representation of `a` into `storage` and returns the size
/// in bytes of the layout that was written.
pub fn addressToPosix(a: *const IpAddress, storage: *PosixAddress) posix.socklen_t {
    switch (a.*) {
        .ip4 => |ip4| {
            storage.in = address4ToPosix(ip4);
            return @sizeOf(posix.sockaddr.in);
        },
        .ip6 => |*ip6| {
            storage.in6 = address6ToPosix(ip6);
            return @sizeOf(posix.sockaddr.in6);
        },
    }
}
5114
/// Writes the socket-address representation of `a` into the Winsock storage
/// and returns the size in bytes of the layout that was written.
fn addressToWsa(a: *const IpAddress, storage: *WsaAddress) i32 {
    switch (a.*) {
        .ip4 => |ip4| {
            storage.in = address4ToPosix(ip4);
            return @sizeOf(posix.sockaddr.in);
        },
        .ip6 => |*ip6| {
            storage.in6 = address6ToPosix(ip6);
            return @sizeOf(posix.sockaddr.in6);
        },
    }
}
5127
/// Fills `storage.un` with the Unix domain socket address `a`: family, path
/// bytes, and a terminating zero byte after the path. Returns
/// `@sizeOf(posix.sockaddr.un)`.
fn addressUnixToPosix(a: *const net.UnixAddress, storage: *UnixAddress) posix.socklen_t {
    const path_len = a.path.len;
    storage.un.family = posix.AF.UNIX;
    @memcpy(storage.un.path[0..path_len], a.path);
    storage.un.path[path_len] = 0;
    return @sizeOf(posix.sockaddr.un);
}
5134
/// Fills `storage.un` with the Unix domain socket address `a`: family, path
/// bytes, and a terminating zero byte after the path. Returns
/// `@sizeOf(posix.sockaddr.un)`.
fn addressUnixToWsa(a: *const net.UnixAddress, storage: *WsaAddress) i32 {
    const path_len = a.path.len;
    storage.un.family = posix.AF.UNIX;
    @memcpy(storage.un.path[0..path_len], a.path);
    storage.un.path[path_len] = 0;
    return @sizeOf(posix.sockaddr.un);
}
5141
/// Converts a POSIX IPv4 socket address into a `net.Ip4Address`, translating
/// the port from network to native byte order.
fn address4FromPosix(in: *const posix.sockaddr.in) net.Ip4Address {
    const native_port = std.mem.bigToNative(u16, in.port);
    return .{
        .bytes = @bitCast(in.addr),
        .port = native_port,
    };
}
5148
/// Converts a POSIX IPv6 socket address into a `net.Ip6Address`, translating
/// the port from network to native byte order and carrying over the flow
/// info and scope id.
fn address6FromPosix(in6: *const posix.sockaddr.in6) net.Ip6Address {
    const native_port = std.mem.bigToNative(u16, in6.port);
    return .{
        .bytes = in6.addr,
        .port = native_port,
        .interface = .{ .index = in6.scope_id },
        .flow = in6.flowinfo,
    };
}
5157
/// Converts a Winsock IPv4 socket address into a `net.Ip4Address`,
/// translating the port from network to native byte order.
fn address4FromWsa(in: *const ws2_32.sockaddr.in) net.Ip4Address {
    const native_port = std.mem.bigToNative(u16, in.port);
    return .{
        .bytes = @bitCast(in.addr),
        .port = native_port,
    };
}
5164
/// Converts a Winsock IPv6 socket address into a `net.Ip6Address`,
/// translating the port from network to native byte order and carrying over
/// the flow info and scope id.
fn address6FromWsa(in6: *const ws2_32.sockaddr.in6) net.Ip6Address {
    const native_port = std.mem.bigToNative(u16, in6.port);
    return .{
        .bytes = in6.addr,
        .port = native_port,
        .interface = .{ .index = in6.scope_id },
        .flow = in6.flowinfo,
    };
}
5173
/// Builds a POSIX IPv4 socket address from `a`, translating the port from
/// native to network byte order.
fn address4ToPosix(a: net.Ip4Address) posix.sockaddr.in {
    const network_port = std.mem.nativeToBig(u16, a.port);
    return .{
        .addr = @bitCast(a.bytes),
        .port = network_port,
    };
}
5180
/// Builds a POSIX IPv6 socket address from `a`, translating the port from
/// native to network byte order and carrying over the flow info and the
/// interface index (as scope id).
fn address6ToPosix(a: *const net.Ip6Address) posix.sockaddr.in6 {
    const network_port = std.mem.nativeToBig(u16, a.port);
    return .{
        .addr = a.bytes,
        .port = network_port,
        .scope_id = a.interface.index,
        .flowinfo = a.flow,
    };
}
5189
/// Reports an errno value that can only result from a programming mistake:
/// panics in Debug builds, and yields `error.Unexpected` in other build modes.
pub fn errnoBug(err: posix.E) Io.UnexpectedError {
    if (!is_debug) return error.Unexpected;
    std.debug.panic("programmer bug caused syscall error: {t}", .{err});
}
5194
/// Reports a Winsock error that can only result from a programming mistake:
/// panics in Debug builds, and yields `error.Unexpected` in other build modes.
fn wsaErrorBug(err: ws2_32.WinsockError) Io.UnexpectedError {
    if (!is_debug) return error.Unexpected;
    std.debug.panic("programmer bug caused syscall error: {t}", .{err});
}
5199
/// Maps a `net.Socket.Mode` to the corresponding `posix.SOCK` constant.
pub fn posixSocketMode(mode: net.Socket.Mode) u32 {
    switch (mode) {
        .stream => return posix.SOCK.STREAM,
        .dgram => return posix.SOCK.DGRAM,
        .seqpacket => return posix.SOCK.SEQPACKET,
        .raw => return posix.SOCK.RAW,
        .rdm => return posix.SOCK.RDM,
    }
}
5209
/// Returns the integer value of `protocol`, or 0 when no protocol is given.
pub fn posixProtocol(protocol: ?net.Protocol) u32 {
    if (protocol) |p| return @intFromEnum(p);
    return 0;
}
5213
/// Marks a code path that is only reached when the operating system
/// misbehaves. Asserts unreachability in Debug builds; does nothing in other
/// build modes so the caller can carry on.
fn recoverableOsBugDetected() void {
    if (!is_debug) return;
    unreachable;
}
5217
/// Maps an `Io.Clock` to the POSIX clock id used on the target OS.
fn clockToPosix(clock: Io.Clock) posix.clockid_t {
    switch (clock) {
        .real => return posix.CLOCK.REALTIME,
        .cpu_process => return posix.CLOCK.PROCESS_CPUTIME_ID,
        .cpu_thread => return posix.CLOCK.THREAD_CPUTIME_ID,
        .awake => return switch (native_os) {
            .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => posix.CLOCK.UPTIME_RAW,
            else => posix.CLOCK.MONOTONIC,
        },
        .boot => return switch (native_os) {
            .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => posix.CLOCK.MONOTONIC_RAW,
            // FreeBSD derivatives: MONOTONIC_FAST, since it currently comes
            // with no precision tradeoff.
            .freebsd, .dragonfly => posix.CLOCK.MONOTONIC_FAST,
            // Linux: BOOTTIME rather than MONOTONIC, because it keeps ticking
            // while suspended.
            .linux => posix.CLOCK.BOOTTIME,
            // Other POSIX systems: MONOTONIC is generally the fastest and
            // ticks while suspended.
            else => posix.CLOCK.MONOTONIC,
        },
    }
}
5241
/// Maps an `Io.Clock` to a WASI clock id. Both `.awake` and `.boot` map to
/// `MONOTONIC`.
fn clockToWasi(clock: Io.Clock) std.os.wasi.clockid_t {
    switch (clock) {
        .real => return .REALTIME,
        .awake, .boot => return .MONOTONIC,
        .cpu_process => return .PROCESS_CPUTIME_ID,
        .cpu_thread => return .THREAD_CPUTIME_ID,
    }
}
5251
/// Converts a Linux `statx` result into an `Io.File.Stat`.
///
/// The file kind is derived from the `S.IFMT` bits of the mode; timestamps
/// are converted to nanoseconds.
fn statFromLinux(stx: *const std.os.linux.Statx) Io.File.Stat {
    const S = std.os.linux.S;
    const ns_per_s = std.time.ns_per_s;
    return .{
        .inode = stx.ino,
        .size = stx.size,
        .mode = stx.mode,
        .kind = switch (stx.mode & S.IFMT) {
            S.IFREG => .file,
            S.IFDIR => .directory,
            S.IFLNK => .sym_link,
            S.IFCHR => .character_device,
            S.IFBLK => .block_device,
            S.IFIFO => .named_pipe,
            S.IFSOCK => .unix_domain_socket,
            else => .unknown,
        },
        .atime = .{ .nanoseconds = @intCast(@as(i128, stx.atime.sec) * ns_per_s + stx.atime.nsec) },
        .mtime = .{ .nanoseconds = @intCast(@as(i128, stx.mtime.sec) * ns_per_s + stx.mtime.nsec) },
        .ctime = .{ .nanoseconds = @intCast(@as(i128, stx.ctime.sec) * ns_per_s + stx.ctime.nsec) },
    };
}
5275
/// Converts a POSIX `stat` result into an `Io.File.Stat`.
///
/// The file kind is derived from the `S.IFMT` bits of the mode; on illumos
/// the door and event-port types are recognized as well.
fn statFromPosix(st: *const posix.Stat) Io.File.Stat {
    const accessed = st.atime();
    const modified = st.mtime();
    const changed = st.ctime();
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = st.mode,
        .kind = kind: {
            const file_type = st.mode & posix.S.IFMT;
            if (native_os == .illumos) switch (file_type) {
                posix.S.IFDOOR => break :kind .door,
                posix.S.IFPORT => break :kind .event_port,
                else => {},
            };
            break :kind switch (file_type) {
                posix.S.IFREG => .file,
                posix.S.IFDIR => .directory,
                posix.S.IFLNK => .sym_link,
                posix.S.IFCHR => .character_device,
                posix.S.IFBLK => .block_device,
                posix.S.IFIFO => .named_pipe,
                posix.S.IFSOCK => .unix_domain_socket,
                else => .unknown,
            };
        },
        .atime = timestampFromPosix(&accessed),
        .mtime = timestampFromPosix(&modified),
        .ctime = timestampFromPosix(&changed),
    };
}
5309
/// Converts a WASI `filestat_t` into an `Io.File.Stat`. The mode is always
/// reported as 0; both socket file types map to `.unix_domain_socket`.
fn statFromWasi(st: *const std.os.wasi.filestat_t) Io.File.Stat {
    return .{
        .inode = st.ino,
        .size = @bitCast(st.size),
        .mode = 0,
        .kind = switch (st.filetype) {
            .REGULAR_FILE => .file,
            .DIRECTORY => .directory,
            .SYMBOLIC_LINK => .sym_link,
            .CHARACTER_DEVICE => .character_device,
            .BLOCK_DEVICE => .block_device,
            .SOCKET_DGRAM, .SOCKET_STREAM => .unix_domain_socket,
            else => .unknown,
        },
        .atime = .fromNanoseconds(st.atim),
        .mtime = .fromNanoseconds(st.mtim),
        .ctime = .fromNanoseconds(st.ctim),
    };
}
5329
/// Converts a POSIX `timespec` (seconds plus nanoseconds) into an
/// `Io.Timestamp` counting nanoseconds.
fn timestampFromPosix(timespec: *const posix.timespec) Io.Timestamp {
    const whole_seconds_ns = @as(i128, timespec.sec) * std.time.ns_per_s;
    return .{ .nanoseconds = @intCast(whole_seconds_ns + timespec.nsec) };
}
5333
/// Splits a nanosecond count into a POSIX `timespec`. Floor division keeps
/// the `nsec` part non-negative for negative inputs.
fn timestampToPosix(nanoseconds: i96) posix.timespec {
    const seconds = @divFloor(nanoseconds, std.time.ns_per_s);
    const leftover_ns = @mod(nanoseconds, std.time.ns_per_s);
    return .{
        .sec = @intCast(seconds),
        .nsec = @intCast(leftover_ns),
    };
}
5340
/// Copies `file_path` into `buffer` and returns it as a zero-terminated
/// slice of `buffer`.
///
/// Fails with `error.BadPathName` when the path contains a zero byte, and
/// with `error.NameTooLong` when path plus terminator do not fit.
fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNameError![:0]u8 {
    const has_nul = std.mem.containsAtLeastScalar2(u8, file_path, 0, 1);
    if (has_nul) return error.BadPathName;

    const n = file_path.len;
    // >= rather than > to make room for the null byte
    if (n >= buffer.len) return error.NameTooLong;

    @memcpy(buffer[0..n], file_path);
    buffer[n] = 0;
    return buffer[0..n :0];
}
5349
/// Resolves `host_name` through DNS, applying the search list from the
/// resolver configuration.
///
/// Each search domain is appended to the name and tried in turn; a candidate
/// that fails with `error.UnknownHostName` moves on to the next one. The bare
/// name is tried last. No search domains are used when the name has at least
/// `ndots` dots or ends in a dot.
///
/// `options.canonical_name_buffer` doubles as scratch space for the candidate
/// names. Search domains that would not fit in it are skipped.
fn lookupDnsSearch(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = io(t);
    const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;

    // Count dots, suppress search when >=ndots or name ends in
    // a dot, which is an explicit request for global scope.
    const dots = std.mem.countScalar(u8, host_name.bytes, '.');
    const search_len = if (dots >= rc.ndots or std.mem.endsWith(u8, host_name.bytes, ".")) 0 else rc.search_len;
    const search = rc.search_buffer[0..search_len];

    var canon_name = host_name.bytes;

    // Strip final dot for canon, fail if multiple trailing dots.
    if (std.mem.endsWith(u8, canon_name, ".")) canon_name.len -= 1;
    if (std.mem.endsWith(u8, canon_name, ".")) return error.UnknownHostName;

    // Name with search domain appended is set up in `canon_name`. This
    // both provides the desired default canonical name (if the requested
    // name is not a CNAME record) and serves as a buffer for passing the
    // full requested name to `lookupDns`.
    const buffer = options.canonical_name_buffer;
    @memcpy(buffer[0..canon_name.len], canon_name);
    var it = std.mem.tokenizeAny(u8, search, " \t");
    while (it.next()) |token| {
        const candidate_len = canon_name.len + 1 + token.len;
        // A search domain that would overflow the buffer cannot form a valid
        // candidate; skip it instead of writing out of bounds.
        if (candidate_len > buffer.len) continue;
        buffer[canon_name.len] = '.';
        @memcpy(buffer[canon_name.len + 1 ..][0..token.len], token);
        const lookup_canon_name = buffer[0..candidate_len];
        if (lookupDns(t, lookup_canon_name, &rc, resolved, options)) |result| {
            return result;
        } else |err| switch (err) {
            error.UnknownHostName => continue,
            else => |e| return e,
        }
    }

    const lookup_canon_name = buffer[0..canon_name.len];
    return lookupDns(t, lookup_canon_name, &rc, resolved, options);
}
5392
/// Performs the DNS exchange for one fully-formed name.
///
/// Builds one query per record type that `options.family` allows, sends
/// every outstanding query to every nameserver in `rc` over a single
/// datagram socket, and collects replies until each query has an accepted
/// answer, the answer buffer is full, or the overall deadline derived from
/// `rc.timeout_seconds` passes. Addresses found in the answers, followed by
/// one canonical name entry, are pushed to `resolved`.
///
/// Returns `error.NameServerFailure` when the deadline passes before the
/// send loop is left via `break`, or when the answers contained no address
/// records.
fn lookupDns(
    t: *Threaded,
    lookup_canon_name: []const u8,
    rc: *const HostName.ResolvConf,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) HostName.LookupError!void {
    const t_io = io(t);
    // `af` is the requested family for which the paired record type is
    // *skipped* (see the `!=` test below): asking for ip6 suppresses the A
    // query, asking for ip4 suppresses the AAAA query.
    const family_records: [2]struct { af: IpAddress.Family, rr: HostName.DnsRecord } = .{
        .{ .af = .ip6, .rr = .A },
        .{ .af = .ip4, .rr = .AAAA },
    };
    var query_buffers: [2][280]u8 = undefined;
    var answer_buffer: [2 * 512]u8 = undefined;
    var queries_buffer: [2][]const u8 = undefined;
    var answers_buffer: [2][]const u8 = undefined;
    var nq: usize = 0;
    var answer_buffer_i: usize = 0;

    for (family_records) |fr| {
        if (options.family != fr.af) {
            // The two entropy bytes become the query ID that replies are matched on.
            const entropy = std.crypto.random.array(u8, 2);
            const len = writeResolutionQuery(&query_buffers[nq], 0, lookup_canon_name, 1, fr.rr, entropy);
            queries_buffer[nq] = query_buffers[nq][0..len];
            nq += 1;
        }
    }

    // Prepare an ip6 form of every nameserver address, for use when the
    // socket below ends up being an ip6 socket.
    var ip4_mapped_buffer: [HostName.ResolvConf.max_nameservers]IpAddress = undefined;
    const ip4_mapped = ip4_mapped_buffer[0..rc.nameservers_len];
    var any_ip6 = false;
    for (rc.nameservers(), ip4_mapped) |*ns, *m| {
        m.* = .{ .ip6 = .fromAny(ns.*) };
        any_ip6 = any_ip6 or ns.* == .ip6;
    }
    var socket = s: {
        if (any_ip6) ip6: {
            const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
            const socket = ip6_addr.bind(t_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
                // Fall back to an ip4 socket below.
                error.AddressFamilyUnsupported => break :ip6,
                else => |e| return e,
            };
            break :s socket;
        }
        // From here on `any_ip6` means "the socket is ip6".
        any_ip6 = false;
        const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
        const socket = try ip4_addr.bind(t_io, .{ .mode = .dgram });
        break :s socket;
    };
    defer socket.close(t_io);

    const mapped_nameservers = if (any_ip6) ip4_mapped else rc.nameservers();
    const queries = queries_buffer[0..nq];
    const answers = answers_buffer[0..queries.len];
    var answers_remaining = answers.len;
    // A zero-length answer marks a query that has not been answered yet.
    for (answers) |*answer| answer.len = 0;

    // boot clock is chosen because time the computer is suspended should count
    // against time spent waiting for external messages to arrive.
    const clock: Io.Clock = .boot;
    var now_ts = try clock.now(t_io);
    const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
    const attempt_duration: Io.Duration = .{
        .nanoseconds = (std.time.ns_per_s / rc.attempts) * @as(i96, rc.timeout_seconds),
    };

    // Each iteration of `send` is one attempt: (re)send all unanswered
    // queries, then receive until the per-attempt deadline.
    send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(t_io)) {
        const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers;
        {
            var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
            var message_i: usize = 0;
            for (queries, answers) |query, *answer| {
                if (answer.len != 0) continue;
                for (mapped_nameservers) |*ns| {
                    message_buffer[message_i] = .{
                        .address = ns,
                        .data_ptr = query.ptr,
                        .data_len = query.len,
                    };
                    message_i += 1;
                }
            }
            // Send result is deliberately ignored; a lost query is covered by the retry loop.
            _ = netSendPosix(t, socket.handle, message_buffer[0..message_i], .{});
        }

        const timeout: Io.Timeout = .{ .deadline = .{
            .raw = now_ts.addDuration(attempt_duration),
            .clock = clock,
        } };

        while (true) {
            var message_buffer: [max_messages]Io.net.IncomingMessage = @splat(.init);
            // Accepted replies stay in `answer_buffer`; receive into the unused tail.
            const buf = answer_buffer[answer_buffer_i..];
            const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout);
            for (message_buffer[0..recv_n]) |*received_message| {
                const reply = received_message.data;
                // Ignore non-identifiable packets.
                if (reply.len < 4) continue;

                // Ignore replies from addresses we didn't send to.
                const ns = for (mapped_nameservers) |*ns| {
                    if (received_message.from.eql(ns)) break ns;
                } else {
                    continue;
                };

                // Find which query this answer goes with, if any.
                const query, const answer = for (queries, answers) |query, *answer| {
                    if (reply[0] == query[0] and reply[1] == query[1]) break .{ query, answer };
                } else {
                    continue;
                };
                // Keep only the first accepted reply per query.
                if (answer.len != 0) continue;

                // Only accept positive or negative responses; retry immediately on
                // server failure, and ignore all other codes such as refusal.
                switch (reply[3] & 15) {
                    0, 3 => {
                        answer.* = reply;
                        answer_buffer_i += reply.len;
                        answers_remaining -= 1;
                        if (answer_buffer.len - answer_buffer_i == 0) break :send;
                        if (answers_remaining == 0) break :send;
                    },
                    2 => {
                        var retry_message: Io.net.OutgoingMessage = .{
                            .address = ns,
                            .data_ptr = query.ptr,
                            .data_len = query.len,
                        };
                        _ = netSendPosix(t, socket.handle, (&retry_message)[0..1], .{});
                        continue;
                    },
                    else => continue,
                }
            }
            if (recv_err) |err| switch (err) {
                error.Canceled => return error.Canceled,
                // Attempt deadline reached: start the next attempt.
                error.Timeout => continue :send,
                else => continue,
            };
        }
    } else {
        // Overall deadline passed without leaving the loop via `break :send`.
        return error.NameServerFailure;
    }

    var addresses_len: usize = 0;
    var canonical_name: ?HostName = null;

    for (answers) |answer| {
        var it = HostName.DnsResponse.init(answer) catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        };
        while (it.next() catch {
            // Here we could potentially add diagnostics to the results queue.
            continue;
        }) |record| switch (record.rr) {
            .A => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 4) return error.InvalidDnsARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip4 = .{
                    .bytes = data[0..4].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            .AAAA => {
                const data = record.packet[record.data_off..][0..record.data_len];
                if (data.len != 16) return error.InvalidDnsAAAARecord;
                try resolved.putOne(t_io, .{ .address = .{ .ip6 = .{
                    .bytes = data[0..16].*,
                    .port = options.port,
                } } });
                addresses_len += 1;
            },
            .CNAME => {
                _, canonical_name = HostName.expand(record.packet, record.data_off, options.canonical_name_buffer) catch
                    return error.InvalidDnsCnameRecord;
            },
            _ => continue,
        };
    }

    // Without a CNAME record the queried name itself is reported as canonical.
    try resolved.putOne(t_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
    if (addresses_len == 0) return error.NameServerFailure;
}
5580
/// Looks `host_name` up in `/etc/hosts`, pushing matches to `resolved`.
///
/// A hosts file that is missing or inaccessible is reported as
/// `error.UnknownHostName`; cancelation is passed through; every other
/// open or read failure becomes `error.DetectingNetworkConfigurationFailed`.
fn lookupHosts(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
) !void {
    const t_io = io(t);
    const hosts_file = Io.File.openAbsolute(t_io, "/etc/hosts", .{}) catch |open_err| switch (open_err) {
        error.Canceled => |e| return e,

        error.FileNotFound, error.NotDir, error.AccessDenied => return error.UnknownHostName,

        // Here we could add more detailed diagnostics to the results queue.
        else => return error.DetectingNetworkConfigurationFailed,
    };
    defer hosts_file.close(t_io);

    var read_buffer: [512]u8 = undefined;
    var hosts_reader = hosts_file.reader(t_io, &read_buffer);
    lookupHostsReader(t, host_name, resolved, options, &hosts_reader.interface) catch |err| switch (err) {
        error.Canceled, error.UnknownHostName => |e| return e,
        // The reader interface only says "failed"; the file reader holds the cause.
        error.ReadFailed => switch (hosts_reader.err.?) {
            error.Canceled => |e| return e,
            // Here we could add more detailed diagnostics to the results queue.
            else => return error.DetectingNetworkConfigurationFailed,
        },
    };
}
5617
/// Scans hosts-file formatted text from `reader` for `host_name`.
///
/// Every line that lists `host_name` after its address contributes that
/// address (filtered by `options.family`) to `resolved`. The first matching
/// name that is a valid `HostName` and fits the canonical name buffer is
/// reported as the canonical name once scanning ends. Returns
/// `error.UnknownHostName` when no address was produced.
fn lookupHostsReader(
    t: *Threaded,
    host_name: HostName,
    resolved: *Io.Queue(HostName.LookupResult),
    options: HostName.LookupOptions,
    reader: *Io.Reader,
) error{ ReadFailed, Canceled, UnknownHostName }!void {
    const t_io = io(t);
    var found_addresses: usize = 0;
    var canonical_name: ?HostName = null;

    while (true) {
        const raw_line = reader.takeDelimiterExclusive('\n') catch |err| switch (err) {
            error.EndOfStream => break,
            error.ReadFailed => return error.ReadFailed,
            error.StreamTooLong => {
                // Skip lines that are too long.
                _ = reader.discardDelimiterInclusive('\n') catch |e| switch (e) {
                    error.EndOfStream => break,
                    error.ReadFailed => return error.ReadFailed,
                };
                continue;
            },
        };
        reader.toss(1);

        // Everything from the first '#' onward is a comment.
        var comment_split = std.mem.splitScalar(u8, raw_line, '#');
        var fields = std.mem.tokenizeAny(u8, comment_split.first(), " \t");
        const ip_text = fields.next() orelse continue;

        // Lines that never mention the requested name are irrelevant.
        const matched_name = while (fields.next()) |candidate| {
            if (std.mem.eql(u8, candidate, host_name.bytes)) break candidate;
        } else continue;

        if (canonical_name == null) {
            if (HostName.init(matched_name)) |valid_name| {
                const needed = valid_name.bytes.len;
                if (needed <= options.canonical_name_buffer.len) {
                    const dest = options.canonical_name_buffer[0..needed];
                    @memcpy(dest, valid_name.bytes);
                    canonical_name = .{ .bytes = dest };
                }
            } else |_| {}
        }

        // Text that does not parse for an allowed family is silently skipped.
        if (options.family != .ip6) {
            if (IpAddress.parseIp4(ip_text, options.port)) |addr| {
                try resolved.putOne(t_io, .{ .address = addr });
                found_addresses += 1;
            } else |_| {}
        }
        if (options.family != .ip4) {
            if (IpAddress.parseIp6(ip_text, options.port)) |addr| {
                try resolved.putOne(t_io, .{ .address = addr });
                found_addresses += 1;
            } else |_| {}
        }
    }

    if (canonical_name) |canon_name| try resolved.putOne(t_io, .{ .canonical_name = canon_name });
    if (found_addresses == 0) return error.UnknownHostName;
}
5682
/// Writes a DNS resolution query packet into `q` and returns its length,
/// which is at most 280 bytes.
///
/// `entropy` becomes the two-byte query ID. `dname` may carry one trailing
/// dot, which is ignored; the remaining name must be at most 253 bytes.
fn writeResolutionQuery(q: *[280]u8, op: u4, dname: []const u8, class: u8, ty: HostName.DnsRecord, entropy: [2]u8) usize {
    // This implementation is ported from musl libc.
    // A more idiomatic "ziggy" implementation would be welcome.
    const name = if (std.mem.endsWith(u8, dname, ".")) dname[0 .. dname.len - 1] else dname;
    assert(name.len <= 253);
    const n = 17 + name.len + @intFromBool(name.len != 0);

    // Header: ID, flags, question count; everything else starts zeroed.
    q[0..2].* = entropy;
    @memset(q[2..n], 0);
    q[2] = @as(u8, op) * 8 + 1;
    q[5] = 1;

    // The dotted name is copied one byte past the start of the question so
    // that each label's length byte can be written just before the label,
    // overwriting the dot that preceded it.
    @memcpy(q[13..][0..name.len], name);
    var label_start: usize = 13;
    while (q[label_start] != 0) {
        var label_end = label_start;
        while (q[label_end] != 0 and q[label_end] != '.') label_end += 1;
        // TODO determine the circumstances for this and whether or
        // not this should be an error.
        if (label_end - label_start - 1 > 62) unreachable;
        q[label_start - 1] = @intCast(label_end - label_start);
        label_start = label_end + 1;
    }
    q[label_start + 1] = @intFromEnum(ty);
    q[label_start + 3] = class;
    return n;
}
5712
/// Copies `name` to the start of `canonical_name_buffer` and returns a
/// `HostName` whose bytes refer to that copy.
fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) HostName {
    @memcpy(canonical_name_buffer[0..name.len], name);
    return .{ .bytes = canonical_name_buffer[0..name.len] };
}
5718
/// Whether the minimum targeted Darwin OS version provides `__ulock_wait2`.
///
/// Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
/// https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
///
/// This XNU version appears to correspond to 11.0.1:
/// https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
///
/// ulock_wait() uses 32-bit microsecond timeouts where 0 = INFINITE or no-timeout
/// ulock_wait2() uses 64-bit nanosecond timeouts (with the same convention)
const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11;
5728
/// Blocks the calling thread on the futex word `ptr` while it holds `expect`,
/// with no timeout.
///
/// Cancelation is checked via `t.checkCancel()` immediately before blocking
/// on every platform; on Windows a `.CANCELLED` wait status is also reported
/// as `error.Canceled`. The call may return without a matching wake
/// (interrupted wait, value mismatch), so callers re-check their condition.
/// Unexpected OS results are only checked in debug builds, except on Windows.
fn futexWait(t: *Threaded, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void {
    @branchHint(.cold);

    if (builtin.cpu.arch.isWasm()) {
        comptime assert(builtin.cpu.has(.wasm, .atomics));
        try t.checkCancel();
        // A negative timeout means wait indefinitely.
        const timeout: i64 = -1;
        const signed_expect: i32 = @bitCast(expect);
        const result = asm volatile (
            \\local.get %[ptr]
            \\local.get %[expected]
            \\local.get %[timeout]
            \\memory.atomic.wait32 0
            \\local.set %[ret]
            : [ret] "=r" (-> u32),
            : [ptr] "r" (&ptr.raw),
              [expected] "r" (signed_expect),
              [timeout] "r" (timeout),
        );
        switch (result) {
            0 => {}, // ok
            1 => {}, // expected != loaded
            2 => assert(!is_debug), // timeout
            else => assert(!is_debug),
        }
    } else switch (native_os) {
        .linux => {
            const linux = std.os.linux;
            try t.checkCancel();
            const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
            if (is_debug) switch (linux.errno(rc)) {
                .SUCCESS => {}, // notified by `wake()`
                .INTR => {}, // gives caller a chance to check cancellation
                .AGAIN => {}, // ptr.* != expect
                .INVAL => {}, // possibly timeout overflow
                .TIMEDOUT => unreachable,
                .FAULT => unreachable, // ptr was invalid
                else => unreachable,
            };
        },
        .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
            const c = std.c;
            const flags: c.UL = .{
                .op = .COMPARE_AND_WAIT,
                .NO_ERRNO = true,
            };
            try t.checkCancel();
            // A timeout of 0 means no timeout for both variants.
            const status = if (darwin_supports_ulock_wait2)
                c.__ulock_wait2(flags, ptr, expect, 0, 0)
            else
                c.__ulock_wait(flags, ptr, expect, 0);

            if (status >= 0) return;

            // With NO_ERRNO set, failures come back as a negated errno value.
            if (is_debug) switch (@as(c.E, @enumFromInt(-status))) {
                .INTR => {}, // spurious wake
                // Address of the futex was paged out. This is unlikely, but possible in theory, and
                // pthread/libdispatch on darwin bother to handle it. In this case we'll return
                // without waiting, but the caller should retry anyway.
                .FAULT => {},
                .TIMEDOUT => unreachable,
                else => unreachable,
            };
        },
        .windows => {
            try t.checkCancel();
            switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
                .SUCCESS => {},
                .CANCELLED => return error.Canceled,
                else => recoverableOsBugDetected(),
            }
        },
        .freebsd => {
            const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
            try t.checkCancel();
            const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
            if (is_debug) switch (posix.errno(rc)) {
                .SUCCESS => {},
                .FAULT => unreachable, // one of the args points to invalid memory
                .INVAL => unreachable, // arguments should be correct
                .TIMEDOUT => unreachable, // no timeout provided
                .INTR => {}, // spurious wake
                else => unreachable,
            };
        },
        else => @compileError("unimplemented: futexWait"),
    }
}
5817
/// Blocks the calling thread on the futex word `ptr` while it holds `expect`,
/// with no timeout and no cancelation check.
///
/// The call may return without a matching wake (interrupted wait, value
/// mismatch), so callers re-check their condition. Unexpected OS results are
/// reported through `recoverableOsBugDetected` in all build modes.
pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void {
    @branchHint(.cold);

    if (builtin.cpu.arch.isWasm()) {
        comptime assert(builtin.cpu.has(.wasm, .atomics));
        // A negative timeout means wait indefinitely.
        const timeout: i64 = -1;
        const signed_expect: i32 = @bitCast(expect);
        const result = asm volatile (
            \\local.get %[ptr]
            \\local.get %[expected]
            \\local.get %[timeout]
            \\memory.atomic.wait32 0
            \\local.set %[ret]
            : [ret] "=r" (-> u32),
            : [ptr] "r" (&ptr.raw),
              [expected] "r" (signed_expect),
              [timeout] "r" (timeout),
        );
        switch (result) {
            0 => {}, // ok
            1 => {}, // expected != loaded
            2 => recoverableOsBugDetected(), // timeout
            else => recoverableOsBugDetected(),
        }
    } else switch (native_os) {
        .linux => {
            const linux = std.os.linux;
            const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
            switch (linux.errno(rc)) {
                .SUCCESS => {}, // notified by `wake()`
                .INTR => {}, // gives caller a chance to check cancellation
                .AGAIN => {}, // ptr.* != expect
                .INVAL => {}, // possibly timeout overflow
                .TIMEDOUT => recoverableOsBugDetected(),
                .FAULT => recoverableOsBugDetected(), // ptr was invalid
                else => recoverableOsBugDetected(),
            }
        },
        .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
            const c = std.c;
            const flags: c.UL = .{
                .op = .COMPARE_AND_WAIT,
                .NO_ERRNO = true,
            };
            // A timeout of 0 means no timeout for both variants.
            const status = if (darwin_supports_ulock_wait2)
                c.__ulock_wait2(flags, ptr, expect, 0, 0)
            else
                c.__ulock_wait(flags, ptr, expect, 0);

            if (status >= 0) return;

            // With NO_ERRNO set, failures come back as a negated errno value.
            switch (@as(c.E, @enumFromInt(-status))) {
                // Wait was interrupted by the OS or other spurious signalling.
                .INTR => {},
                // Address of the futex was paged out. This is unlikely, but possible in theory, and
                // pthread/libdispatch on darwin bother to handle it. In this case we'll return
                // without waiting, but the caller should retry anyway.
                .FAULT => {},
                .TIMEDOUT => recoverableOsBugDetected(),
                else => recoverableOsBugDetected(),
            }
        },
        .windows => {
            switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
                // A cancelled wait is treated like any other early return.
                .SUCCESS, .CANCELLED => {},
                else => recoverableOsBugDetected(),
            }
        },
        .freebsd => {
            const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
            const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
            switch (posix.errno(rc)) {
                .SUCCESS => {},
                .INTR => {}, // spurious wake
                .FAULT => recoverableOsBugDetected(), // one of the args points to invalid memory
                .INVAL => recoverableOsBugDetected(), // arguments should be correct
                .TIMEDOUT => recoverableOsBugDetected(), // no timeout provided
                else => recoverableOsBugDetected(),
            }
        },
        else => @compileError("unimplemented: futexWaitUncancelable"),
    }
}
5901
/// Blocks on the futex word `ptr` while it holds `expect`, for at most
/// `timeout`, without checking for cancelation.
///
/// Only implemented for Linux; other targets fail to compile. Returning
/// because the timeout elapsed is not distinguished from any other return.
pub fn futexWaitDurationUncancelable(ptr: *const std.atomic.Value(u32), expect: u32, timeout: Io.Duration) void {
    @branchHint(.cold);

    if (native_os != .linux) @compileError("TODO");

    const linux = std.os.linux;
    var relative_timeout = timestampToPosix(timeout.toNanoseconds());
    const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, &relative_timeout);
    // The result is only inspected to catch impossible errors in debug builds.
    if (is_debug) switch (linux.errno(rc)) {
        // notified by `wake()`
        .SUCCESS,
        // gives caller a chance to check cancellation
        .INTR,
        // ptr.* != expect
        .AGAIN,
        .TIMEDOUT,
        // possibly timeout overflow
        .INVAL,
        => {},
        .FAULT => unreachable, // ptr was invalid
        else => unreachable,
    };
}
5923
/// Wakes threads blocked on the futex word `ptr`.
///
/// `max_waiters` bounds how many waiters are woken where the platform
/// supports a count (wasm, Linux, FreeBSD). On Darwin and Windows the choice
/// is only between one waiter (`max_waiters == 1`) and all waiters. The wasm
/// and Windows paths assert that `max_waiters` is nonzero.
pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
    @branchHint(.cold);

    if (builtin.cpu.arch.isWasm()) {
        comptime assert(builtin.cpu.has(.wasm, .atomics));
        assert(max_waiters != 0);
        const woken_count = asm volatile (
            \\local.get %[ptr]
            \\local.get %[waiters]
            \\memory.atomic.notify 0
            \\local.set %[ret]
            : [ret] "=r" (-> u32),
            : [ptr] "r" (&ptr.raw),
              [waiters] "r" (max_waiters),
        );
        _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
    } else switch (native_os) {
        .linux => {
            const linux = std.os.linux;
            switch (linux.errno(linux.futex_3arg(
                &ptr.raw,
                .{ .cmd = .WAKE, .private = true },
                // Clamp so the count stays within the positive `i32` range.
                @min(max_waiters, std.math.maxInt(i32)),
            ))) {
                .SUCCESS => return, // successful wake up
                .INVAL => return, // invalid futex_wait() on ptr done elsewhere
                .FAULT => return, // pointer became invalid while doing the wake
                else => return recoverableOsBugDetected(), // deadlock due to operating system bug
            }
        },
        .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
            const c = std.c;
            const flags: c.UL = .{
                .op = .COMPARE_AND_WAIT,
                .NO_ERRNO = true,
                .WAKE_ALL = max_waiters > 1,
            };
            // Retry until the wake either succeeds or reports that nobody was waiting.
            while (true) {
                const status = c.__ulock_wake(flags, ptr, 0);
                if (status >= 0) return;
                switch (@as(c.E, @enumFromInt(-status))) {
                    .INTR, .CANCELED => continue, // spurious wake()
                    .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
                    .NOENT => return, // nothing was woken up
                    .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
                    else => unreachable, // deadlock due to operating system bug
                }
            }
        },
        .windows => {
            assert(max_waiters != 0);
            switch (max_waiters) {
                1 => windows.ntdll.RtlWakeAddressSingle(ptr),
                else => windows.ntdll.RtlWakeAddressAll(ptr),
            }
        },
        .freebsd => {
            const rc = std.c._umtx_op(
                @intFromPtr(&ptr.raw),
                @intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE),
                @as(c_ulong, max_waiters),
                0, // there is no timeout struct
                0, // there is no timeout struct pointer
            );
            switch (posix.errno(rc)) {
                .SUCCESS => {},
                .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
                .INVAL => unreachable, // arguments should be correct
                else => unreachable, // deadlock due to operating system bug
            }
        },
        else => @compileError("unimplemented: futexWake"),
    }
}
5998
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
/// Threads can block until the value is set, either cancelably (`wait`) or
/// not (`waitUncancelable`). Statically initializable via `.unset`. The
/// futex-based implementation is four bytes; the pthread-based one selected
/// for illumos and NetBSD additionally embeds a condition variable and a
/// mutex.
pub const ResetEvent = switch (native_os) {
    .illumos, .netbsd => ResetEventPosix,
    else => ResetEventFutex,
};
6007
/// A `ResetEvent` implementation based on futexes.
///
/// The enum value itself is the futex word: `waiting` records that at least
/// one thread may be blocked, so `set` only issues a wake when needed.
const ResetEventFutex = enum(u32) {
    unset = 0,
    waiting = 1,
    is_set = 2,

    /// Returns whether the logical boolean is `set`.
    ///
    /// Once `reset` is called, this returns false until the next `set`.
    ///
    /// The memory accesses before the `set` can be said to happen before
    /// `isSet` returns true.
    pub fn isSet(ref: *const ResetEventFutex) bool {
        if (builtin.single_threaded) return switch (ref.*) {
            .unset => false,
            .waiting => unreachable,
            .is_set => true,
        };
        // Acquire barrier ensures memory accesses before `set` happen before
        // returning true.
        return @atomicLoad(ResetEventFutex, ref, .acquire) == .is_set;
    }

    /// Blocks the calling thread until `set` is called.
    ///
    /// This is effectively a more efficient version of `while (!isSet()) {}`.
    ///
    /// The memory accesses before the `set` can be said to happen before `wait` returns.
    ///
    /// Returns `error.Canceled` if `futexWait` reports cancelation.
    pub fn wait(ref: *ResetEventFutex, t: *Threaded) Io.Cancelable!void {
        if (builtin.single_threaded) switch (ref.*) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        // Try to set the state from `unset` to `waiting` to indicate to the
        // `set` thread that others are blocked on the ResetEventFutex. Avoid using
        // any strict barriers until we know the ResetEventFutex is set.
        var state = @atomicLoad(ResetEventFutex, ref, .acquire);
        if (state == .is_set) {
            @branchHint(.likely);
            return;
        }
        if (state == .unset) {
            // On failure the cmpxchg yields the value actually found.
            state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
        }
        // Futex waits may return early, so loop until the state leaves `waiting`.
        while (state == .waiting) {
            try futexWait(t, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
            state = @atomicLoad(ResetEventFutex, ref, .acquire);
        }
        assert(state == .is_set);
    }

    /// Same as `wait` except uninterruptible.
    pub fn waitUncancelable(ref: *ResetEventFutex) void {
        if (builtin.single_threaded) switch (ref.*) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        // Try to set the state from `unset` to `waiting` to indicate to the
        // `set` thread that others are blocked on the ResetEventFutex. Avoid using
        // any strict barriers until we know the ResetEventFutex is set.
        var state = @atomicLoad(ResetEventFutex, ref, .acquire);
        if (state == .is_set) {
            @branchHint(.likely);
            return;
        }
        if (state == .unset) {
            // On failure the cmpxchg yields the value actually found.
            state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
        }
        // Futex waits may return early, so loop until the state leaves `waiting`.
        while (state == .waiting) {
            futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
            state = @atomicLoad(ResetEventFutex, ref, .acquire);
        }
        assert(state == .is_set);
    }

    /// Marks the logical boolean as `set` and unblocks any threads in `wait`
    /// or `waitUncancelable` to observe the new state.
    ///
    /// The logical boolean stays `set` until `reset` is called, making future
    /// `set` calls do nothing semantically.
    ///
    /// The memory accesses before `set` can be said to happen before `isSet`
    /// returns true or `wait`/`waitUncancelable` return successfully.
    pub fn set(ref: *ResetEventFutex) void {
        if (builtin.single_threaded) {
            ref.* = .is_set;
            return;
        }
        // Only pay for a wake when a waiter announced itself.
        if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) {
            futexWake(@ptrCast(ref), std.math.maxInt(u32));
        }
    }

    /// Unmarks the ResetEventFutex as if `set` was never called.
    ///
    /// Assumes no threads are blocked in `wait` or `waitUncancelable`.
    /// Concurrent calls to `set`, `isSet` and `reset` are allowed.
    pub fn reset(ref: *ResetEventFutex) void {
        if (builtin.single_threaded) {
            ref.* = .unset;
            return;
        }
        @atomicStore(ResetEventFutex, ref, .unset, .monotonic);
    }
};
6115
/// A `ResetEvent` implementation based on pthreads API.
///
/// `state` carries the same three-state protocol as `ResetEventFutex`;
/// `mutex` and `cond` replace the futex for blocking and waking. Waiters
/// inspect and update `state` while holding `mutex`, and `set` publishes
/// `is_set` and broadcasts under the same mutex, so a wakeup cannot slip in
/// between a waiter's state check and its `pthread_cond_wait`.
const ResetEventPosix = struct {
    cond: std.c.pthread_cond_t,
    mutex: std.c.pthread_mutex_t,
    state: ResetEventFutex,

    /// Static initializer for an event that has not been set.
    pub const unset: ResetEventPosix = .{
        .cond = std.c.PTHREAD_COND_INITIALIZER,
        .mutex = std.c.PTHREAD_MUTEX_INITIALIZER,
        .state = .unset,
    };

    /// Returns whether the logical boolean is currently set.
    pub fn isSet(rep: *const ResetEventPosix) bool {
        if (builtin.single_threaded) return switch (rep.state) {
            .unset => false,
            .waiting => unreachable,
            .is_set => true,
        };
        return @atomicLoad(ResetEventFutex, &rep.state, .acquire) == .is_set;
    }

    /// Blocks until `set` is called. Cancelation is checked each time
    /// before going to sleep on the condition variable.
    pub fn wait(rep: *ResetEventPosix, t: *Threaded) Io.Cancelable!void {
        if (builtin.single_threaded) switch (rep.state) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
        defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
        sw: switch (rep.state) {
            .unset => {
                rep.state = .waiting;
                continue :sw .waiting;
            },
            .waiting => {
                try t.checkCancel();
                assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
                // Re-dispatch on the state seen after waking.
                continue :sw rep.state;
            },
            .is_set => return,
        }
    }

    /// Same as `wait` except it never checks for cancelation.
    pub fn waitUncancelable(rep: *ResetEventPosix) void {
        if (builtin.single_threaded) switch (rep.state) {
            .unset => unreachable, // Deadlock, no other threads to wake us up.
            .waiting => unreachable, // Invalid state.
            .is_set => return,
        };
        assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
        defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
        sw: switch (rep.state) {
            .unset => {
                rep.state = .waiting;
                continue :sw .waiting;
            },
            .waiting => {
                assert(std.c.pthread_cond_wait(&rep.cond, &rep.mutex) == .SUCCESS);
                // Re-dispatch on the state seen after waking.
                continue :sw rep.state;
            },
            .is_set => return,
        }
    }

    /// Marks the event as set and wakes every thread blocked in `wait` or
    /// `waitUncancelable`.
    pub fn set(rep: *ResetEventPosix) void {
        if (builtin.single_threaded) {
            rep.state = .is_set;
            return;
        }
        // Hold the mutex across the state change and the broadcast. A waiter
        // holds it from its state check until `pthread_cond_wait` releases
        // it, so this can neither run between the two (lost wakeup) nor have
        // its `is_set` overwritten by a waiter's `waiting` store.
        assert(std.c.pthread_mutex_lock(&rep.mutex) == .SUCCESS);
        defer assert(std.c.pthread_mutex_unlock(&rep.mutex) == .SUCCESS);
        if (@atomicRmw(ResetEventFutex, &rep.state, .Xchg, .is_set, .release) == .waiting) {
            assert(std.c.pthread_cond_broadcast(&rep.cond) == .SUCCESS);
        }
    }

    /// Returns the event to the unset state. The mutex is not taken here.
    /// NOTE(review): presumably callers must ensure no thread is blocked in
    /// `wait`/`waitUncancelable`, as documented for `ResetEventFutex.reset`.
    pub fn reset(rep: *ResetEventPosix) void {
        if (builtin.single_threaded) {
            rep.state = .unset;
            return;
        }
        @atomicStore(ResetEventFutex, &rep.state, .unset, .monotonic);
    }
};
6198
/// Closes the Winsock socket `s`.
///
/// The result of `closesocket` is only examined in debug builds, where any
/// outcome other than success is routed to `recoverableOsBugDetected`.
fn closeSocketWindows(s: ws2_32.SOCKET) void {
    const close_result = ws2_32.closesocket(s);
    if (is_debug) {
        if (close_result == 0) {
            // Closed cleanly.
        } else if (close_result == ws2_32.SOCKET_ERROR) {
            // No specific Winsock error is treated as expected here.
            switch (ws2_32.WSAGetLastError()) {
                else => recoverableOsBugDetected(),
            }
        } else {
            recoverableOsBugDetected();
        }
    }
}
6209
/// Winsock (WSA) initialization state; see `initializeWsa`.
const Wsa = struct {
    /// Where Winsock initialization currently stands.
    status: Status = .uninitialized,
    /// Held by `initializeWsa` while it inspects or updates this struct.
    mutex: Io.Mutex = .init,
    /// Specific reason recorded when `WSAStartup` fails; null otherwise.
    init_error: ?Wsa.InitError = null,

    const Status = enum { uninitialized, initialized, failure };

    /// Failure reasons `initializeWsa` can record in `init_error`.
    const InitError = error{
        ProcessFdQuotaExceeded,
        NetworkDown,
        VersionUnsupported,
        BlockingOperationInProgress,
    } || Io.UnexpectedError;
};
6224
/// Ensures Winsock has been started for this `Threaded` instance.
///
/// The first call runs `WSAStartup` requesting version 2.2 while holding
/// `t.wsa.mutex`. The outcome is cached in `t.wsa.status`: later calls
/// return immediately on success, and return `error.NetworkDown` without
/// calling `WSAStartup` again after a failure. The specific failure reason
/// is kept in `t.wsa.init_error`.
fn initializeWsa(t: *Threaded) error{NetworkDown}!void {
    const t_io = io(t);
    const wsa = &t.wsa;
    wsa.mutex.lockUncancelable(t_io);
    defer wsa.mutex.unlock(t_io);
    switch (wsa.status) {
        .uninitialized => {
            var wsa_data: ws2_32.WSADATA = undefined;
            const minor_version = 2;
            const major_version = 2;
            // Requested version word: major in the low byte, minor in the high byte.
            switch (ws2_32.WSAStartup((@as(windows.WORD, minor_version) << 8) | major_version, &wsa_data)) {
                0 => {
                    wsa.status = .initialized;
                    return;
                },
                else => |err_int| {
                    // Remember the failure so the `.failure` branch below is
                    // taken on subsequent calls instead of retrying.
                    wsa.status = .failure;
                    switch (@as(ws2_32.WinsockError, @enumFromInt(@as(u16, @intCast(err_int))))) {
                        .SYSNOTREADY => wsa.init_error = error.NetworkDown,
                        .VERNOTSUPPORTED => wsa.init_error = error.VersionUnsupported,
                        .EINPROGRESS => wsa.init_error = error.BlockingOperationInProgress,
                        .EPROCLIM => wsa.init_error = error.ProcessFdQuotaExceeded,
                        else => |err| wsa.init_error = windows.unexpectedWSAError(err),
                    }
                },
            }
        },
        .initialized => return,
        .failure => {},
    }
    // Every failure reason is reported to callers as `NetworkDown`.
    return error.NetworkDown;
}
6254
/// C-ABI signal handler with an empty body; the signal number is ignored.
/// NOTE(review): presumably installed so that a signal can interrupt a
/// blocking syscall without otherwise affecting the process — confirm at the
/// site where the handler is registered.
fn doNothingSignalHandler(_: posix.SIG) callconv(.c) void {}
6256
// References `Threaded/test.zig` so the tests declared there are included
// when this file's tests are built.
test {
    _ = @import("Threaded/test.zig");
}