master
1const Kqueue = @This();
2const builtin = @import("builtin");
3
4const std = @import("../std.zig");
5const Io = std.Io;
6const Dir = std.Io.Dir;
7const File = std.Io.File;
8const net = std.Io.net;
9const assert = std.debug.assert;
10const Allocator = std.mem.Allocator;
11const Alignment = std.mem.Alignment;
12const IpAddress = std.Io.net.IpAddress;
13const errnoBug = std.Io.Threaded.errnoBug;
14const posix = std.posix;
15
16/// Must be a thread-safe allocator.
17gpa: Allocator,
18mutex: std.Thread.Mutex,
19main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
20threads: Thread.List,
21
22/// Empirically saw >128KB being used by the self-hosted backend to panic.
23const idle_stack_size = 256 * 1024;
24
25const max_idle_search = 4;
26const max_steal_ready_search = 4;
27const max_iovecs_len = 8;
28
29const changes_buffer_len = 64;
30
31const Thread = struct {
32 thread: std.Thread,
33 idle_context: Context,
34 current_context: *Context,
35 ready_queue: ?*Fiber,
36 kq_fd: posix.fd_t,
37 idle_search_index: u32,
38 steal_ready_search_index: u32,
39 /// For ensuring multiple fibers waiting on the same file descriptor and
40 /// filter use the same kevent.
41 wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),
42
43 const WaitQueueKey = struct {
44 ident: usize,
45 filter: i32,
46 };
47
48 const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
49
50 threadlocal var self: *Thread = undefined;
51
52 fn current() *Thread {
53 return self;
54 }
55
56 fn currentFiber(thread: *Thread) *Fiber {
57 return @fieldParentPtr("context", thread.current_context);
58 }
59
60 const List = struct {
61 allocated: []Thread,
62 reserved: u32,
63 active: u32,
64 };
65
66 fn deinit(thread: *Thread, gpa: Allocator) void {
67 posix.close(thread.kq_fd);
68 assert(thread.wait_queues.count() == 0);
69 thread.wait_queues.deinit(gpa);
70 thread.* = undefined;
71 }
72};
73
74const Fiber = struct {
75 required_align: void align(4),
76 context: Context,
77 awaiter: ?*Fiber,
78 queue_next: ?*Fiber,
79 cancel_thread: ?*Thread,
80 awaiting_completions: std.StaticBitSet(3),
81
82 const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
83
84 const max_result_align: Alignment = .@"16";
85 const max_result_size = max_result_align.forward(64);
86 /// This includes any stack realignments that need to happen, and also the
87 /// initial frame return address slot and argument frame, depending on target.
88 const min_stack_size = 4 * 1024 * 1024;
89 const max_context_align: Alignment = .@"16";
90 const max_context_size = max_context_align.forward(1024);
91 const max_closure_size: usize = @sizeOf(AsyncClosure);
92 const max_closure_align: Alignment = .of(AsyncClosure);
93 const allocation_size = std.mem.alignForward(
94 usize,
95 max_closure_align.max(max_context_align).forward(
96 max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
97 ) + max_closure_size + max_context_size,
98 std.heap.page_size_max,
99 );
100
101 fn allocate(k: *Kqueue) error{OutOfMemory}!*Fiber {
102 return @ptrCast(try k.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
103 }
104
105 fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
106 return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
107 }
108
109 fn allocatedEnd(f: *Fiber) [*]u8 {
110 const allocated_slice = f.allocatedSlice();
111 return allocated_slice[allocated_slice.len..].ptr;
112 }
113
114 fn resultPointer(f: *Fiber, comptime Result: type) *Result {
115 return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
116 }
117
118 fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
119 return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
120 }
121
122 fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
123 if (@cmpxchgStrong(
124 ?*Thread,
125 &fiber.cancel_thread,
126 null,
127 thread,
128 .acq_rel,
129 .acquire,
130 )) |cancel_thread| {
131 assert(cancel_thread == Thread.canceling);
132 return error.Canceled;
133 }
134 }
135
136 fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
137 if (@cmpxchgStrong(
138 ?*Thread,
139 &fiber.cancel_thread,
140 thread,
141 null,
142 .acq_rel,
143 .acquire,
144 )) |cancel_thread| assert(cancel_thread == Thread.canceling);
145 }
146
147 const Queue = struct { head: *Fiber, tail: *Fiber };
148};
149
150fn recycle(k: *Kqueue, fiber: *Fiber) void {
151 std.log.debug("recyling {*}", .{fiber});
152 assert(fiber.queue_next == null);
153 k.gpa.free(fiber.allocatedSlice());
154}
155
156pub const InitOptions = struct {
157 n_threads: ?usize = null,
158};
159
160pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
161 assert(options.n_threads != 0);
162 const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
163 const threads_size = n_threads * @sizeOf(Thread);
164 const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
165 const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
166 errdefer gpa.free(allocated_slice);
167 k.* = .{
168 .gpa = gpa,
169 .mutex = .{},
170 .main_fiber_buffer = undefined,
171 .threads = .{
172 .allocated = @ptrCast(allocated_slice[0..threads_size]),
173 .reserved = 1,
174 .active = 1,
175 },
176 };
177 const main_fiber: *Fiber = @ptrCast(&k.main_fiber_buffer);
178 main_fiber.* = .{
179 .required_align = {},
180 .context = undefined,
181 .awaiter = null,
182 .queue_next = null,
183 .cancel_thread = null,
184 .awaiting_completions = .initEmpty(),
185 };
186 const main_thread = &k.threads.allocated[0];
187 Thread.self = main_thread;
188 const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
189 (idle_stack_end - 1)[0..1].* = .{@intFromPtr(k)};
190 main_thread.* = .{
191 .thread = undefined,
192 .idle_context = switch (builtin.cpu.arch) {
193 .aarch64 => .{
194 .sp = @intFromPtr(idle_stack_end),
195 .fp = 0,
196 .pc = @intFromPtr(&mainIdleEntry),
197 },
198 .x86_64 => .{
199 .rsp = @intFromPtr(idle_stack_end - 1),
200 .rbp = 0,
201 .rip = @intFromPtr(&mainIdleEntry),
202 },
203 else => @compileError("unimplemented architecture"),
204 },
205 .current_context = &main_fiber.context,
206 .ready_queue = null,
207 .kq_fd = try posix.kqueue(),
208 .idle_search_index = 1,
209 .steal_ready_search_index = 1,
210 .wait_queues = .empty,
211 };
212 errdefer std.posix.close(main_thread.kq_fd);
213 std.log.debug("created main idle {*}", .{&main_thread.idle_context});
214 std.log.debug("created main {*}", .{main_fiber});
215}
216
217pub fn deinit(k: *Kqueue) void {
218 const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
219 for (k.threads.allocated[0..active_threads]) |*thread| {
220 const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
221 assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
222 }
223 k.yield(null, .exit);
224 const main_thread = &k.threads.allocated[0];
225 const gpa = k.gpa;
226 main_thread.deinit(gpa);
227 const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
228 const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
229 for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
230 gpa.free(allocated_ptr[0..idle_stack_end_offset]);
231 k.* = undefined;
232}
233
234fn findReadyFiber(k: *Kqueue, thread: *Thread) ?*Fiber {
235 if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
236 @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
237 ready_fiber.queue_next = null;
238 return ready_fiber;
239 }
240 const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
241 for (0..@min(max_steal_ready_search, active_threads)) |_| {
242 defer thread.steal_ready_search_index += 1;
243 if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
244 const steal_ready_search_thread = &k.threads.allocated[0..active_threads][thread.steal_ready_search_index];
245 if (steal_ready_search_thread == thread) continue;
246 const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
247 if (ready_fiber == Fiber.finished) continue;
248 if (@cmpxchgWeak(
249 ?*Fiber,
250 &steal_ready_search_thread.ready_queue,
251 ready_fiber,
252 null,
253 .acquire,
254 .monotonic,
255 )) |_| continue;
256 @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
257 ready_fiber.queue_next = null;
258 return ready_fiber;
259 }
260 // couldn't find anything to do, so we are now open for business
261 @atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
262 return null;
263}
264
265fn yield(k: *Kqueue, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
266 const thread: *Thread = .current();
267 const ready_context = if (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber|
268 &ready_fiber.context
269 else
270 &thread.idle_context;
271 const message: SwitchMessage = .{
272 .contexts = .{
273 .prev = thread.current_context,
274 .ready = ready_context,
275 },
276 .pending_task = pending_task,
277 };
278 std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
279 contextSwitch(&message).handle(k);
280}
281
282fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
283 {
284 var fiber = ready_queue.head;
285 while (true) {
286 std.log.debug("scheduling {*}", .{fiber});
287 fiber = fiber.queue_next orelse break;
288 }
289 assert(fiber == ready_queue.tail);
290 }
291 // shared fields of previous `Thread` must be initialized before later ones are marked as active
292 const new_thread_index = @atomicLoad(u32, &k.threads.active, .acquire);
293 for (0..@min(max_idle_search, new_thread_index)) |_| {
294 defer thread.idle_search_index += 1;
295 if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
296 const idle_search_thread = &k.threads.allocated[0..new_thread_index][thread.idle_search_index];
297 if (idle_search_thread == thread) continue;
298 if (@cmpxchgWeak(
299 ?*Fiber,
300 &idle_search_thread.ready_queue,
301 null,
302 ready_queue.head,
303 .release,
304 .monotonic,
305 )) |_| continue;
306 const changes = [_]posix.Kevent{
307 .{
308 .ident = 0,
309 .filter = std.c.EVFILT.USER,
310 .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
311 .fflags = std.c.NOTE.TRIGGER,
312 .data = 0,
313 .udata = @intFromEnum(Completion.UserData.wakeup),
314 },
315 };
316 // If an error occurs it only pessimises scheduling.
317 _ = posix.kevent(idle_search_thread.kq_fd, &changes, &.{}, null) catch {};
318 return;
319 }
320 spawn_thread: {
321 // previous failed reservations must have completed before retrying
322 if (new_thread_index == k.threads.allocated.len or @cmpxchgWeak(
323 u32,
324 &k.threads.reserved,
325 new_thread_index,
326 new_thread_index + 1,
327 .acquire,
328 .monotonic,
329 ) != null) break :spawn_thread;
330 const new_thread = &k.threads.allocated[new_thread_index];
331 const next_thread_index = new_thread_index + 1;
332 new_thread.* = .{
333 .thread = undefined,
334 .idle_context = undefined,
335 .current_context = &new_thread.idle_context,
336 .ready_queue = ready_queue.head,
337 .kq_fd = posix.kqueue() catch |err| {
338 @atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
339 // no more access to `thread` after giving up reservation
340 std.log.warn("unable to create worker thread due to kqueue init failure: {t}", .{err});
341 break :spawn_thread;
342 },
343 .idle_search_index = 0,
344 .steal_ready_search_index = 0,
345 .wait_queues = .empty,
346 };
347 new_thread.thread = std.Thread.spawn(.{
348 .stack_size = idle_stack_size,
349 .allocator = k.gpa,
350 }, threadEntry, .{ k, new_thread_index }) catch |err| {
351 posix.close(new_thread.kq_fd);
352 @atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
353 // no more access to `thread` after giving up reservation
354 std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)});
355 break :spawn_thread;
356 };
357 // shared fields of `Thread` must be initialized before being marked active
358 @atomicStore(u32, &k.threads.active, next_thread_index, .release);
359 return;
360 }
361 // nobody wanted it, so just queue it on ourselves
362 while (@cmpxchgWeak(
363 ?*Fiber,
364 &thread.ready_queue,
365 ready_queue.tail.queue_next,
366 ready_queue.head,
367 .acq_rel,
368 .acquire,
369 )) |old_head| ready_queue.tail.queue_next = old_head;
370}
371
372fn mainIdle(k: *Kqueue, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
373 message.handle(k);
374 k.idle(&k.threads.allocated[0]);
375 k.yield(@ptrCast(&k.main_fiber_buffer), .nothing);
376 unreachable; // switched to dead fiber
377}
378
379fn threadEntry(k: *Kqueue, index: u32) void {
380 const thread: *Thread = &k.threads.allocated[index];
381 Thread.self = thread;
382 std.log.debug("created thread idle {*}", .{&thread.idle_context});
383 k.idle(thread);
384 thread.deinit(k.gpa);
385}
386
387const Completion = struct {
388 const UserData = enum(usize) {
389 unused,
390 wakeup,
391 cleanup,
392 exit,
393 /// *Fiber
394 _,
395 };
396 /// Corresponds to Kevent field.
397 flags: u16,
398 /// Corresponds to Kevent field.
399 fflags: u32,
400 /// Corresponds to Kevent field.
401 data: isize,
402};
403
404fn idle(k: *Kqueue, thread: *Thread) void {
405 var events_buffer: [changes_buffer_len]posix.Kevent = undefined;
406 var maybe_ready_fiber: ?*Fiber = null;
407 while (true) {
408 while (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| {
409 k.yield(ready_fiber, .nothing);
410 maybe_ready_fiber = null;
411 }
412 const n = posix.kevent(thread.kq_fd, &.{}, &events_buffer, null) catch |err| {
413 // TODO handle EINTR for cancellation purposes
414 @panic(@errorName(err));
415 };
416 var maybe_ready_queue: ?Fiber.Queue = null;
417 for (events_buffer[0..n]) |event| switch (@as(Completion.UserData, @enumFromInt(event.udata))) {
418 .unused => unreachable, // bad submission queued?
419 .wakeup => {},
420 .cleanup => @panic("failed to notify other threads that we are exiting"),
421 .exit => {
422 assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
423 return;
424 },
425 _ => {
426 const event_head_fiber: *Fiber = @ptrFromInt(event.udata);
427 const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{
428 .ident = event.ident,
429 .filter = event.filter,
430 }).?.value;
431 assert(event_tail_fiber.queue_next == null);
432
433 // TODO reevaluate this logic
434 event_head_fiber.resultPointer(Completion).* = .{
435 .flags = event.flags,
436 .fflags = event.fflags,
437 .data = event.data,
438 };
439
440 queue_ready: {
441 const head: *Fiber = if (maybe_ready_fiber == null) f: {
442 maybe_ready_fiber = event_head_fiber;
443 const next = event_head_fiber.queue_next orelse break :queue_ready;
444 event_head_fiber.queue_next = null;
445 break :f next;
446 } else event_head_fiber;
447
448 if (maybe_ready_queue) |*ready_queue| {
449 ready_queue.tail.queue_next = head;
450 ready_queue.tail = event_tail_fiber;
451 } else {
452 maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber };
453 }
454 }
455 },
456 };
457 if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue);
458 }
459}
460
461const SwitchMessage = struct {
462 contexts: extern struct {
463 prev: *Context,
464 ready: *Context,
465 },
466 pending_task: PendingTask,
467
468 const PendingTask = union(enum) {
469 nothing,
470 reschedule,
471 recycle: *Fiber,
472 register_awaiter: *?*Fiber,
473 register_select: []const *Io.AnyFuture,
474 mutex_lock: struct {
475 prev_state: Io.Mutex.State,
476 mutex: *Io.Mutex,
477 },
478 condition_wait: struct {
479 cond: *Io.Condition,
480 mutex: *Io.Mutex,
481 },
482 exit,
483 };
484
485 fn handle(message: *const SwitchMessage, k: *Kqueue) void {
486 const thread: *Thread = .current();
487 thread.current_context = message.contexts.ready;
488 switch (message.pending_task) {
489 .nothing => {},
490 .reschedule => if (message.contexts.prev != &thread.idle_context) {
491 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
492 assert(prev_fiber.queue_next == null);
493 k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
494 },
495 .recycle => |fiber| {
496 k.recycle(fiber);
497 },
498 .register_awaiter => |awaiter| {
499 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
500 assert(prev_fiber.queue_next == null);
501 if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
502 k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
503 },
504 .register_select => |futures| {
505 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
506 assert(prev_fiber.queue_next == null);
507 for (futures) |any_future| {
508 const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
509 if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
510 const closure: *AsyncClosure = .fromFiber(future_fiber);
511 if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
512 k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
513 }
514 }
515 }
516 },
517 .mutex_lock => |mutex_lock| {
518 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
519 assert(prev_fiber.queue_next == null);
520 var prev_state = mutex_lock.prev_state;
521 while (switch (prev_state) {
522 else => next_state: {
523 prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
524 break :next_state @cmpxchgWeak(
525 Io.Mutex.State,
526 &mutex_lock.mutex.state,
527 prev_state,
528 @enumFromInt(@intFromPtr(prev_fiber)),
529 .release,
530 .acquire,
531 );
532 },
533 .unlocked => @cmpxchgWeak(
534 Io.Mutex.State,
535 &mutex_lock.mutex.state,
536 .unlocked,
537 .locked_once,
538 .acquire,
539 .acquire,
540 ) orelse {
541 prev_fiber.queue_next = null;
542 k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
543 return;
544 },
545 }) |next_state| prev_state = next_state;
546 },
547 .condition_wait => |condition_wait| {
548 const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
549 assert(prev_fiber.queue_next == null);
550 const cond_impl = prev_fiber.resultPointer(Condition);
551 cond_impl.* = .{
552 .tail = prev_fiber,
553 .event = .queued,
554 };
555 if (@cmpxchgStrong(
556 ?*Fiber,
557 @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
558 null,
559 prev_fiber,
560 .release,
561 .acquire,
562 )) |waiting_fiber| {
563 const waiting_cond_impl = waiting_fiber.?.resultPointer(Condition);
564 assert(waiting_cond_impl.tail.queue_next == null);
565 waiting_cond_impl.tail.queue_next = prev_fiber;
566 waiting_cond_impl.tail = prev_fiber;
567 }
568 condition_wait.mutex.unlock(k.io());
569 },
570 .exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
571 const changes = [_]posix.Kevent{
572 .{
573 .ident = 0,
574 .filter = std.c.EVFILT.USER,
575 .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
576 .fflags = std.c.NOTE.TRIGGER,
577 .data = 0,
578 .udata = @intFromEnum(Completion.UserData.exit),
579 },
580 };
581 _ = posix.kevent(each_thread.kq_fd, &changes, &.{}, null) catch |err| {
582 @panic(@errorName(err));
583 };
584 },
585 }
586 }
587};
588
589const Context = switch (builtin.cpu.arch) {
590 .aarch64 => extern struct {
591 sp: u64,
592 fp: u64,
593 pc: u64,
594 },
595 .x86_64 => extern struct {
596 rsp: u64,
597 rbp: u64,
598 rip: u64,
599 },
600 else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
601};
602
603inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
604 return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
605 .aarch64 => asm volatile (
606 \\ ldp x0, x2, [x1]
607 \\ ldr x3, [x2, #16]
608 \\ mov x4, sp
609 \\ stp x4, fp, [x0]
610 \\ adr x5, 0f
611 \\ ldp x4, fp, [x2]
612 \\ str x5, [x0, #16]
613 \\ mov sp, x4
614 \\ br x3
615 \\0:
616 : [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")),
617 : [message_to_send] "{x1}" (&message.contexts),
618 : .{
619 .x0 = true,
620 .x1 = true,
621 .x2 = true,
622 .x3 = true,
623 .x4 = true,
624 .x5 = true,
625 .x6 = true,
626 .x7 = true,
627 .x8 = true,
628 .x9 = true,
629 .x10 = true,
630 .x11 = true,
631 .x12 = true,
632 .x13 = true,
633 .x14 = true,
634 .x15 = true,
635 .x16 = true,
636 .x17 = true,
637 .x19 = true,
638 .x20 = true,
639 .x21 = true,
640 .x22 = true,
641 .x23 = true,
642 .x24 = true,
643 .x25 = true,
644 .x26 = true,
645 .x27 = true,
646 .x28 = true,
647 .x30 = true,
648 .z0 = true,
649 .z1 = true,
650 .z2 = true,
651 .z3 = true,
652 .z4 = true,
653 .z5 = true,
654 .z6 = true,
655 .z7 = true,
656 .z8 = true,
657 .z9 = true,
658 .z10 = true,
659 .z11 = true,
660 .z12 = true,
661 .z13 = true,
662 .z14 = true,
663 .z15 = true,
664 .z16 = true,
665 .z17 = true,
666 .z18 = true,
667 .z19 = true,
668 .z20 = true,
669 .z21 = true,
670 .z22 = true,
671 .z23 = true,
672 .z24 = true,
673 .z25 = true,
674 .z26 = true,
675 .z27 = true,
676 .z28 = true,
677 .z29 = true,
678 .z30 = true,
679 .z31 = true,
680 .p0 = true,
681 .p1 = true,
682 .p2 = true,
683 .p3 = true,
684 .p4 = true,
685 .p5 = true,
686 .p6 = true,
687 .p7 = true,
688 .p8 = true,
689 .p9 = true,
690 .p10 = true,
691 .p11 = true,
692 .p12 = true,
693 .p13 = true,
694 .p14 = true,
695 .p15 = true,
696 .fpcr = true,
697 .fpsr = true,
698 .ffr = true,
699 .memory = true,
700 }),
701 .x86_64 => asm volatile (
702 \\ movq 0(%%rsi), %%rax
703 \\ movq 8(%%rsi), %%rcx
704 \\ leaq 0f(%%rip), %%rdx
705 \\ movq %%rsp, 0(%%rax)
706 \\ movq %%rbp, 8(%%rax)
707 \\ movq %%rdx, 16(%%rax)
708 \\ movq 0(%%rcx), %%rsp
709 \\ movq 8(%%rcx), %%rbp
710 \\ jmpq *16(%%rcx)
711 \\0:
712 : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
713 : [message_to_send] "{rsi}" (&message.contexts),
714 : .{
715 .rax = true,
716 .rcx = true,
717 .rdx = true,
718 .rbx = true,
719 .rsi = true,
720 .rdi = true,
721 .r8 = true,
722 .r9 = true,
723 .r10 = true,
724 .r11 = true,
725 .r12 = true,
726 .r13 = true,
727 .r14 = true,
728 .r15 = true,
729 .mm0 = true,
730 .mm1 = true,
731 .mm2 = true,
732 .mm3 = true,
733 .mm4 = true,
734 .mm5 = true,
735 .mm6 = true,
736 .mm7 = true,
737 .zmm0 = true,
738 .zmm1 = true,
739 .zmm2 = true,
740 .zmm3 = true,
741 .zmm4 = true,
742 .zmm5 = true,
743 .zmm6 = true,
744 .zmm7 = true,
745 .zmm8 = true,
746 .zmm9 = true,
747 .zmm10 = true,
748 .zmm11 = true,
749 .zmm12 = true,
750 .zmm13 = true,
751 .zmm14 = true,
752 .zmm15 = true,
753 .zmm16 = true,
754 .zmm17 = true,
755 .zmm18 = true,
756 .zmm19 = true,
757 .zmm20 = true,
758 .zmm21 = true,
759 .zmm22 = true,
760 .zmm23 = true,
761 .zmm24 = true,
762 .zmm25 = true,
763 .zmm26 = true,
764 .zmm27 = true,
765 .zmm28 = true,
766 .zmm29 = true,
767 .zmm30 = true,
768 .zmm31 = true,
769 .fpsr = true,
770 .fpcr = true,
771 .mxcsr = true,
772 .rflags = true,
773 .dirflag = true,
774 .memory = true,
775 }),
776 else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
777 });
778}
779
780fn mainIdleEntry() callconv(.naked) void {
781 switch (builtin.cpu.arch) {
782 .x86_64 => asm volatile (
783 \\ movq (%%rsp), %%rdi
784 \\ jmp %[mainIdle:P]
785 :
786 : [mainIdle] "X" (&mainIdle),
787 ),
788 .aarch64 => asm volatile (
789 \\ ldr x0, [sp, #-8]
790 \\ b %[mainIdle]
791 :
792 : [mainIdle] "X" (&mainIdle),
793 ),
794 else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
795 }
796}
797
798fn fiberEntry() callconv(.naked) void {
799 switch (builtin.cpu.arch) {
800 .x86_64 => asm volatile (
801 \\ leaq 8(%%rsp), %%rdi
802 \\ jmp %[AsyncClosure_call:P]
803 :
804 : [AsyncClosure_call] "X" (&AsyncClosure.call),
805 ),
806 .aarch64 => asm volatile (
807 \\ mov x0, sp
808 \\ b %[AsyncClosure_call]
809 :
810 : [AsyncClosure_call] "X" (&AsyncClosure.call),
811 ),
812 else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
813 }
814}
815
816const AsyncClosure = struct {
817 kqueue: *Kqueue,
818 fiber: *Fiber,
819 start: *const fn (context: *const anyopaque, result: *anyopaque) void,
820 result_align: Alignment,
821 already_awaited: bool,
822
823 fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
824 return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
825 }
826
827 fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
828 message.handle(closure.kqueue);
829 const fiber = closure.fiber;
830 std.log.debug("{*} performing async", .{fiber});
831 closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
832 const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
833 const ready_awaiter = r: {
834 const a = awaiter orelse break :r null;
835 if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
836 break :r a;
837 };
838 closure.kqueue.yield(ready_awaiter, .nothing);
839 unreachable; // switched to dead fiber
840 }
841
842 fn fromFiber(fiber: *Fiber) *AsyncClosure {
843 return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
844 @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
845 ) - @sizeOf(AsyncClosure));
846 }
847};
848
849pub fn io(k: *Kqueue) Io {
850 return .{
851 .userdata = k,
852 .vtable = &.{
853 .async = async,
854 .concurrent = concurrent,
855 .await = await,
856 .cancel = cancel,
857 .cancelRequested = cancelRequested,
858 .select = select,
859
860 .groupAsync = groupAsync,
861 .groupWait = groupWait,
862 .groupCancel = groupCancel,
863
864 .mutexLock = mutexLock,
865 .mutexLockUncancelable = mutexLockUncancelable,
866 .mutexUnlock = mutexUnlock,
867
868 .conditionWait = conditionWait,
869 .conditionWaitUncancelable = conditionWaitUncancelable,
870 .conditionWake = conditionWake,
871
872 .dirMake = dirMake,
873 .dirMakePath = dirMakePath,
874 .dirMakeOpenPath = dirMakeOpenPath,
875 .dirStat = dirStat,
876 .dirStatPath = dirStatPath,
877
878 .fileStat = fileStat,
879 .dirAccess = dirAccess,
880 .dirCreateFile = dirCreateFile,
881 .dirOpenFile = dirOpenFile,
882 .dirOpenDir = dirOpenDir,
883 .dirClose = dirClose,
884 .fileClose = fileClose,
885 .fileWriteStreaming = fileWriteStreaming,
886 .fileWritePositional = fileWritePositional,
887 .fileReadStreaming = fileReadStreaming,
888 .fileReadPositional = fileReadPositional,
889 .fileSeekBy = fileSeekBy,
890 .fileSeekTo = fileSeekTo,
891 .openSelfExe = openSelfExe,
892
893 .now = now,
894 .sleep = sleep,
895
896 .netListenIp = netListenIp,
897 .netListenUnix = netListenUnix,
898 .netAccept = netAccept,
899 .netBindIp = netBindIp,
900 .netConnectIp = netConnectIp,
901 .netConnectUnix = netConnectUnix,
902 .netClose = netClose,
903 .netRead = netRead,
904 .netWrite = netWrite,
905 .netSend = netSend,
906 .netReceive = netReceive,
907 .netInterfaceNameResolve = netInterfaceNameResolve,
908 .netInterfaceName = netInterfaceName,
909 .netLookup = netLookup,
910 },
911 };
912}
913
914fn async(
915 userdata: ?*anyopaque,
916 result: []u8,
917 result_alignment: std.mem.Alignment,
918 context: []const u8,
919 context_alignment: std.mem.Alignment,
920 start: *const fn (context: *const anyopaque, result: *anyopaque) void,
921) ?*Io.AnyFuture {
922 return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
923 start(context.ptr, result.ptr);
924 return null;
925 };
926}
927
928fn concurrent(
929 userdata: ?*anyopaque,
930 result_len: usize,
931 result_alignment: Alignment,
932 context: []const u8,
933 context_alignment: Alignment,
934 start: *const fn (context: *const anyopaque, result: *anyopaque) void,
935) Io.ConcurrentError!*Io.AnyFuture {
936 const k: *Kqueue = @ptrCast(@alignCast(userdata));
937 assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
938 assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
939 assert(result_len <= Fiber.max_result_size); // TODO
940 assert(context.len <= Fiber.max_context_size); // TODO
941
942 const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable;
943 std.log.debug("allocated {*}", .{fiber});
944
945 const closure: *AsyncClosure = .fromFiber(fiber);
946 fiber.* = .{
947 .required_align = {},
948 .context = switch (builtin.cpu.arch) {
949 .x86_64 => .{
950 .rsp = @intFromPtr(closure) - @sizeOf(usize),
951 .rbp = 0,
952 .rip = @intFromPtr(&fiberEntry),
953 },
954 .aarch64 => .{
955 .sp = @intFromPtr(closure),
956 .fp = 0,
957 .pc = @intFromPtr(&fiberEntry),
958 },
959 else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
960 },
961 .awaiter = null,
962 .queue_next = null,
963 .cancel_thread = null,
964 .awaiting_completions = .initEmpty(),
965 };
966 closure.* = .{
967 .kqueue = k,
968 .fiber = fiber,
969 .start = start,
970 .result_align = result_alignment,
971 .already_awaited = false,
972 };
973 @memcpy(closure.contextPointer(), context);
974
975 k.schedule(.current(), .{ .head = fiber, .tail = fiber });
976 return @ptrCast(fiber);
977}
978
979fn await(
980 userdata: ?*anyopaque,
981 any_future: *Io.AnyFuture,
982 result: []u8,
983 result_alignment: std.mem.Alignment,
984) void {
985 const k: *Kqueue = @ptrCast(@alignCast(userdata));
986 const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
987 if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
988 k.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
989 @memcpy(result, future_fiber.resultBytes(result_alignment));
990 k.recycle(future_fiber);
991}
992
993fn cancel(
994 userdata: ?*anyopaque,
995 any_future: *Io.AnyFuture,
996 result: []u8,
997 result_alignment: std.mem.Alignment,
998) void {
999 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1000 _ = k;
1001 _ = any_future;
1002 _ = result;
1003 _ = result_alignment;
1004 @panic("TODO");
1005}
1006
1007fn cancelRequested(userdata: ?*anyopaque) bool {
1008 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1009 _ = k;
1010 return false; // TODO
1011}
1012
1013fn groupAsync(
1014 userdata: ?*anyopaque,
1015 group: *Io.Group,
1016 context: []const u8,
1017 context_alignment: std.mem.Alignment,
1018 start: *const fn (*Io.Group, context: *const anyopaque) void,
1019) void {
1020 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1021 _ = k;
1022 _ = group;
1023 _ = context;
1024 _ = context_alignment;
1025 _ = start;
1026 @panic("TODO");
1027}
1028
1029fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
1030 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1031 _ = k;
1032 _ = group;
1033 _ = token;
1034 @panic("TODO");
1035}
1036
1037fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
1038 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1039 _ = k;
1040 _ = group;
1041 _ = token;
1042 @panic("TODO");
1043}
1044
1045fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
1046 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1047 _ = k;
1048 _ = futures;
1049 @panic("TODO");
1050}
1051
1052fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
1053 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1054 _ = k;
1055 _ = prev_state;
1056 _ = mutex;
1057 @panic("TODO");
1058}
1059fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
1060 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1061 _ = k;
1062 _ = prev_state;
1063 _ = mutex;
1064 @panic("TODO");
1065}
1066fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
1067 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1068 _ = k;
1069 _ = prev_state;
1070 _ = mutex;
1071 @panic("TODO");
1072}
1073
1074fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
1075 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1076 k.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
1077 const thread = Thread.current();
1078 const fiber = thread.currentFiber();
1079 const cond_impl = fiber.resultPointer(Condition);
1080 try mutex.lock(k.io());
1081 switch (cond_impl.event) {
1082 .queued => {},
1083 .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
1084 .one => if (@cmpxchgStrong(
1085 ?*Fiber,
1086 @as(*?*Fiber, @ptrCast(&cond.state)),
1087 null,
1088 next_fiber,
1089 .release,
1090 .acquire,
1091 )) |old_fiber| {
1092 const old_cond_impl = old_fiber.?.resultPointer(Condition);
1093 assert(old_cond_impl.tail.queue_next == null);
1094 old_cond_impl.tail.queue_next = next_fiber;
1095 old_cond_impl.tail = cond_impl.tail;
1096 },
1097 .all => k.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
1098 },
1099 }
1100 fiber.queue_next = null;
1101}
1102
1103fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
1104 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1105 _ = k;
1106 _ = cond;
1107 _ = mutex;
1108 @panic("TODO");
1109}
1110fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
1111 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1112 const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
1113 waiting_fiber.resultPointer(Condition).event = .{ .wake = wake };
1114 k.yield(waiting_fiber, .reschedule);
1115}
1116
1117fn dirMake(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void {
1118 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1119 _ = k;
1120 _ = dir;
1121 _ = sub_path;
1122 _ = mode;
1123 @panic("TODO");
1124}
1125fn dirMakePath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void {
1126 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1127 _ = k;
1128 _ = dir;
1129 _ = sub_path;
1130 _ = mode;
1131 @panic("TODO");
1132}
1133fn dirMakeOpenPath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.MakeOpenPathError!Dir {
1134 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1135 _ = k;
1136 _ = dir;
1137 _ = sub_path;
1138 _ = options;
1139 @panic("TODO");
1140}
1141fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
1142 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1143 _ = k;
1144 _ = dir;
1145 @panic("TODO");
1146}
1147fn dirStatPath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.StatPathOptions) Dir.StatPathError!File.Stat {
1148 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1149 _ = k;
1150 _ = dir;
1151 _ = sub_path;
1152 _ = options;
1153 @panic("TODO");
1154}
1155fn dirAccess(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.AccessOptions) Dir.AccessError!void {
1156 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1157 _ = k;
1158 _ = dir;
1159 _ = sub_path;
1160 _ = options;
1161 @panic("TODO");
1162}
1163fn dirCreateFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File {
1164 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1165 _ = k;
1166 _ = dir;
1167 _ = sub_path;
1168 _ = flags;
1169 @panic("TODO");
1170}
1171fn dirOpenFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File {
1172 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1173 _ = k;
1174 _ = dir;
1175 _ = sub_path;
1176 _ = flags;
1177 @panic("TODO");
1178}
1179fn dirOpenDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.OpenError!Dir {
1180 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1181 _ = k;
1182 _ = dir;
1183 _ = sub_path;
1184 _ = options;
1185 @panic("TODO");
1186}
1187fn dirClose(userdata: ?*anyopaque, dir: Dir) void {
1188 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1189 _ = k;
1190 _ = dir;
1191 @panic("TODO");
1192}
1193fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
1194 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1195 _ = k;
1196 _ = file;
1197 @panic("TODO");
1198}
1199fn fileClose(userdata: ?*anyopaque, file: File) void {
1200 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1201 _ = k;
1202 _ = file;
1203 @panic("TODO");
1204}
1205fn fileWriteStreaming(userdata: ?*anyopaque, file: File, buffer: [][]const u8) File.WriteStreamingError!usize {
1206 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1207 _ = k;
1208 _ = file;
1209 _ = buffer;
1210 @panic("TODO");
1211}
1212fn fileWritePositional(userdata: ?*anyopaque, file: File, buffer: [][]const u8, offset: u64) File.WritePositionalError!usize {
1213 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1214 _ = k;
1215 _ = file;
1216 _ = buffer;
1217 _ = offset;
1218 @panic("TODO");
1219}
1220fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: [][]u8) File.Reader.Error!usize {
1221 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1222 _ = k;
1223 _ = file;
1224 _ = data;
1225 @panic("TODO");
1226}
1227fn fileReadPositional(userdata: ?*anyopaque, file: File, data: [][]u8, offset: u64) File.ReadPositionalError!usize {
1228 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1229 _ = k;
1230 _ = file;
1231 _ = data;
1232 _ = offset;
1233 @panic("TODO");
1234}
1235fn fileSeekBy(userdata: ?*anyopaque, file: File, relative_offset: i64) File.SeekError!void {
1236 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1237 _ = k;
1238 _ = file;
1239 _ = relative_offset;
1240 @panic("TODO");
1241}
1242fn fileSeekTo(userdata: ?*anyopaque, file: File, absolute_offset: u64) File.SeekError!void {
1243 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1244 _ = k;
1245 _ = file;
1246 _ = absolute_offset;
1247 @panic("TODO");
1248}
1249fn openSelfExe(userdata: ?*anyopaque, file: File.OpenFlags) File.OpenSelfExeError!File {
1250 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1251 _ = k;
1252 _ = file;
1253 @panic("TODO");
1254}
1255
1256fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
1257 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1258 _ = k;
1259 _ = clock;
1260 @panic("TODO");
1261}
1262fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
1263 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1264 _ = k;
1265 _ = timeout;
1266 @panic("TODO");
1267}
1268
1269fn netListenIp(
1270 userdata: ?*anyopaque,
1271 address: net.IpAddress,
1272 options: net.IpAddress.ListenOptions,
1273) net.IpAddress.ListenError!net.Server {
1274 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1275 _ = k;
1276 _ = address;
1277 _ = options;
1278 @panic("TODO");
1279}
1280fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream {
1281 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1282 _ = k;
1283 _ = server;
1284 @panic("TODO");
1285}
1286fn netBindIp(
1287 userdata: ?*anyopaque,
1288 address: *const net.IpAddress,
1289 options: net.IpAddress.BindOptions,
1290) net.IpAddress.BindError!net.Socket {
1291 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1292 const family = Io.Threaded.posixAddressFamily(address);
1293 const socket_fd = try openSocketPosix(k, family, options);
1294 errdefer std.posix.close(socket_fd);
1295 var storage: Io.Threaded.PosixAddress = undefined;
1296 var addr_len = Io.Threaded.addressToPosix(address, &storage);
1297 try posixBind(k, socket_fd, &storage.any, addr_len);
1298 try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
1299 return .{
1300 .handle = socket_fd,
1301 .address = Io.Threaded.addressFromPosix(&storage),
1302 };
1303}
1304fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
1305 if (options.timeout != .none) @panic("TODO");
1306 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1307 const family = Io.Threaded.posixAddressFamily(address);
1308 const socket_fd = try openSocketPosix(k, family, .{
1309 .mode = options.mode,
1310 .protocol = options.protocol,
1311 });
1312 errdefer posix.close(socket_fd);
1313 var storage: Io.Threaded.PosixAddress = undefined;
1314 var addr_len = Io.Threaded.addressToPosix(address, &storage);
1315 try posixConnect(k, socket_fd, &storage.any, addr_len);
1316 try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
1317 return .{ .socket = .{
1318 .handle = socket_fd,
1319 .address = Io.Threaded.addressFromPosix(&storage),
1320 } };
1321}
1322
1323fn posixConnect(k: *Kqueue, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
1324 while (true) {
1325 try k.checkCancel();
1326 switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
1327 .SUCCESS => return,
1328 .INTR => continue,
1329 .CANCELED => return error.Canceled,
1330 .AGAIN => @panic("TODO"),
1331 .INPROGRESS => return, // Due to TCP fast open, we find out possible error later.
1332
1333 .ADDRNOTAVAIL => return error.AddressUnavailable,
1334 .AFNOSUPPORT => return error.AddressFamilyUnsupported,
1335 .ALREADY => return error.ConnectionPending,
1336 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1337 .CONNREFUSED => return error.ConnectionRefused,
1338 .CONNRESET => return error.ConnectionResetByPeer,
1339 .FAULT => |err| return errnoBug(err),
1340 .ISCONN => |err| return errnoBug(err),
1341 .HOSTUNREACH => return error.HostUnreachable,
1342 .NETUNREACH => return error.NetworkUnreachable,
1343 .NOTSOCK => |err| return errnoBug(err),
1344 .PROTOTYPE => |err| return errnoBug(err),
1345 .TIMEDOUT => return error.Timeout,
1346 .CONNABORTED => |err| return errnoBug(err),
1347 .ACCES => return error.AccessDenied,
1348 .PERM => |err| return errnoBug(err),
1349 .NOENT => |err| return errnoBug(err),
1350 .NETDOWN => return error.NetworkDown,
1351 else => |err| return posix.unexpectedErrno(err),
1352 }
1353 }
1354}
1355
1356fn netListenUnix(
1357 userdata: ?*anyopaque,
1358 unix_address: *const net.UnixAddress,
1359 options: net.UnixAddress.ListenOptions,
1360) net.UnixAddress.ListenError!net.Socket.Handle {
1361 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1362 _ = k;
1363 _ = unix_address;
1364 _ = options;
1365 @panic("TODO");
1366}
1367fn netConnectUnix(
1368 userdata: ?*anyopaque,
1369 unix_address: *const net.UnixAddress,
1370) net.UnixAddress.ConnectError!net.Socket.Handle {
1371 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1372 _ = k;
1373 _ = unix_address;
1374 @panic("TODO");
1375}
1376
1377fn netSend(
1378 userdata: ?*anyopaque,
1379 handle: net.Socket.Handle,
1380 outgoing_messages: []net.OutgoingMessage,
1381 flags: net.SendFlags,
1382) struct { ?net.Socket.SendError, usize } {
1383 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1384
1385 const posix_flags: u32 =
1386 @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
1387 @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
1388 @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
1389 @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
1390 @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
1391 posix.MSG.NOSIGNAL;
1392
1393 for (outgoing_messages, 0..) |*msg, i| {
1394 netSendOne(k, handle, msg, posix_flags) catch |err| return .{ err, i };
1395 }
1396
1397 return .{ null, outgoing_messages.len };
1398}
1399
1400fn netSendOne(
1401 k: *Kqueue,
1402 handle: net.Socket.Handle,
1403 message: *net.OutgoingMessage,
1404 flags: u32,
1405) net.Socket.SendError!void {
1406 var addr: Io.Threaded.PosixAddress = undefined;
1407 var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
1408 const msg: posix.msghdr_const = .{
1409 .name = &addr.any,
1410 .namelen = Io.Threaded.addressToPosix(message.address, &addr),
1411 .iov = (&iovec)[0..1],
1412 .iovlen = 1,
1413 // OS returns EINVAL if this pointer is invalid even if controllen is zero.
1414 .control = if (message.control.len == 0) null else @constCast(message.control.ptr),
1415 .controllen = @intCast(message.control.len),
1416 .flags = 0,
1417 };
1418 while (true) {
1419 try k.checkCancel();
1420 const rc = posix.system.sendmsg(handle, &msg, flags);
1421 switch (posix.errno(rc)) {
1422 .SUCCESS => {
1423 message.data_len = @intCast(rc);
1424 return;
1425 },
1426 .INTR => continue,
1427 .CANCELED => return error.Canceled,
1428 .AGAIN => @panic("TODO register kevent"),
1429
1430 .ACCES => return error.AccessDenied,
1431 .ALREADY => return error.FastOpenAlreadyInProgress,
1432 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1433 .CONNRESET => return error.ConnectionResetByPeer,
1434 .DESTADDRREQ => |err| return errnoBug(err),
1435 .FAULT => |err| return errnoBug(err),
1436 .INVAL => |err| return errnoBug(err),
1437 .ISCONN => |err| return errnoBug(err),
1438 .MSGSIZE => return error.MessageOversize,
1439 .NOBUFS => return error.SystemResources,
1440 .NOMEM => return error.SystemResources,
1441 .NOTSOCK => |err| return errnoBug(err),
1442 .OPNOTSUPP => |err| return errnoBug(err),
1443 .PIPE => return error.SocketUnconnected,
1444 .AFNOSUPPORT => return error.AddressFamilyUnsupported,
1445 .HOSTUNREACH => return error.HostUnreachable,
1446 .NETUNREACH => return error.NetworkUnreachable,
1447 .NOTCONN => return error.SocketUnconnected,
1448 .NETDOWN => return error.NetworkDown,
1449 else => |err| return posix.unexpectedErrno(err),
1450 }
1451 }
1452}
1453
1454fn netReceive(
1455 userdata: ?*anyopaque,
1456 handle: net.Socket.Handle,
1457 message_buffer: []net.IncomingMessage,
1458 data_buffer: []u8,
1459 flags: net.ReceiveFlags,
1460 timeout: Io.Timeout,
1461) struct { ?net.Socket.ReceiveTimeoutError, usize } {
1462 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1463 _ = k;
1464 _ = handle;
1465 _ = message_buffer;
1466 _ = data_buffer;
1467 _ = flags;
1468 _ = timeout;
1469 @panic("TODO");
1470}
1471
1472fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
1473 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1474
1475 var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
1476 var i: usize = 0;
1477 for (data) |buf| {
1478 if (iovecs_buffer.len - i == 0) break;
1479 if (buf.len != 0) {
1480 iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
1481 i += 1;
1482 }
1483 }
1484 const dest = iovecs_buffer[0..i];
1485 assert(dest[0].len > 0);
1486
1487 while (true) {
1488 try k.checkCancel();
1489 const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
1490 switch (posix.errno(rc)) {
1491 .SUCCESS => return @intCast(rc),
1492 .INTR => continue,
1493 .CANCELED => return error.Canceled,
1494 .AGAIN => {
1495 const thread: *Thread = .current();
1496 const fiber = thread.currentFiber();
1497 const ident: u32 = @bitCast(fd);
1498 const filter = std.c.EVFILT.READ;
1499 const gop = thread.wait_queues.getOrPut(k.gpa, .{
1500 .ident = ident,
1501 .filter = filter,
1502 }) catch return error.SystemResources;
1503 if (gop.found_existing) {
1504 const tail_fiber = gop.value_ptr.*;
1505 assert(tail_fiber.queue_next == null);
1506 tail_fiber.queue_next = fiber;
1507 gop.value_ptr.* = fiber;
1508 } else {
1509 gop.value_ptr.* = fiber;
1510 const changes = [_]posix.Kevent{
1511 .{
1512 .ident = ident,
1513 .filter = filter,
1514 .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
1515 .fflags = 0,
1516 .data = 0,
1517 .udata = @intFromPtr(fiber),
1518 },
1519 };
1520 assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
1521 @panic(@errorName(err)); // TODO
1522 }));
1523 }
1524 yield(k, null, .nothing);
1525 continue;
1526 },
1527
1528 .INVAL => |err| return errnoBug(err),
1529 .FAULT => |err| return errnoBug(err),
1530 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1531 .NOBUFS => return error.SystemResources,
1532 .NOMEM => return error.SystemResources,
1533 .NOTCONN => return error.SocketUnconnected,
1534 .CONNRESET => return error.ConnectionResetByPeer,
1535 .TIMEDOUT => return error.Timeout,
1536 .PIPE => return error.SocketUnconnected,
1537 .NETDOWN => return error.NetworkDown,
1538 else => |err| return posix.unexpectedErrno(err),
1539 }
1540 }
1541}
1542
1543fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
1544 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1545 _ = k;
1546 _ = dest;
1547 _ = header;
1548 _ = data;
1549 _ = splat;
1550 @panic("TODO");
1551}
1552fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
1553 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1554 _ = k;
1555 _ = handle;
1556 @panic("TODO");
1557}
1558fn netInterfaceNameResolve(
1559 userdata: ?*anyopaque,
1560 name: *const net.Interface.Name,
1561) net.Interface.Name.ResolveError!net.Interface {
1562 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1563 _ = k;
1564 _ = name;
1565 @panic("TODO");
1566}
1567fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
1568 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1569 _ = k;
1570 _ = interface;
1571 @panic("TODO");
1572}
1573fn netLookup(
1574 userdata: ?*anyopaque,
1575 host_name: net.HostName,
1576 result: *Io.Queue(net.HostName.LookupResult),
1577 options: net.HostName.LookupOptions,
1578) void {
1579 const k: *Kqueue = @ptrCast(@alignCast(userdata));
1580 _ = k;
1581 _ = host_name;
1582 _ = result;
1583 _ = options;
1584 @panic("TODO");
1585}
1586
1587fn openSocketPosix(
1588 k: *Kqueue,
1589 family: posix.sa_family_t,
1590 options: IpAddress.BindOptions,
1591) error{
1592 AddressFamilyUnsupported,
1593 ProtocolUnsupportedBySystem,
1594 ProcessFdQuotaExceeded,
1595 SystemFdQuotaExceeded,
1596 SystemResources,
1597 ProtocolUnsupportedByAddressFamily,
1598 SocketModeUnsupported,
1599 OptionUnsupported,
1600 Unexpected,
1601 Canceled,
1602}!posix.socket_t {
1603 const mode = Io.Threaded.posixSocketMode(options.mode);
1604 const protocol = Io.Threaded.posixProtocol(options.protocol);
1605 const socket_fd = while (true) {
1606 try k.checkCancel();
1607 const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
1608 const socket_rc = posix.system.socket(family, flags, protocol);
1609 switch (posix.errno(socket_rc)) {
1610 .SUCCESS => {
1611 const fd: posix.fd_t = @intCast(socket_rc);
1612 errdefer posix.close(fd);
1613 if (Io.Threaded.socket_flags_unsupported) {
1614 while (true) {
1615 try k.checkCancel();
1616 switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
1617 .SUCCESS => break,
1618 .INTR => continue,
1619 .CANCELED => return error.Canceled,
1620 else => |err| return posix.unexpectedErrno(err),
1621 }
1622 }
1623
1624 var fl_flags: usize = while (true) {
1625 try k.checkCancel();
1626 const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
1627 switch (posix.errno(rc)) {
1628 .SUCCESS => break @intCast(rc),
1629 .INTR => continue,
1630 .CANCELED => return error.Canceled,
1631 else => |err| return posix.unexpectedErrno(err),
1632 }
1633 };
1634 fl_flags |= @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
1635 while (true) {
1636 try k.checkCancel();
1637 switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
1638 .SUCCESS => break,
1639 .INTR => continue,
1640 .CANCELED => return error.Canceled,
1641 else => |err| return posix.unexpectedErrno(err),
1642 }
1643 }
1644 }
1645 break fd;
1646 },
1647 .INTR => continue,
1648 .CANCELED => return error.Canceled,
1649
1650 .AFNOSUPPORT => return error.AddressFamilyUnsupported,
1651 .INVAL => return error.ProtocolUnsupportedBySystem,
1652 .MFILE => return error.ProcessFdQuotaExceeded,
1653 .NFILE => return error.SystemFdQuotaExceeded,
1654 .NOBUFS => return error.SystemResources,
1655 .NOMEM => return error.SystemResources,
1656 .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
1657 .PROTOTYPE => return error.SocketModeUnsupported,
1658 else => |err| return posix.unexpectedErrno(err),
1659 }
1660 };
1661 errdefer posix.close(socket_fd);
1662
1663 if (options.ip6_only) {
1664 if (posix.IPV6 == void) return error.OptionUnsupported;
1665 try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
1666 }
1667
1668 return socket_fd;
1669}
1670
1671fn posixBind(
1672 k: *Kqueue,
1673 socket_fd: posix.socket_t,
1674 addr: *const posix.sockaddr,
1675 addr_len: posix.socklen_t,
1676) !void {
1677 while (true) {
1678 try k.checkCancel();
1679 switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
1680 .SUCCESS => break,
1681 .INTR => continue,
1682 .CANCELED => return error.Canceled,
1683
1684 .ADDRINUSE => return error.AddressInUse,
1685 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1686 .INVAL => |err| return errnoBug(err), // invalid parameters
1687 .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
1688 .AFNOSUPPORT => return error.AddressFamilyUnsupported,
1689 .ADDRNOTAVAIL => return error.AddressUnavailable,
1690 .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
1691 .NOMEM => return error.SystemResources,
1692 else => |err| return posix.unexpectedErrno(err),
1693 }
1694 }
1695}
1696
1697fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
1698 while (true) {
1699 try k.checkCancel();
1700 switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
1701 .SUCCESS => break,
1702 .INTR => continue,
1703 .CANCELED => return error.Canceled,
1704
1705 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1706 .FAULT => |err| return errnoBug(err),
1707 .INVAL => |err| return errnoBug(err), // invalid parameters
1708 .NOTSOCK => |err| return errnoBug(err), // always a race condition
1709 .NOBUFS => return error.SystemResources,
1710 else => |err| return posix.unexpectedErrno(err),
1711 }
1712 }
1713}
1714
1715fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
1716 const o: []const u8 = @ptrCast(&option);
1717 while (true) {
1718 try k.checkCancel();
1719 switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
1720 .SUCCESS => return,
1721 .INTR => continue,
1722 .CANCELED => return error.Canceled,
1723
1724 .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1725 .NOTSOCK => |err| return errnoBug(err),
1726 .INVAL => |err| return errnoBug(err),
1727 .FAULT => |err| return errnoBug(err),
1728 else => |err| return posix.unexpectedErrno(err),
1729 }
1730 }
1731}
1732
1733fn checkCancel(k: *Kqueue) error{Canceled}!void {
1734 if (cancelRequested(k)) return error.Canceled;
1735}
1736
1737const Condition = struct {
1738 tail: *Fiber,
1739 event: union(enum) {
1740 queued,
1741 wake: Io.Condition.Wake,
1742 },
1743};