const Kqueue = @This();
const builtin = @import("builtin");

const std = @import("../std.zig");
const Io = std.Io;
const Dir = std.Io.Dir;
const File = std.Io.File;
const net = std.Io.net;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
const IpAddress = std.Io.net.IpAddress;
const errnoBug = std.Io.Threaded.errnoBug;
const posix = std.posix;

/// Must be a thread-safe allocator.
gpa: Allocator,
mutex: std.Thread.Mutex,
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
threads: Thread.List,

/// Empirically, >128KB of stack was observed being used by the self-hosted backend to panic.
const idle_stack_size = 256 * 1024;

const max_idle_search = 4;
const max_steal_ready_search = 4;
const max_iovecs_len = 8;

const changes_buffer_len = 64;

const Thread = struct {
    thread: std.Thread,
    idle_context: Context,
    current_context: *Context,
    ready_queue: ?*Fiber,
    kq_fd: posix.fd_t,
    idle_search_index: u32,
    steal_ready_search_index: u32,
    /// For ensuring multiple fibers waiting on the same file descriptor and
    /// filter use the same kevent.
    wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),

    const WaitQueueKey = struct {
        ident: usize,
        filter: i32,
    };

    const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));

    threadlocal var self: *Thread = undefined;

    fn current() *Thread {
        return self;
    }

    fn currentFiber(thread: *Thread) *Fiber {
        return @fieldParentPtr("context", thread.current_context);
    }

    const List = struct {
        allocated: []Thread,
        reserved: u32,
        active: u32,
    };

    fn deinit(thread: *Thread, gpa: Allocator) void {
        posix.close(thread.kq_fd);
        assert(thread.wait_queues.count() == 0);
        thread.wait_queues.deinit(gpa);
        thread.* = undefined;
    }
};

const Fiber = struct {
    required_align: void align(4),
    context: Context,
    awaiter: ?*Fiber,
    queue_next: ?*Fiber,
    cancel_thread: ?*Thread,
    awaiting_completions: std.StaticBitSet(3),

    const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));

    const max_result_align: Alignment = .@"16";
    const max_result_size = max_result_align.forward(64);
    /// This includes any stack realignments that need to happen, and also the
    /// initial frame return address slot and argument frame, depending on target.
    const min_stack_size = 4 * 1024 * 1024;
    const max_context_align: Alignment = .@"16";
    const max_context_size = max_context_align.forward(1024);
    const max_closure_size: usize = @sizeOf(AsyncClosure);
    const max_closure_align: Alignment = .of(AsyncClosure);
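    /// A fiber's allocation is one contiguous, page-aligned block laid out
    /// (low to high) as: the `Fiber` header, the result storage, the stack
    /// (which grows downward from the closure), then the `AsyncClosure` and
    /// its context bytes at the very end (see `AsyncClosure.fromFiber`).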
    const allocation_size = std.mem.alignForward(
        usize,
        max_closure_align.max(max_context_align).forward(
            max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
        ) + max_closure_size + max_context_size,
        std.heap.page_size_max,
    );

    fn allocate(k: *Kqueue) error{OutOfMemory}!*Fiber {
        return @ptrCast(try k.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
    }

    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
        return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
    }

    fn allocatedEnd(f: *Fiber) [*]u8 {
        const allocated_slice = f.allocatedSlice();
        return allocated_slice[allocated_slice.len..].ptr;
    }

    fn resultPointer(f: *Fiber, comptime Result: type) *Result {
        return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
    }

    fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
        return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
    }

    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            null,
            thread,
            .acq_rel,
            .acquire,
        )) |cancel_thread| {
            assert(cancel_thread == Thread.canceling);
            return error.Canceled;
        }
    }

    fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            thread,
            null,
            .acq_rel,
            .acquire,
        )) |cancel_thread| assert(cancel_thread == Thread.canceling);
    }

    const Queue = struct { head: *Fiber, tail: *Fiber };
};

fn recycle(k: *Kqueue, fiber: *Fiber) void {
    std.log.debug("recycling {*}", .{fiber});
    assert(fiber.queue_next == null);
    k.gpa.free(fiber.allocatedSlice());
}

pub const InitOptions = struct {
    n_threads: ?usize = null,
};

pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
    assert(options.n_threads != 0);
    const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
    const threads_size = n_threads * @sizeOf(Thread);
    const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
    const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
    errdefer gpa.free(allocated_slice);
    k.* = .{
        .gpa = gpa,
        .mutex = .{},
        .main_fiber_buffer = undefined,
        .threads = .{
            .allocated = @ptrCast(allocated_slice[0..threads_size]),
            .reserved = 1,
            .active = 1,
        },
    };
    const main_fiber: *Fiber = @ptrCast(&k.main_fiber_buffer);
    main_fiber.* = .{
        .required_align = {},
        .context = undefined,
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    const main_thread = &k.threads.allocated[0];
    Thread.self = main_thread;
    const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
    (idle_stack_end - 1)[0..1].* = .{@intFromPtr(k)};
    main_thread.* = .{
        .thread = undefined,
        .idle_context = switch (builtin.cpu.arch) {
            .aarch64 => .{
                .sp = @intFromPtr(idle_stack_end),
                .fp = 0,
                .pc = @intFromPtr(&mainIdleEntry),
            },
            .x86_64 => .{
                .rsp = @intFromPtr(idle_stack_end - 1),
                .rbp = 0,
                .rip = @intFromPtr(&mainIdleEntry),
            },
            else => @compileError("unimplemented architecture"),
        },
        .current_context = &main_fiber.context,
        .ready_queue = null,
        .kq_fd = try posix.kqueue(),
        .idle_search_index = 1,
        .steal_ready_search_index = 1,
        .wait_queues = .empty,
    };
    errdefer std.posix.close(main_thread.kq_fd);
    std.log.debug("created main idle {*}", .{&main_thread.idle_context});
    std.log.debug("created main {*}", .{main_fiber});
}
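
// A minimal usage sketch (hypothetical caller, not part of this file; assumes
// the `Io` operations it ends up exercising are implemented below):
//
//     var event_loop: Kqueue = undefined;
//     try event_loop.init(gpa, .{});
//     defer event_loop.deinit();
//     const io = event_loop.io();
//     // ... hand `io` to code that performs asynchronous I/O ...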

pub fn deinit(k: *Kqueue) void {
    const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
    for (k.threads.allocated[0..active_threads]) |*thread| {
        const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
        assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
    }
    k.yield(null, .exit);
    const main_thread = &k.threads.allocated[0];
    const gpa = k.gpa;
    main_thread.deinit(gpa);
    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
    const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
    for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
    gpa.free(allocated_ptr[0..idle_stack_end_offset]);
    k.* = undefined;
}

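/// A thread's `ready_queue` doubles as a state flag: null means the thread is
/// idle and open to receiving work, `Fiber.finished` is a sentinel meaning it
/// is busy searching, and any other value is the head of its ready list.
/// Local pops and cross-thread steals swap between these states atomically.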
fn findReadyFiber(k: *Kqueue, thread: *Thread) ?*Fiber {
    if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
    for (0..@min(max_steal_ready_search, active_threads)) |_| {
        defer thread.steal_ready_search_index += 1;
        if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
        const steal_ready_search_thread = &k.threads.allocated[0..active_threads][thread.steal_ready_search_index];
        if (steal_ready_search_thread == thread) continue;
        const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
        if (ready_fiber == Fiber.finished) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &steal_ready_search_thread.ready_queue,
            ready_fiber,
            null,
            .acquire,
            .monotonic,
        )) |_| continue;
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    // couldn't find anything to do, so we are now open for business
    @atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
    return null;
}

fn yield(k: *Kqueue, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
    const thread: *Thread = .current();
    const ready_context = if (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber|
        &ready_fiber.context
    else
        &thread.idle_context;
    const message: SwitchMessage = .{
        .contexts = .{
            .prev = thread.current_context,
            .ready = ready_context,
        },
        .pending_task = pending_task,
    };
    std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
    contextSwitch(&message).handle(k);
}

fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
    {
        var fiber = ready_queue.head;
        while (true) {
            std.log.debug("scheduling {*}", .{fiber});
            fiber = fiber.queue_next orelse break;
        }
        assert(fiber == ready_queue.tail);
    }
    // shared fields of previous `Thread` must be initialized before later ones are marked as active
    const new_thread_index = @atomicLoad(u32, &k.threads.active, .acquire);
    for (0..@min(max_idle_search, new_thread_index)) |_| {
        defer thread.idle_search_index += 1;
        if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
        const idle_search_thread = &k.threads.allocated[0..new_thread_index][thread.idle_search_index];
        if (idle_search_thread == thread) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &idle_search_thread.ready_queue,
            null,
            ready_queue.head,
            .release,
            .monotonic,
        )) |_| continue;
        const changes = [_]posix.Kevent{
            .{
                .ident = 0,
                .filter = std.c.EVFILT.USER,
                .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
                .fflags = std.c.NOTE.TRIGGER,
                .data = 0,
                .udata = @intFromEnum(Completion.UserData.wakeup),
            },
        };
        // If an error occurs it only pessimises scheduling.
        _ = posix.kevent(idle_search_thread.kq_fd, &changes, &.{}, null) catch {};
        return;
    }
    spawn_thread: {
        // previous failed reservations must have completed before retrying
        if (new_thread_index == k.threads.allocated.len or @cmpxchgWeak(
            u32,
            &k.threads.reserved,
            new_thread_index,
            new_thread_index + 1,
            .acquire,
            .monotonic,
        ) != null) break :spawn_thread;
        const new_thread = &k.threads.allocated[new_thread_index];
        const next_thread_index = new_thread_index + 1;
        new_thread.* = .{
            .thread = undefined,
            .idle_context = undefined,
            .current_context = &new_thread.idle_context,
            .ready_queue = ready_queue.head,
            .kq_fd = posix.kqueue() catch |err| {
                @atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
                // no more access to `thread` after giving up reservation
                std.log.warn("unable to create worker thread due to kqueue init failure: {t}", .{err});
                break :spawn_thread;
            },
            .idle_search_index = 0,
            .steal_ready_search_index = 0,
            .wait_queues = .empty,
        };
        new_thread.thread = std.Thread.spawn(.{
            .stack_size = idle_stack_size,
            .allocator = k.gpa,
        }, threadEntry, .{ k, new_thread_index }) catch |err| {
            posix.close(new_thread.kq_fd);
            @atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
            // no more access to `thread` after giving up reservation
            std.log.warn("unable to create worker thread due to spawn failure: {s}", .{@errorName(err)});
            break :spawn_thread;
        };
        // shared fields of `Thread` must be initialized before being marked active
        @atomicStore(u32, &k.threads.active, next_thread_index, .release);
        return;
    }
    // nobody wanted it, so just queue it on ourselves
    while (@cmpxchgWeak(
        ?*Fiber,
        &thread.ready_queue,
        ready_queue.tail.queue_next,
        ready_queue.head,
        .acq_rel,
        .acquire,
    )) |old_head| ready_queue.tail.queue_next = old_head;
}

fn mainIdle(k: *Kqueue, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
    message.handle(k);
    k.idle(&k.threads.allocated[0]);
    k.yield(@ptrCast(&k.main_fiber_buffer), .nothing);
    unreachable; // switched to dead fiber
}

fn threadEntry(k: *Kqueue, index: u32) void {
    const thread: *Thread = &k.threads.allocated[index];
    Thread.self = thread;
    std.log.debug("created thread idle {*}", .{&thread.idle_context});
    k.idle(thread);
    thread.deinit(k.gpa);
}

const Completion = struct {
    const UserData = enum(usize) {
        unused,
        wakeup,
        cleanup,
        exit,
        /// *Fiber
        _,
    };
    /// Corresponds to Kevent field.
    flags: u16,
    /// Corresponds to Kevent field.
    fflags: u32,
    /// Corresponds to Kevent field.
    data: isize,
};

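/// Runs on a thread's dedicated idle stack: drains the ready queue, then
/// blocks in `kevent` until a wakeup, exit, or I/O completion arrives. The
/// `udata` of each event is a `Completion.UserData`; values beyond the named
/// tags are `*Fiber` pointers for fibers parked in `wait_queues`.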
fn idle(k: *Kqueue, thread: *Thread) void {
    var events_buffer: [changes_buffer_len]posix.Kevent = undefined;
    var maybe_ready_fiber: ?*Fiber = null;
    while (true) {
        while (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| {
            k.yield(ready_fiber, .nothing);
            maybe_ready_fiber = null;
        }
        const n = posix.kevent(thread.kq_fd, &.{}, &events_buffer, null) catch |err| {
            // TODO handle EINTR for cancellation purposes
            @panic(@errorName(err));
        };
        var maybe_ready_queue: ?Fiber.Queue = null;
        for (events_buffer[0..n]) |event| switch (@as(Completion.UserData, @enumFromInt(event.udata))) {
            .unused => unreachable, // bad submission queued?
            .wakeup => {},
            .cleanup => @panic("failed to notify other threads that we are exiting"),
            .exit => {
                assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
                return;
            },
            _ => {
                const event_head_fiber: *Fiber = @ptrFromInt(event.udata);
                const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{
                    .ident = event.ident,
                    .filter = event.filter,
                }).?.value;
                assert(event_tail_fiber.queue_next == null);

                // TODO reevaluate this logic
                event_head_fiber.resultPointer(Completion).* = .{
                    .flags = event.flags,
                    .fflags = event.fflags,
                    .data = event.data,
                };

                queue_ready: {
                    const head: *Fiber = if (maybe_ready_fiber == null) f: {
                        maybe_ready_fiber = event_head_fiber;
                        const next = event_head_fiber.queue_next orelse break :queue_ready;
                        event_head_fiber.queue_next = null;
                        break :f next;
                    } else event_head_fiber;

                    if (maybe_ready_queue) |*ready_queue| {
                        ready_queue.tail.queue_next = head;
                        ready_queue.tail = event_tail_fiber;
                    } else {
                        maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber };
                    }
                }
            },
        };
        if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue);
    }
}

const SwitchMessage = struct {
    contexts: extern struct {
        prev: *Context,
        ready: *Context,
    },
    pending_task: PendingTask,

    const PendingTask = union(enum) {
        nothing,
        reschedule,
        recycle: *Fiber,
        register_awaiter: *?*Fiber,
        register_select: []const *Io.AnyFuture,
        mutex_lock: struct {
            prev_state: Io.Mutex.State,
            mutex: *Io.Mutex,
        },
        condition_wait: struct {
            cond: *Io.Condition,
            mutex: *Io.Mutex,
        },
        exit,
    };

    fn handle(message: *const SwitchMessage, k: *Kqueue) void {
        const thread: *Thread = .current();
        thread.current_context = message.contexts.ready;
        switch (message.pending_task) {
            .nothing => {},
            .reschedule => if (message.contexts.prev != &thread.idle_context) {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .recycle => |fiber| {
                k.recycle(fiber);
            },
            .register_awaiter => |awaiter| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
                    k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .register_select => |futures| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                for (futures) |any_future| {
                    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
                    if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
                        const closure: *AsyncClosure = .fromFiber(future_fiber);
                        if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
                            k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        }
                    }
                }
            },
            .mutex_lock => |mutex_lock| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                var prev_state = mutex_lock.prev_state;
                while (switch (prev_state) {
                    else => next_state: {
                        prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
                        break :next_state @cmpxchgWeak(
                            Io.Mutex.State,
                            &mutex_lock.mutex.state,
                            prev_state,
                            @enumFromInt(@intFromPtr(prev_fiber)),
                            .release,
                            .acquire,
                        );
                    },
                    .unlocked => @cmpxchgWeak(
                        Io.Mutex.State,
                        &mutex_lock.mutex.state,
                        .unlocked,
                        .locked_once,
                        .acquire,
                        .acquire,
                    ) orelse {
                        prev_fiber.queue_next = null;
                        k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        return;
                    },
                }) |next_state| prev_state = next_state;
            },
            .condition_wait => |condition_wait| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                const cond_impl = prev_fiber.resultPointer(Condition);
                cond_impl.* = .{
                    .tail = prev_fiber,
                    .event = .queued,
                };
                if (@cmpxchgStrong(
                    ?*Fiber,
                    @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
                    null,
                    prev_fiber,
                    .release,
                    .acquire,
                )) |waiting_fiber| {
                    const waiting_cond_impl = waiting_fiber.?.resultPointer(Condition);
                    assert(waiting_cond_impl.tail.queue_next == null);
                    waiting_cond_impl.tail.queue_next = prev_fiber;
                    waiting_cond_impl.tail = prev_fiber;
                }
                condition_wait.mutex.unlock(k.io());
            },
            .exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
                const changes = [_]posix.Kevent{
                    .{
                        .ident = 0,
                        .filter = std.c.EVFILT.USER,
                        .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
                        .fflags = std.c.NOTE.TRIGGER,
                        .data = 0,
                        .udata = @intFromEnum(Completion.UserData.exit),
                    },
                };
                _ = posix.kevent(each_thread.kq_fd, &changes, &.{}, null) catch |err| {
                    @panic(@errorName(err));
                };
            },
        }
    }
};

const Context = switch (builtin.cpu.arch) {
    .aarch64 => extern struct {
        sp: u64,
        fp: u64,
        pc: u64,
    },
    .x86_64 => extern struct {
        rsp: u64,
        rbp: u64,
        rip: u64,
    },
    else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
};

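/// Saves the current stack pointer, frame pointer, and a resume address into
/// `prev`, then jumps to `ready`. Every other register is declared clobbered,
/// so the compiler spills anything live across the switch; only the three
/// `Context` fields need saving manually.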
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
    return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
        .aarch64 => asm volatile (
            \\ ldp x0, x2, [x1]
            \\ ldr x3, [x2, #16]
            \\ mov x4, sp
            \\ stp x4, fp, [x0]
            \\ adr x5, 0f
            \\ ldp x4, fp, [x2]
            \\ str x5, [x0, #16]
            \\ mov sp, x4
            \\ br x3
            \\0:
            : [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{x1}" (&message.contexts),
            : .{
              .x0 = true,
              .x1 = true,
              .x2 = true,
              .x3 = true,
              .x4 = true,
              .x5 = true,
              .x6 = true,
              .x7 = true,
              .x8 = true,
              .x9 = true,
              .x10 = true,
              .x11 = true,
              .x12 = true,
              .x13 = true,
              .x14 = true,
              .x15 = true,
              .x16 = true,
              .x17 = true,
              .x19 = true,
              .x20 = true,
              .x21 = true,
              .x22 = true,
              .x23 = true,
              .x24 = true,
              .x25 = true,
              .x26 = true,
              .x27 = true,
              .x28 = true,
              .x30 = true,
              .z0 = true,
              .z1 = true,
              .z2 = true,
              .z3 = true,
              .z4 = true,
              .z5 = true,
              .z6 = true,
              .z7 = true,
              .z8 = true,
              .z9 = true,
              .z10 = true,
              .z11 = true,
              .z12 = true,
              .z13 = true,
              .z14 = true,
              .z15 = true,
              .z16 = true,
              .z17 = true,
              .z18 = true,
              .z19 = true,
              .z20 = true,
              .z21 = true,
              .z22 = true,
              .z23 = true,
              .z24 = true,
              .z25 = true,
              .z26 = true,
              .z27 = true,
              .z28 = true,
              .z29 = true,
              .z30 = true,
              .z31 = true,
              .p0 = true,
              .p1 = true,
              .p2 = true,
              .p3 = true,
              .p4 = true,
              .p5 = true,
              .p6 = true,
              .p7 = true,
              .p8 = true,
              .p9 = true,
              .p10 = true,
              .p11 = true,
              .p12 = true,
              .p13 = true,
              .p14 = true,
              .p15 = true,
              .fpcr = true,
              .fpsr = true,
              .ffr = true,
              .memory = true,
            }),
        .x86_64 => asm volatile (
            \\ movq 0(%%rsi), %%rax
            \\ movq 8(%%rsi), %%rcx
            \\ leaq 0f(%%rip), %%rdx
            \\ movq %%rsp, 0(%%rax)
            \\ movq %%rbp, 8(%%rax)
            \\ movq %%rdx, 16(%%rax)
            \\ movq 0(%%rcx), %%rsp
            \\ movq 8(%%rcx), %%rbp
            \\ jmpq *16(%%rcx)
            \\0:
            : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{rsi}" (&message.contexts),
            : .{
              .rax = true,
              .rcx = true,
              .rdx = true,
              .rbx = true,
              .rsi = true,
              .rdi = true,
              .r8 = true,
              .r9 = true,
              .r10 = true,
              .r11 = true,
              .r12 = true,
              .r13 = true,
              .r14 = true,
              .r15 = true,
              .mm0 = true,
              .mm1 = true,
              .mm2 = true,
              .mm3 = true,
              .mm4 = true,
              .mm5 = true,
              .mm6 = true,
              .mm7 = true,
              .zmm0 = true,
              .zmm1 = true,
              .zmm2 = true,
              .zmm3 = true,
              .zmm4 = true,
              .zmm5 = true,
              .zmm6 = true,
              .zmm7 = true,
              .zmm8 = true,
              .zmm9 = true,
              .zmm10 = true,
              .zmm11 = true,
              .zmm12 = true,
              .zmm13 = true,
              .zmm14 = true,
              .zmm15 = true,
              .zmm16 = true,
              .zmm17 = true,
              .zmm18 = true,
              .zmm19 = true,
              .zmm20 = true,
              .zmm21 = true,
              .zmm22 = true,
              .zmm23 = true,
              .zmm24 = true,
              .zmm25 = true,
              .zmm26 = true,
              .zmm27 = true,
              .zmm28 = true,
              .zmm29 = true,
              .zmm30 = true,
              .zmm31 = true,
              .fpsr = true,
              .fpcr = true,
              .mxcsr = true,
              .rflags = true,
              .dirflag = true,
              .memory = true,
            }),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    });
}

fn mainIdleEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ movq (%%rsp), %%rdi
            \\ jmp %[mainIdle:P]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        .aarch64 => asm volatile (
            \\ ldr x0, [sp, #-8]
            \\ b %[mainIdle]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

fn fiberEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ leaq 8(%%rsp), %%rdi
            \\ jmp %[AsyncClosure_call:P]
            :
            : [AsyncClosure_call] "X" (&AsyncClosure.call),
        ),
        .aarch64 => asm volatile (
            \\ mov x0, sp
            \\ b %[AsyncClosure_call]
            :
            : [AsyncClosure_call] "X" (&AsyncClosure.call),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

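/// Lives at the top of each fiber allocation, just above the fiber's initial
/// stack pointer (see `Fiber.allocation_size` and `concurrent`), which is how
/// `fiberEntry` recovers it from the stack pointer on the first switch.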
const AsyncClosure = struct {
    kqueue: *Kqueue,
    fiber: *Fiber,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    result_align: Alignment,
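    /// Guards against a `select` awaiter being resumed twice when a future
    /// finishes concurrently with registration: whichever side flips this
    /// from false to true schedules the awaiter.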
    already_awaited: bool,

    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
    }

    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
        message.handle(closure.kqueue);
        const fiber = closure.fiber;
        std.log.debug("{*} performing async", .{fiber});
        closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
        const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
        const ready_awaiter = r: {
            const a = awaiter orelse break :r null;
            if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
            break :r a;
        };
        closure.kqueue.yield(ready_awaiter, .nothing);
        unreachable; // switched to dead fiber
    }

    fn fromFiber(fiber: *Fiber) *AsyncClosure {
        return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
            @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
        ) - @sizeOf(AsyncClosure));
    }
};

pub fn io(k: *Kqueue) Io {
    return .{
        .userdata = k,
        .vtable = &.{
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .cancel = cancel,
            .cancelRequested = cancelRequested,
            .select = select,

            .groupAsync = groupAsync,
            .groupWait = groupWait,
            .groupCancel = groupCancel,

            .mutexLock = mutexLock,
            .mutexLockUncancelable = mutexLockUncancelable,
            .mutexUnlock = mutexUnlock,

            .conditionWait = conditionWait,
            .conditionWaitUncancelable = conditionWaitUncancelable,
            .conditionWake = conditionWake,

            .dirMake = dirMake,
            .dirMakePath = dirMakePath,
            .dirMakeOpenPath = dirMakeOpenPath,
            .dirStat = dirStat,
            .dirStatPath = dirStatPath,

            .fileStat = fileStat,
            .dirAccess = dirAccess,
            .dirCreateFile = dirCreateFile,
            .dirOpenFile = dirOpenFile,
            .dirOpenDir = dirOpenDir,
            .dirClose = dirClose,
            .fileClose = fileClose,
            .fileWriteStreaming = fileWriteStreaming,
            .fileWritePositional = fileWritePositional,
            .fileReadStreaming = fileReadStreaming,
            .fileReadPositional = fileReadPositional,
            .fileSeekBy = fileSeekBy,
            .fileSeekTo = fileSeekTo,
            .openSelfExe = openSelfExe,

            .now = now,
            .sleep = sleep,

            .netListenIp = netListenIp,
            .netListenUnix = netListenUnix,
            .netAccept = netAccept,
            .netBindIp = netBindIp,
            .netConnectIp = netConnectIp,
            .netConnectUnix = netConnectUnix,
            .netClose = netClose,
            .netRead = netRead,
            .netWrite = netWrite,
            .netSend = netSend,
            .netReceive = netReceive,
            .netInterfaceNameResolve = netInterfaceNameResolve,
            .netInterfaceName = netInterfaceName,
            .netLookup = netLookup,
        },
    };
}

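/// Falls back to running the task synchronously (and returning null) when a
/// concurrent fiber cannot be allocated.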
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: std.mem.Alignment,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
    return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
        start(context.ptr, result.ptr);
        return null;
    };
}

fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Io.ConcurrentError!*Io.AnyFuture {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
    assert(result_len <= Fiber.max_result_size); // TODO
    assert(context.len <= Fiber.max_context_size); // TODO

    const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable;
    std.log.debug("allocated {*}", .{fiber});

    const closure: *AsyncClosure = .fromFiber(fiber);
    fiber.* = .{
        .required_align = {},
        .context = switch (builtin.cpu.arch) {
            .x86_64 => .{
                .rsp = @intFromPtr(closure) - @sizeOf(usize),
                .rbp = 0,
                .rip = @intFromPtr(&fiberEntry),
            },
            .aarch64 => .{
                .sp = @intFromPtr(closure),
                .fp = 0,
                .pc = @intFromPtr(&fiberEntry),
            },
            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
        },
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    closure.* = .{
        .kqueue = k,
        .fiber = fiber,
        .start = start,
        .result_align = result_alignment,
        .already_awaited = false,
    };
    @memcpy(closure.contextPointer(), context);

    k.schedule(.current(), .{ .head = fiber, .tail = fiber });
    return @ptrCast(fiber);
}

fn await(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
        k.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
    @memcpy(result, future_fiber.resultBytes(result_alignment));
    k.recycle(future_fiber);
}

fn cancel(
    userdata: ?*anyopaque,
    any_future: *Io.AnyFuture,
    result: []u8,
    result_alignment: std.mem.Alignment,
) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = any_future;
    _ = result;
    _ = result_alignment;
    @panic("TODO");
}

fn cancelRequested(userdata: ?*anyopaque) bool {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    return false; // TODO
}

fn groupAsync(
    userdata: ?*anyopaque,
    group: *Io.Group,
    context: []const u8,
    context_alignment: std.mem.Alignment,
    start: *const fn (*Io.Group, context: *const anyopaque) void,
) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = group;
    _ = context;
    _ = context_alignment;
    _ = start;
    @panic("TODO");
}

fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = group;
    _ = token;
    @panic("TODO");
}

fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = group;
    _ = token;
    @panic("TODO");
}

fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = futures;
    @panic("TODO");
}

fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = prev_state;
    _ = mutex;
    @panic("TODO");
}
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = prev_state;
    _ = mutex;
    @panic("TODO");
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = prev_state;
    _ = mutex;
    @panic("TODO");
}

fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    k.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
    const thread = Thread.current();
    const fiber = thread.currentFiber();
    const cond_impl = fiber.resultPointer(Condition);
    try mutex.lock(k.io());
    switch (cond_impl.event) {
        .queued => {},
        .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
            .one => if (@cmpxchgStrong(
                ?*Fiber,
                @as(*?*Fiber, @ptrCast(&cond.state)),
                null,
                next_fiber,
                .release,
                .acquire,
            )) |old_fiber| {
                const old_cond_impl = old_fiber.?.resultPointer(Condition);
                assert(old_cond_impl.tail.queue_next == null);
                old_cond_impl.tail.queue_next = next_fiber;
                old_cond_impl.tail = cond_impl.tail;
            },
            .all => k.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
        },
    }
    fiber.queue_next = null;
}

fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = cond;
    _ = mutex;
    @panic("TODO");
}
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
    waiting_fiber.resultPointer(Condition).event = .{ .wake = wake };
    k.yield(waiting_fiber, .reschedule);
}

fn dirMake(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = mode;
    @panic("TODO");
}
fn dirMakePath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.MakeError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = mode;
    @panic("TODO");
}
fn dirMakeOpenPath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.MakeOpenPathError!Dir {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = options;
    @panic("TODO");
}
fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    @panic("TODO");
}
fn dirStatPath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.StatPathOptions) Dir.StatPathError!File.Stat {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = options;
    @panic("TODO");
}
fn dirAccess(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.AccessOptions) Dir.AccessError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = options;
    @panic("TODO");
}
fn dirCreateFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = flags;
    @panic("TODO");
}
fn dirOpenFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = flags;
    @panic("TODO");
}
fn dirOpenDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.OpenError!Dir {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    _ = sub_path;
    _ = options;
    @panic("TODO");
}
fn dirClose(userdata: ?*anyopaque, dir: Dir) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dir;
    @panic("TODO");
}
fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    @panic("TODO");
}
fn fileClose(userdata: ?*anyopaque, file: File) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    @panic("TODO");
}
fn fileWriteStreaming(userdata: ?*anyopaque, file: File, buffer: [][]const u8) File.WriteStreamingError!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = buffer;
    @panic("TODO");
}
fn fileWritePositional(userdata: ?*anyopaque, file: File, buffer: [][]const u8, offset: u64) File.WritePositionalError!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = buffer;
    _ = offset;
    @panic("TODO");
}
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: [][]u8) File.Reader.Error!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = data;
    @panic("TODO");
}
fn fileReadPositional(userdata: ?*anyopaque, file: File, data: [][]u8, offset: u64) File.ReadPositionalError!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = data;
    _ = offset;
    @panic("TODO");
}
fn fileSeekBy(userdata: ?*anyopaque, file: File, relative_offset: i64) File.SeekError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = relative_offset;
    @panic("TODO");
}
fn fileSeekTo(userdata: ?*anyopaque, file: File, absolute_offset: u64) File.SeekError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = file;
    _ = absolute_offset;
    @panic("TODO");
}
fn openSelfExe(userdata: ?*anyopaque, flags: File.OpenFlags) File.OpenSelfExeError!File {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = flags;
    @panic("TODO");
}

fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = clock;
    @panic("TODO");
}
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = timeout;
    @panic("TODO");
}

fn netListenIp(
    userdata: ?*anyopaque,
    address: net.IpAddress,
    options: net.IpAddress.ListenOptions,
) net.IpAddress.ListenError!net.Server {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = address;
    _ = options;
    @panic("TODO");
}
fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = server;
    @panic("TODO");
}
fn netBindIp(
    userdata: ?*anyopaque,
    address: *const net.IpAddress,
    options: net.IpAddress.BindOptions,
) net.IpAddress.BindError!net.Socket {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    const family = Io.Threaded.posixAddressFamily(address);
    const socket_fd = try openSocketPosix(k, family, options);
    errdefer std.posix.close(socket_fd);
    var storage: Io.Threaded.PosixAddress = undefined;
    var addr_len = Io.Threaded.addressToPosix(address, &storage);
    try posixBind(k, socket_fd, &storage.any, addr_len);
    try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
    return .{
        .handle = socket_fd,
        .address = Io.Threaded.addressFromPosix(&storage),
    };
}
fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
    if (options.timeout != .none) @panic("TODO");
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    const family = Io.Threaded.posixAddressFamily(address);
    const socket_fd = try openSocketPosix(k, family, .{
        .mode = options.mode,
        .protocol = options.protocol,
    });
    errdefer posix.close(socket_fd);
    var storage: Io.Threaded.PosixAddress = undefined;
    var addr_len = Io.Threaded.addressToPosix(address, &storage);
    try posixConnect(k, socket_fd, &storage.any, addr_len);
    try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
    return .{ .socket = .{
        .handle = socket_fd,
        .address = Io.Threaded.addressFromPosix(&storage),
    } };
}

fn posixConnect(k: *Kqueue, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
    while (true) {
        try k.checkCancel();
        switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,
            .AGAIN => @panic("TODO"),
            .INPROGRESS => return, // Due to TCP fast open, we find out about a possible error later.
1332
1333            .ADDRNOTAVAIL => return error.AddressUnavailable,
1334            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
1335            .ALREADY => return error.ConnectionPending,
1336            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
1337            .CONNREFUSED => return error.ConnectionRefused,
1338            .CONNRESET => return error.ConnectionResetByPeer,
1339            .FAULT => |err| return errnoBug(err),
1340            .ISCONN => |err| return errnoBug(err),
1341            .HOSTUNREACH => return error.HostUnreachable,
1342            .NETUNREACH => return error.NetworkUnreachable,
1343            .NOTSOCK => |err| return errnoBug(err),
1344            .PROTOTYPE => |err| return errnoBug(err),
1345            .TIMEDOUT => return error.Timeout,
1346            .CONNABORTED => |err| return errnoBug(err),
1347            .ACCES => return error.AccessDenied,
1348            .PERM => |err| return errnoBug(err),
1349            .NOENT => |err| return errnoBug(err),
1350            .NETDOWN => return error.NetworkDown,
1351            else => |err| return posix.unexpectedErrno(err),
1352        }
1353    }
1354}
1355
1356fn netListenUnix(
1357    userdata: ?*anyopaque,
1358    unix_address: *const net.UnixAddress,
1359    options: net.UnixAddress.ListenOptions,
1360) net.UnixAddress.ListenError!net.Socket.Handle {
1361    const k: *Kqueue = @ptrCast(@alignCast(userdata));
1362    _ = k;
1363    _ = unix_address;
1364    _ = options;
1365    @panic("TODO");
1366}
1367fn netConnectUnix(
1368    userdata: ?*anyopaque,
1369    unix_address: *const net.UnixAddress,
1370) net.UnixAddress.ConnectError!net.Socket.Handle {
1371    const k: *Kqueue = @ptrCast(@alignCast(userdata));
1372    _ = k;
1373    _ = unix_address;
1374    @panic("TODO");
1375}
1376
1377fn netSend(
1378    userdata: ?*anyopaque,
1379    handle: net.Socket.Handle,
1380    outgoing_messages: []net.OutgoingMessage,
1381    flags: net.SendFlags,
1382) struct { ?net.Socket.SendError, usize } {
1383    const k: *Kqueue = @ptrCast(@alignCast(userdata));
1384
1385    const posix_flags: u32 =
1386        @as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
1387        @as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
1388        @as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
1389        @as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
1390        @as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
1391        posix.MSG.NOSIGNAL;
1392
1393    for (outgoing_messages, 0..) |*msg, i| {
1394        netSendOne(k, handle, msg, posix_flags) catch |err| return .{ err, i };
1395    }
1396
1397    return .{ null, outgoing_messages.len };
1398}
1399
fn netSendOne(
    k: *Kqueue,
    handle: net.Socket.Handle,
    message: *net.OutgoingMessage,
    flags: u32,
) net.Socket.SendError!void {
    var addr: Io.Threaded.PosixAddress = undefined;
    var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
    const msg: posix.msghdr_const = .{
        .name = &addr.any,
        .namelen = Io.Threaded.addressToPosix(message.address, &addr),
        .iov = (&iovec)[0..1],
        .iovlen = 1,
        // OS returns EINVAL if this pointer is invalid even if controllen is zero.
        .control = if (message.control.len == 0) null else @constCast(message.control.ptr),
        .controllen = @intCast(message.control.len),
        .flags = 0,
    };
    while (true) {
        try k.checkCancel();
        const rc = posix.system.sendmsg(handle, &msg, flags);
        switch (posix.errno(rc)) {
            .SUCCESS => {
                message.data_len = @intCast(rc);
                return;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,
            .AGAIN => @panic("TODO register kevent"),

            .ACCES => return error.AccessDenied,
            .ALREADY => return error.FastOpenAlreadyInProgress,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .CONNRESET => return error.ConnectionResetByPeer,
            .DESTADDRREQ => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .ISCONN => |err| return errnoBug(err),
            .MSGSIZE => return error.MessageOversize,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTSOCK => |err| return errnoBug(err),
            .OPNOTSUPP => |err| return errnoBug(err),
            .PIPE => return error.SocketUnconnected,
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .HOSTUNREACH => return error.HostUnreachable,
            .NETUNREACH => return error.NetworkUnreachable,
            .NOTCONN => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

fn netReceive(
    userdata: ?*anyopaque,
    handle: net.Socket.Handle,
    message_buffer: []net.IncomingMessage,
    data_buffer: []u8,
    flags: net.ReceiveFlags,
    timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = handle;
    _ = message_buffer;
    _ = data_buffer;
    _ = flags;
    _ = timeout;
    @panic("TODO");
}

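/// Reads into up to `max_iovecs_len` non-empty buffers with readv(2). When the
/// socket has no data available, the fiber parks on an EVFILT.READ kevent and
/// the read is retried after wakeup.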
fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));

    var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
    var i: usize = 0;
    for (data) |buf| {
        if (i == iovecs_buffer.len) break;
        if (buf.len != 0) {
            iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
            i += 1;
        }
    }
    const dest = iovecs_buffer[0..i];
    assert(dest[0].len > 0);

    while (true) {
        try k.checkCancel();
        const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
        switch (posix.errno(rc)) {
            .SUCCESS => return @intCast(rc),
            .INTR => continue,
            .CANCELED => return error.Canceled,
            .AGAIN => {
                const thread: *Thread = .current();
                const fiber = thread.currentFiber();
                const ident: u32 = @bitCast(fd);
                const filter = std.c.EVFILT.READ;
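                // The wait-queue map stores the tail of the FIFO of fibers
                // waiting on this (ident, filter) pair, so that all of them
                // share a single registered kevent.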
                const gop = thread.wait_queues.getOrPut(k.gpa, .{
                    .ident = ident,
                    .filter = filter,
                }) catch return error.SystemResources;
                if (gop.found_existing) {
                    const tail_fiber = gop.value_ptr.*;
                    assert(tail_fiber.queue_next == null);
                    tail_fiber.queue_next = fiber;
                    gop.value_ptr.* = fiber;
                } else {
                    gop.value_ptr.* = fiber;
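                    // EV.ONESHOT: the kernel deletes the event after its first
                    // delivery, so each registration produces exactly one wakeup.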
                    const changes = [_]posix.Kevent{
                        .{
                            .ident = ident,
                            .filter = filter,
                            .flags = std.c.EV.ADD | std.c.EV.ONESHOT,
                            .fflags = 0,
                            .data = 0,
                            .udata = @intFromPtr(fiber),
                        },
                    };
                    assert(0 == (posix.kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
                        @panic(@errorName(err)); // TODO
                    }));
                }
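                // Suspend this fiber; the event loop resumes it when the kevent
                // fires, after which the readv is retried.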
                yield(k, null, .nothing);
                continue;
            },

            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .NOTCONN => return error.SocketUnconnected,
            .CONNRESET => return error.ConnectionResetByPeer,
            .TIMEDOUT => return error.Timeout,
            .PIPE => return error.SocketUnconnected,
            .NETDOWN => return error.NetworkDown,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = dest;
    _ = header;
    _ = data;
    _ = splat;
    @panic("TODO");
}

fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = handle;
    @panic("TODO");
}

fn netInterfaceNameResolve(
    userdata: ?*anyopaque,
    name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = name;
    @panic("TODO");
}

fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = interface;
    @panic("TODO");
}

fn netLookup(
    userdata: ?*anyopaque,
    host_name: net.HostName,
    result: *Io.Queue(net.HostName.LookupResult),
    options: net.HostName.LookupOptions,
) void {
    const k: *Kqueue = @ptrCast(@alignCast(userdata));
    _ = k;
    _ = host_name;
    _ = result;
    _ = options;
    @panic("TODO");
}

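/// Opens a socket for the given address family and bind options. The socket is
/// created close-on-exec and non-blocking: via socket() flags where the target
/// supports them, otherwise with the fcntl fallback below.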
fn openSocketPosix(
    k: *Kqueue,
    family: posix.sa_family_t,
    options: IpAddress.BindOptions,
) error{
    AddressFamilyUnsupported,
    ProtocolUnsupportedBySystem,
    ProcessFdQuotaExceeded,
    SystemFdQuotaExceeded,
    SystemResources,
    ProtocolUnsupportedByAddressFamily,
    SocketModeUnsupported,
    OptionUnsupported,
    Unexpected,
    Canceled,
}!posix.socket_t {
    const mode = Io.Threaded.posixSocketMode(options.mode);
    const protocol = Io.Threaded.posixProtocol(options.protocol);
    const socket_fd = while (true) {
        try k.checkCancel();
        // The kqueue event loop requires non-blocking sockets; request both
        // flags here, mirroring the fcntl fallback below.
        const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else (posix.SOCK.CLOEXEC | posix.SOCK.NONBLOCK);
        const socket_rc = posix.system.socket(family, flags, protocol);
        switch (posix.errno(socket_rc)) {
            .SUCCESS => {
                const fd: posix.fd_t = @intCast(socket_rc);
                errdefer posix.close(fd);
                if (Io.Threaded.socket_flags_unsupported) {
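                    // Targets whose socket() does not accept SOCK flags need
                    // three fcntl calls instead: FD_CLOEXEC, then a
                    // read-modify-write of the O.NONBLOCK file status flag.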
                    while (true) {
                        try k.checkCancel();
                        switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
                            .SUCCESS => break,
                            .INTR => continue,
                            .CANCELED => return error.Canceled,
                            else => |err| return posix.unexpectedErrno(err),
                        }
                    }

                    var fl_flags: usize = while (true) {
                        try k.checkCancel();
                        const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
                        switch (posix.errno(rc)) {
                            .SUCCESS => break @intCast(rc),
                            .INTR => continue,
                            .CANCELED => return error.Canceled,
                            else => |err| return posix.unexpectedErrno(err),
                        }
                    };
                    fl_flags |= @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
                    while (true) {
                        try k.checkCancel();
                        switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
                            .SUCCESS => break,
                            .INTR => continue,
                            .CANCELED => return error.Canceled,
                            else => |err| return posix.unexpectedErrno(err),
                        }
                    }
                }
                break fd;
            },
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .INVAL => return error.ProtocolUnsupportedBySystem,
            .MFILE => return error.ProcessFdQuotaExceeded,
            .NFILE => return error.SystemFdQuotaExceeded,
            .NOBUFS => return error.SystemResources,
            .NOMEM => return error.SystemResources,
            .PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
            .PROTOTYPE => return error.SocketModeUnsupported,
            else => |err| return posix.unexpectedErrno(err),
        }
    };
    errdefer posix.close(socket_fd);

    if (options.ip6_only) {
        if (posix.IPV6 == void) return error.OptionUnsupported;
        // V6ONLY=1 restricts the socket to IPv6 traffic, disabling dual-stack
        // acceptance of IPv4-mapped addresses.
        try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 1);
    }

    return socket_fd;
}

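/// Wrapper around bind(2) that retries on EINTR and maps errno values onto the
/// Io error set.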
fn posixBind(
    k: *Kqueue,
    socket_fd: posix.socket_t,
    addr: *const posix.sockaddr,
    addr_len: posix.socklen_t,
) !void {
    while (true) {
        try k.checkCancel();
        switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
            .SUCCESS => break,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .ADDRINUSE => return error.AddressInUse,
            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
            .AFNOSUPPORT => return error.AddressFamilyUnsupported,
            .ADDRNOTAVAIL => return error.AddressUnavailable,
            .FAULT => |err| return errnoBug(err), // invalid `addr` pointer
            .NOMEM => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

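/// Wrapper around getsockname(2) that retries on EINTR; `addr_len` is updated
/// to the size of the returned address.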
fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
    while (true) {
        try k.checkCancel();
        switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
            .SUCCESS => break,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .FAULT => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err), // invalid parameters
            .NOTSOCK => |err| return errnoBug(err), // always a race condition
            .NOBUFS => return error.SystemResources,
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

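/// Sets a 32-bit integer socket option via setsockopt(2), retrying on EINTR.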
fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
    const o: []const u8 = std.mem.asBytes(&option);
    while (true) {
        try k.checkCancel();
        switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
            .SUCCESS => return,
            .INTR => continue,
            .CANCELED => return error.Canceled,

            .BADF => |err| return errnoBug(err), // File descriptor used after closed.
            .NOTSOCK => |err| return errnoBug(err),
            .INVAL => |err| return errnoBug(err),
            .FAULT => |err| return errnoBug(err),
            else => |err| return posix.unexpectedErrno(err),
        }
    }
}

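/// Returns error.Canceled if cancellation has been requested for the current
/// fiber.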
fn checkCancel(k: *Kqueue) error{Canceled}!void {
    if (cancelRequested(k)) return error.Canceled;
}

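/// State for a fiber blocked on an `Io.Condition`: `tail` is the last fiber in
/// the wait queue, and `event` records whether the fiber is still queued or
/// has been woken (and how).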
const Condition = struct {
    tail: *Fiber,
    event: union(enum) {
        queued,
        wake: Io.Condition.Wake,
    },
};