const EventLoop = @This();
const builtin = @import("builtin");

const std = @import("../std.zig");
const Io = std.Io;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
const IoUring = std.os.linux.IoUring;

/// Must be a thread-safe allocator.
gpa: Allocator,
mutex: std.Thread.Mutex,
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
threads: Thread.List,

/// Empirically, >128KB of stack was observed being used by the self-hosted backend to panic.
const idle_stack_size = 256 * 1024;

const max_idle_search = 4;
const max_steal_ready_search = 4;

const io_uring_entries = 64;

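/// Per-worker state. Each worker thread owns its own io_uring instance, an
/// idle context to run when it has no ready fiber, and an intrusive queue of
/// ready fibers that other threads may push work onto or steal work from.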
const Thread = struct {
    thread: std.Thread,
    idle_context: Context,
    current_context: *Context,
    ready_queue: ?*Fiber,
    io_uring: IoUring,
    idle_search_index: u32,
    steal_ready_search_index: u32,

    const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));

    threadlocal var self: *Thread = undefined;

    fn current() *Thread {
        return self;
    }

    fn currentFiber(thread: *Thread) *Fiber {
        return @fieldParentPtr("context", thread.current_context);
    }

    const List = struct {
        allocated: []Thread,
        reserved: u32,
        active: u32,
    };
};

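/// A stackful coroutine. The `Fiber` header, its result buffer, its stack, the
/// `AsyncClosure`, and the copied async context bytes all live inside a single
/// page-aligned allocation of `allocation_size` bytes.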
const Fiber = struct {
    required_align: void align(4),
    context: Context,
    awaiter: ?*Fiber,
    queue_next: ?*Fiber,
    cancel_thread: ?*Thread,
    awaiting_completions: std.StaticBitSet(3),

    const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));

    const max_result_align: Alignment = .@"16";
    const max_result_size = max_result_align.forward(64);
    /// This includes any stack realignments that need to happen, and also the
    /// initial frame return address slot and argument frame, depending on target.
    const min_stack_size = 4 * 1024 * 1024;
    const max_context_align: Alignment = .@"16";
    const max_context_size = max_context_align.forward(1024);
    const max_closure_size: usize = @sizeOf(AsyncClosure);
    const max_closure_align: Alignment = .of(AsyncClosure);
    const allocation_size = std.mem.alignForward(
        usize,
        max_closure_align.max(max_context_align).forward(
            max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
        ) + max_closure_size + max_context_size,
        std.heap.page_size_max,
    );

    fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
        return @ptrCast(try el.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
    }

    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
        return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
    }

    fn allocatedEnd(f: *Fiber) [*]u8 {
        const allocated_slice = f.allocatedSlice();
        return allocated_slice[allocated_slice.len..].ptr;
    }

    fn resultPointer(f: *Fiber, comptime Result: type) *Result {
        return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
    }

    fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
        return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
    }

    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            null,
            thread,
            .acq_rel,
            .acquire,
        )) |cancel_thread| {
            assert(cancel_thread == Thread.canceling);
            return error.Canceled;
        }
    }

    fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            thread,
            null,
            .acq_rel,
            .acquire,
        )) |cancel_thread| assert(cancel_thread == Thread.canceling);
    }

    const Queue = struct { head: *Fiber, tail: *Fiber };
};

fn recycle(el: *EventLoop, fiber: *Fiber) void {
    std.log.debug("recycling {*}", .{fiber});
    assert(fiber.queue_next == null);
    el.gpa.free(fiber.allocatedSlice());
}

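/// Returns the `std.Io` interface backed by this event loop.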
pub fn io(el: *EventLoop) Io {
    return .{
        .userdata = el,
        .vtable = &.{
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .select = select,
            .cancel = cancel,
            .cancelRequested = cancelRequested,

            .mutexLock = mutexLock,
            .mutexUnlock = mutexUnlock,

            .conditionWait = conditionWait,
            .conditionWake = conditionWake,

            .createFile = createFile,
            .fileOpen = fileOpen,
            .fileClose = fileClose,
            .pread = pread,
            .pwrite = pwrite,

            .now = now,
            .sleep = sleep,
        },
    };
}

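/// Initializes the event loop in place, making the calling thread the first
/// worker and the caller's current execution context the main fiber.
///
/// Illustrative usage (assumes a thread-safe `gpa` is in scope):
///
///     var event_loop: EventLoop = undefined;
///     try event_loop.init(gpa);
///     defer event_loop.deinit();
///     const io = event_loop.io();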
pub fn init(el: *EventLoop, gpa: Allocator) !void {
    const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
    const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
    const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
    errdefer gpa.free(allocated_slice);
    el.* = .{
        .gpa = gpa,
        .mutex = .{},
        .main_fiber_buffer = undefined,
        .threads = .{
            .allocated = @ptrCast(allocated_slice[0..threads_size]),
            .reserved = 1,
            .active = 1,
        },
    };
    const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
    main_fiber.* = .{
        .required_align = {},
        .context = undefined,
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    const main_thread = &el.threads.allocated[0];
    Thread.self = main_thread;
    const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
    (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
    main_thread.* = .{
        .thread = undefined,
        .idle_context = switch (builtin.cpu.arch) {
            .aarch64 => .{
                .sp = @intFromPtr(idle_stack_end),
                .fp = 0,
                .pc = @intFromPtr(&mainIdleEntry),
            },
            .x86_64 => .{
                .rsp = @intFromPtr(idle_stack_end - 1),
                .rbp = 0,
                .rip = @intFromPtr(&mainIdleEntry),
            },
            else => @compileError("unimplemented architecture"),
        },
        .current_context = &main_fiber.context,
        .ready_queue = null,
        .io_uring = try IoUring.init(io_uring_entries, 0),
        .idle_search_index = 1,
        .steal_ready_search_index = 1,
    };
    errdefer main_thread.io_uring.deinit();
    std.log.debug("created main idle {*}", .{&main_thread.idle_context});
    std.log.debug("created main {*}", .{main_fiber});
}

pub fn deinit(el: *EventLoop) void {
    const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
    for (el.threads.allocated[0..active_threads]) |*thread| {
        const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
        assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
    }
    el.yield(null, .exit);
    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(el.threads.allocated.ptr));
    const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
    for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
    el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
    el.* = undefined;
}

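/// Pops the next runnable fiber from this thread's ready queue, or attempts to
/// steal one from up to `max_steal_ready_search` other active threads. Returns
/// null when no work is found, leaving the queue marked empty so other threads
/// can hand this thread work again.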
fn findReadyFiber(el: *EventLoop, thread: *Thread) ?*Fiber {
    if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
    for (0..@min(max_steal_ready_search, active_threads)) |_| {
        defer thread.steal_ready_search_index += 1;
        if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
        const steal_ready_search_thread = &el.threads.allocated[0..active_threads][thread.steal_ready_search_index];
        if (steal_ready_search_thread == thread) continue;
        const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
        if (ready_fiber == Fiber.finished) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &steal_ready_search_thread.ready_queue,
            ready_fiber,
            null,
            .acquire,
            .monotonic,
        )) |_| continue;
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    // couldn't find anything to do, so we are now open for business
    @atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
    return null;
}

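/// Switches execution away from the current context: to `maybe_ready_fiber` if
/// provided, otherwise to whatever `findReadyFiber` returns, otherwise to the
/// idle context. `pending_task` travels with the switch and is handled on the
/// destination side, once the previous context is no longer running.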
fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
    const thread: *Thread = .current();
    const ready_context = if (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber|
        &ready_fiber.context
    else
        &thread.idle_context;
    const message: SwitchMessage = .{
        .contexts = .{
            .prev = thread.current_context,
            .ready = ready_context,
        },
        .pending_task = pending_task,
    };
    std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
    contextSwitch(&message).handle(el);
}

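/// Makes the fibers in `ready_queue` runnable: hands the queue to an idle
/// thread (waking it with a MSG_RING submission), spawns a new worker thread
/// if none is idle and capacity remains, or otherwise pushes the queue onto
/// this thread's own ready queue.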
fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
    {
        var fiber = ready_queue.head;
        while (true) {
            std.log.debug("scheduling {*}", .{fiber});
            fiber = fiber.queue_next orelse break;
        }
        assert(fiber == ready_queue.tail);
    }
    // shared fields of previous `Thread` must be initialized before later ones are marked as active
    const new_thread_index = @atomicLoad(u32, &el.threads.active, .acquire);
    for (0..@min(max_idle_search, new_thread_index)) |_| {
        defer thread.idle_search_index += 1;
        if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
        const idle_search_thread = &el.threads.allocated[0..new_thread_index][thread.idle_search_index];
        if (idle_search_thread == thread) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &idle_search_thread.ready_queue,
            null,
            ready_queue.head,
            .release,
            .monotonic,
        )) |_| continue;
        getSqe(&thread.io_uring).* = .{
            .opcode = .MSG_RING,
            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
            .ioprio = 0,
            .fd = idle_search_thread.io_uring.fd,
            .off = @intFromEnum(Completion.UserData.wakeup),
            .addr = 0,
            .len = 0,
            .rw_flags = 0,
            .user_data = @intFromEnum(Completion.UserData.wakeup),
            .buf_index = 0,
            .personality = 0,
            .splice_fd_in = 0,
            .addr3 = 0,
            .resv = 0,
        };
        return;
    }
    spawn_thread: {
        // previous failed reservations must have completed before retrying
        if (new_thread_index == el.threads.allocated.len or @cmpxchgWeak(
            u32,
            &el.threads.reserved,
            new_thread_index,
            new_thread_index + 1,
            .acquire,
            .monotonic,
        ) != null) break :spawn_thread;
        const new_thread = &el.threads.allocated[new_thread_index];
        const next_thread_index = new_thread_index + 1;
        new_thread.* = .{
            .thread = undefined,
            .idle_context = undefined,
            .current_context = &new_thread.idle_context,
            .ready_queue = ready_queue.head,
            .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
                @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
                // no more access to `thread` after giving up reservation
                std.log.warn("unable to create worker thread due to io_uring init failure: {s}", .{@errorName(err)});
                break :spawn_thread;
            },
            .idle_search_index = 0,
            .steal_ready_search_index = 0,
        };
        new_thread.thread = std.Thread.spawn(.{
            .stack_size = idle_stack_size,
            .allocator = el.gpa,
        }, threadEntry, .{ el, new_thread_index }) catch |err| {
            new_thread.io_uring.deinit();
            @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
            // no more access to `thread` after giving up reservation
            std.log.warn("unable to create worker thread due to spawn failure: {s}", .{@errorName(err)});
            break :spawn_thread;
        };
        // shared fields of `Thread` must be initialized before being marked active
        @atomicStore(u32, &el.threads.active, next_thread_index, .release);
        return;
    }
    // nobody wanted it, so just queue it on ourselves
    while (@cmpxchgWeak(
        ?*Fiber,
        &thread.ready_queue,
        ready_queue.tail.queue_next,
        ready_queue.head,
        .acq_rel,
        .acquire,
    )) |old_head| ready_queue.tail.queue_next = old_head;
}

fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
    message.handle(el);
    el.idle(&el.threads.allocated[0]);
    el.yield(@ptrCast(&el.main_fiber_buffer), .nothing);
    unreachable; // switched to dead fiber
}

fn threadEntry(el: *EventLoop, index: u32) void {
    const thread: *Thread = &el.threads.allocated[index];
    Thread.self = thread;
    std.log.debug("created thread idle {*}", .{&thread.idle_context});
    el.idle(thread);
}

const Completion = struct {
    const UserData = enum(usize) {
        unused,
        wakeup,
        cleanup,
        exit,
        /// *Fiber
        _,
    };
    result: i32,
    flags: u32,
};

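/// Per-thread scheduler loop: runs ready fibers until none remain, then blocks
/// in `submit_and_wait` on this thread's ring. Completions either wake the
/// thread, tell it to exit, or name a fiber whose pending operation finished,
/// in which case that fiber is made ready again.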
fn idle(el: *EventLoop, thread: *Thread) void {
    var maybe_ready_fiber: ?*Fiber = null;
    while (true) {
        while (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber| {
            el.yield(ready_fiber, .nothing);
            maybe_ready_fiber = null;
        }
        _ = thread.io_uring.submit_and_wait(1) catch |err| switch (err) {
            error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
            else => |e| @panic(@errorName(e)),
        };
        var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
        var maybe_ready_queue: ?Fiber.Queue = null;
        for (cqes_buffer[0 .. thread.io_uring.copy_cqes(&cqes_buffer, 0) catch |err| switch (err) {
            error.SignalInterrupt => cqes_len: {
                std.log.warn("copy_cqes failed with SignalInterrupt", .{});
                break :cqes_len 0;
            },
            else => |e| @panic(@errorName(e)),
        }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
            .unused => unreachable, // bad submission queued?
            .wakeup => {},
            .cleanup => @panic("failed to notify other threads that we are exiting"),
            .exit => {
                assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
                return;
            },
            _ => switch (errno(cqe.res)) {
                .INTR => getSqe(&thread.io_uring).* = .{
                    .opcode = .ASYNC_CANCEL,
                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                    .ioprio = 0,
                    .fd = 0,
                    .off = 0,
                    .addr = cqe.user_data,
                    .len = 0,
                    .rw_flags = 0,
                    .user_data = @intFromEnum(Completion.UserData.wakeup),
                    .buf_index = 0,
                    .personality = 0,
                    .splice_fd_in = 0,
                    .addr3 = 0,
                    .resv = 0,
                },
                else => {
                    const fiber: *Fiber = @ptrFromInt(cqe.user_data);
                    assert(fiber.queue_next == null);
                    fiber.resultPointer(Completion).* = .{
                        .result = cqe.res,
                        .flags = cqe.flags,
                    };
                    if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
                        ready_queue.tail.queue_next = fiber;
                        ready_queue.tail = fiber;
                    } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
                },
            },
        };
        if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
    }
}

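/// Data carried across a context switch: the previous and next contexts, plus
/// a task to perform on behalf of the previous fiber once it has stopped
/// running (rescheduling it, recycling it, registering it as an awaiter, or
/// completing a mutex/condition operation).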
const SwitchMessage = struct {
    contexts: extern struct {
        prev: *Context,
        ready: *Context,
    },
    pending_task: PendingTask,

    const PendingTask = union(enum) {
        nothing,
        reschedule,
        recycle: *Fiber,
        register_awaiter: *?*Fiber,
        register_select: []const *Io.AnyFuture,
        mutex_lock: struct {
            prev_state: Io.Mutex.State,
            mutex: *Io.Mutex,
        },
        condition_wait: struct {
            cond: *Io.Condition,
            mutex: *Io.Mutex,
        },
        exit,
    };

    fn handle(message: *const SwitchMessage, el: *EventLoop) void {
        const thread: *Thread = .current();
        thread.current_context = message.contexts.ready;
        switch (message.pending_task) {
            .nothing => {},
            .reschedule => if (message.contexts.prev != &thread.idle_context) {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .recycle => |fiber| {
                el.recycle(fiber);
            },
            .register_awaiter => |awaiter| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
                    el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .register_select => |futures| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                for (futures) |any_future| {
                    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
                    if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
                        const closure: *AsyncClosure = .fromFiber(future_fiber);
                        if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
                            el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        }
                    }
                }
            },
            .mutex_lock => |mutex_lock| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                var prev_state = mutex_lock.prev_state;
                while (switch (prev_state) {
                    else => next_state: {
                        prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
                        break :next_state @cmpxchgWeak(
                            Io.Mutex.State,
                            &mutex_lock.mutex.state,
                            prev_state,
                            @enumFromInt(@intFromPtr(prev_fiber)),
                            .release,
                            .acquire,
                        );
                    },
                    .unlocked => @cmpxchgWeak(
                        Io.Mutex.State,
                        &mutex_lock.mutex.state,
                        .unlocked,
                        .locked_once,
                        .acquire,
                        .acquire,
                    ) orelse {
                        prev_fiber.queue_next = null;
                        el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        return;
                    },
                }) |next_state| prev_state = next_state;
            },
            .condition_wait => |condition_wait| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                const cond_impl = prev_fiber.resultPointer(ConditionImpl);
                cond_impl.* = .{
                    .tail = prev_fiber,
                    .event = .queued,
                };
                if (@cmpxchgStrong(
                    ?*Fiber,
                    @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
                    null,
                    prev_fiber,
                    .release,
                    .acquire,
                )) |waiting_fiber| {
                    const waiting_cond_impl = waiting_fiber.?.resultPointer(ConditionImpl);
                    assert(waiting_cond_impl.tail.queue_next == null);
                    waiting_cond_impl.tail.queue_next = prev_fiber;
                    waiting_cond_impl.tail = prev_fiber;
                }
                condition_wait.mutex.unlock(el.io());
            },
            .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
                getSqe(&thread.io_uring).* = .{
                    .opcode = .MSG_RING,
                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                    .ioprio = 0,
                    .fd = each_thread.io_uring.fd,
                    .off = @intFromEnum(Completion.UserData.exit),
                    .addr = 0,
                    .len = 0,
                    .rw_flags = 0,
                    .user_data = @intFromEnum(Completion.UserData.cleanup),
                    .buf_index = 0,
                    .personality = 0,
                    .splice_fd_in = 0,
                    .addr3 = 0,
                    .resv = 0,
                };
            },
        }
    }
};

const Context = switch (builtin.cpu.arch) {
    .aarch64 => extern struct {
        sp: u64,
        fp: u64,
        pc: u64,
    },
    .x86_64 => extern struct {
        rsp: u64,
        rbp: u64,
        rip: u64,
    },
    else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
};

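/// Saves the current stack pointer, frame pointer, and resume address into the
/// `prev` context, restores them from the `ready` context, and jumps there.
/// The message pointer is passed in a register, and the value returned is the
/// message supplied by whichever context later switches back to us.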
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
    return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
        .aarch64 => asm volatile (
            \\ ldp x0, x2, [x1]
            \\ ldr x3, [x2, #16]
            \\ mov x4, sp
            \\ stp x4, fp, [x0]
            \\ adr x5, 0f
            \\ ldp x4, fp, [x2]
            \\ str x5, [x0, #16]
            \\ mov sp, x4
            \\ br x3
            \\0:
            : [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{x1}" (&message.contexts),
            : .{
              .x0 = true,
              .x1 = true,
              .x2 = true,
              .x3 = true,
              .x4 = true,
              .x5 = true,
              .x6 = true,
              .x7 = true,
              .x8 = true,
              .x9 = true,
              .x10 = true,
              .x11 = true,
              .x12 = true,
              .x13 = true,
              .x14 = true,
              .x15 = true,
              .x16 = true,
              .x17 = true,
              .x18 = true,
              .x19 = true,
              .x20 = true,
              .x21 = true,
              .x22 = true,
              .x23 = true,
              .x24 = true,
              .x25 = true,
              .x26 = true,
              .x27 = true,
              .x28 = true,
              .x30 = true,
              .z0 = true,
              .z1 = true,
              .z2 = true,
              .z3 = true,
              .z4 = true,
              .z5 = true,
              .z6 = true,
              .z7 = true,
              .z8 = true,
              .z9 = true,
              .z10 = true,
              .z11 = true,
              .z12 = true,
              .z13 = true,
              .z14 = true,
              .z15 = true,
              .z16 = true,
              .z17 = true,
              .z18 = true,
              .z19 = true,
              .z20 = true,
              .z21 = true,
              .z22 = true,
              .z23 = true,
              .z24 = true,
              .z25 = true,
              .z26 = true,
              .z27 = true,
              .z28 = true,
              .z29 = true,
              .z30 = true,
              .z31 = true,
              .p0 = true,
              .p1 = true,
              .p2 = true,
              .p3 = true,
              .p4 = true,
              .p5 = true,
              .p6 = true,
              .p7 = true,
              .p8 = true,
              .p9 = true,
              .p10 = true,
              .p11 = true,
              .p12 = true,
              .p13 = true,
              .p14 = true,
              .p15 = true,
              .fpcr = true,
              .fpsr = true,
              .ffr = true,
              .memory = true,
            }),
        .x86_64 => asm volatile (
            \\ movq 0(%%rsi), %%rax
            \\ movq 8(%%rsi), %%rcx
            \\ leaq 0f(%%rip), %%rdx
            \\ movq %%rsp, 0(%%rax)
            \\ movq %%rbp, 8(%%rax)
            \\ movq %%rdx, 16(%%rax)
            \\ movq 0(%%rcx), %%rsp
            \\ movq 8(%%rcx), %%rbp
            \\ jmpq *16(%%rcx)
            \\0:
            : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{rsi}" (&message.contexts),
            : .{
              .rax = true,
              .rcx = true,
              .rdx = true,
              .rbx = true,
              .rsi = true,
              .rdi = true,
              .r8 = true,
              .r9 = true,
              .r10 = true,
              .r11 = true,
              .r12 = true,
              .r13 = true,
              .r14 = true,
              .r15 = true,
              .mm0 = true,
              .mm1 = true,
              .mm2 = true,
              .mm3 = true,
              .mm4 = true,
              .mm5 = true,
              .mm6 = true,
              .mm7 = true,
              .zmm0 = true,
              .zmm1 = true,
              .zmm2 = true,
              .zmm3 = true,
              .zmm4 = true,
              .zmm5 = true,
              .zmm6 = true,
              .zmm7 = true,
              .zmm8 = true,
              .zmm9 = true,
              .zmm10 = true,
              .zmm11 = true,
              .zmm12 = true,
              .zmm13 = true,
              .zmm14 = true,
              .zmm15 = true,
              .zmm16 = true,
              .zmm17 = true,
              .zmm18 = true,
              .zmm19 = true,
              .zmm20 = true,
              .zmm21 = true,
              .zmm22 = true,
              .zmm23 = true,
              .zmm24 = true,
              .zmm25 = true,
              .zmm26 = true,
              .zmm27 = true,
              .zmm28 = true,
              .zmm29 = true,
              .zmm30 = true,
              .zmm31 = true,
              .fpsr = true,
              .fpcr = true,
              .mxcsr = true,
              .rflags = true,
              .dirflag = true,
              .memory = true,
            }),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    });
}

fn mainIdleEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ movq (%%rsp), %%rdi
            \\ jmp %[mainIdle:P]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        .aarch64 => asm volatile (
            \\ ldr x0, [sp, #-8]
            \\ b %[mainIdle]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

fn fiberEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ leaq 8(%%rsp), %%rdi
            \\ jmp %[AsyncClosure_call:P]
            :
            : [AsyncClosure_call] "X" (&AsyncClosure.call),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

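/// Trampoline state for a fiber started via `async`/`concurrent`. It is placed
/// near the end of the fiber's allocation (see `fromFiber`) and records the
/// start function, the result alignment, and whether an awaiter has already
/// been resumed for this fiber.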
const AsyncClosure = struct {
    event_loop: *EventLoop,
    fiber: *Fiber,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    result_align: Alignment,
    already_awaited: bool,

    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
    }

    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
        message.handle(closure.event_loop);
        const fiber = closure.fiber;
        std.log.debug("{*} performing async", .{fiber});
        closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
        const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
        const ready_awaiter = r: {
            const a = awaiter orelse break :r null;
            if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
            break :r a;
        };
        closure.event_loop.yield(ready_awaiter, .nothing);
        unreachable; // switched to dead fiber
    }

    fn fromFiber(fiber: *Fiber) *AsyncClosure {
        return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
            @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
        ) - @sizeOf(AsyncClosure));
    }
};

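/// Runs `start` on a new fiber when one can be allocated; otherwise falls back
/// to calling it synchronously on the current fiber and returns null.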
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
    return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
        start(context.ptr, result.ptr);
        return null;
    };
}

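/// Allocates a fiber, builds an initial context that begins execution in
/// `fiberEntry`, copies the caller-provided context bytes next to the
/// `AsyncClosure`, and schedules the new fiber.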
fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Io.ConcurrentError!*std.Io.AnyFuture {
    assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
    assert(result_len <= Fiber.max_result_size); // TODO
    assert(context.len <= Fiber.max_context_size); // TODO

    const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
    const fiber = try Fiber.allocate(event_loop);
    std.log.debug("allocated {*}", .{fiber});

    const closure: *AsyncClosure = .fromFiber(fiber);
    fiber.* = .{
        .required_align = {},
        .context = switch (builtin.cpu.arch) {
            .x86_64 => .{
                .rsp = @intFromPtr(closure) - @sizeOf(usize),
                .rbp = 0,
                .rip = @intFromPtr(&fiberEntry),
            },
            .aarch64 => .{
                .sp = @intFromPtr(closure),
                .fp = 0,
                .pc = @intFromPtr(&fiberEntry),
            },
            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
        },
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    closure.* = .{
        .event_loop = event_loop,
        .fiber = fiber,
        .start = start,
        .result_align = result_alignment,
        .already_awaited = false,
    };
    @memcpy(closure.contextPointer(), context);

    event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
    return @ptrCast(fiber);
}

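/// Blocks the current fiber until the future's fiber has finished, registering
/// it as the awaiter if needed, then copies out the result bytes and recycles
/// the finished fiber.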
fn await(
    userdata: ?*anyopaque,
    any_future: *std.Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
        event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
    @memcpy(result, future_fiber.resultBytes(result_alignment));
    event_loop.recycle(future_fiber);
}

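/// Waits until any of `futures` has finished and returns the index of one that
/// did, registering the current fiber as the awaiter of every future and
/// unregistering it again afterwards.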
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));

    // Optimization to avoid the yield below.
    for (futures, 0..) |any_future, i| {
        const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
        if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) == Fiber.finished)
            return i;
    }

    el.yield(null, .{ .register_select = futures });

    std.log.debug("back from select yield", .{});

    const my_thread: *Thread = .current();
    const my_fiber = my_thread.currentFiber();
    var result: ?usize = null;

    for (futures, 0..) |any_future, i| {
        const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
        if (@cmpxchgStrong(?*Fiber, &future_fiber.awaiter, my_fiber, null, .seq_cst, .seq_cst)) |awaiter| {
            if (awaiter == Fiber.finished) {
                if (result == null) result = i;
            } else if (awaiter) |a| {
                const closure: *AsyncClosure = .fromFiber(a);
                closure.already_awaited = false;
            }
        } else {
            const closure: *AsyncClosure = .fromFiber(my_fiber);
            closure.already_awaited = false;
        }
    }

    return result.?;
}

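/// Requests cancellation of the future's fiber: marks it as canceling and, if
/// it was blocked in a cancelable I/O region, posts a synthetic -EINTR
/// completion to the owning thread's ring so the pending operation gets
/// canceled; then awaits the result as usual.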
fn cancel(
    userdata: ?*anyopaque,
    any_future: *std.Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
    if (@atomicRmw(
        ?*Thread,
        &future_fiber.cancel_thread,
        .Xchg,
        Thread.canceling,
        .acq_rel,
    )) |cancel_thread| if (cancel_thread != Thread.canceling) {
        getSqe(&Thread.current().io_uring).* = .{
            .opcode = .MSG_RING,
            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
            .ioprio = 0,
            .fd = cancel_thread.io_uring.fd,
            .off = @intFromPtr(future_fiber),
            .addr = 0,
            .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
            .rw_flags = 0,
            .user_data = @intFromEnum(Completion.UserData.cleanup),
            .buf_index = 0,
            .personality = 0,
            .splice_fd_in = 0,
            .addr3 = 0,
            .resv = 0,
        };
    };
    await(userdata, any_future, result, result_alignment);
}

fn cancelRequested(userdata: ?*anyopaque) bool {
    _ = userdata;
    return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
}

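// The I/O wrappers below follow a common pattern: enter a cancel region where
// applicable, queue one SQE whose user_data is the current fiber, yield until
// the completion for that fiber arrives, then translate the CQE result into
// the corresponding `Io` error set.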
fn createFile(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const posix = std.posix;
    const sub_path_c = try posix.toPosixPath(sub_path);

    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };
    const have_flock = @TypeOf(posix.system.flock) != void;

    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        @panic("TODO");
    }

    if (has_flock_open_flags and flags.lock_nonblocking) {
        @panic("TODO");
    }

    getSqe(iou).* = .{
        .opcode = .OPENAT,
        .flags = 0,
        .ioprio = 0,
        .fd = dir.handle,
        .off = 0,
        .addr = @intFromPtr(&sub_path_c),
        .len = @intCast(flags.mode),
        .rw_flags = @bitCast(os_flags),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
        .BADF => unreachable,
        .ACCES => return error.AccessDenied,
        .FBIG => return error.FileTooBig,
        .OVERFLOW => return error.FileTooBig,
        .ISDIR => return error.IsDir,
        .LOOP => return error.SymLinkLoop,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NAMETOOLONG => return error.NameTooLong,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NODEV => return error.NoDevice,
        .NOENT => return error.FileNotFound,
        .NOMEM => return error.SystemResources,
        .NOSPC => return error.NoSpaceLeft,
        .NOTDIR => return error.NotDir,
        .PERM => return error.PermissionDenied,
        .EXIST => return error.PathAlreadyExists,
        .BUSY => return error.DeviceBusy,
        .OPNOTSUPP => return error.FileLocksNotSupported,
        .AGAIN => return error.WouldBlock,
        .TXTBSY => return error.FileBusy,
        .NXIO => return error.NoDevice,
        else => |err| return posix.unexpectedErrno(err),
    }
}

fn fileOpen(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const posix = std.posix;
    const sub_path_c = try posix.toPosixPath(sub_path);

    var os_flags: posix.O = .{
        .ACCMODE = switch (flags.mode) {
            .read_only => .RDONLY,
            .write_only => .WRONLY,
            .read_write => .RDWR,
        },
    };

    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) {
        // Note that the NONBLOCK flag is removed after the openat() call
        // is successful.
        switch (flags.lock) {
            .none => {},
            .shared => {
                os_flags.SHLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
            .exclusive => {
                os_flags.EXLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
        }
    }
    const have_flock = @TypeOf(posix.system.flock) != void;

    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        @panic("TODO");
    }

    if (has_flock_open_flags and flags.lock_nonblocking) {
        @panic("TODO");
    }

    getSqe(iou).* = .{
        .opcode = .OPENAT,
        .flags = 0,
        .ioprio = 0,
        .fd = dir.handle,
        .off = 0,
        .addr = @intFromPtr(&sub_path_c),
        .len = 0,
        .rw_flags = @bitCast(os_flags),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
        .BADF => unreachable,
        .ACCES => return error.AccessDenied,
        .FBIG => return error.FileTooBig,
        .OVERFLOW => return error.FileTooBig,
        .ISDIR => return error.IsDir,
        .LOOP => return error.SymLinkLoop,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NAMETOOLONG => return error.NameTooLong,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NODEV => return error.NoDevice,
        .NOENT => return error.FileNotFound,
        .NOMEM => return error.SystemResources,
        .NOSPC => return error.NoSpaceLeft,
        .NOTDIR => return error.NotDir,
        .PERM => return error.PermissionDenied,
        .EXIST => return error.PathAlreadyExists,
        .BUSY => return error.DeviceBusy,
        .OPNOTSUPP => return error.FileLocksNotSupported,
        .AGAIN => return error.WouldBlock,
        .TXTBSY => return error.FileBusy,
        .NXIO => return error.NoDevice,
        else => |err| return posix.unexpectedErrno(err),
    }
}

fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();

    getSqe(iou).* = .{
        .opcode = .CLOSE,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = 0,
        .addr = 0,
        .len = 0,
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return,
        .INTR => unreachable,
        .CANCELED => return,

        .BADF => unreachable, // Always a race condition.
        else => return,
    }
}

fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    getSqe(iou).* = .{
        .opcode = .READ,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = @bitCast(offset),
        .addr = @intFromPtr(buffer.ptr),
        .len = @min(buffer.len, 0x7ffff000),
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .INVAL => unreachable,
        .FAULT => unreachable,
        .NOENT => return error.ProcessNotFound,
        .AGAIN => return error.WouldBlock,
        .BADF => return error.NotOpenForReading, // Can be a race condition.
        .IO => return error.InputOutput,
        .ISDIR => return error.IsDir,
        .NOBUFS => return error.SystemResources,
        .NOMEM => return error.SystemResources,
        .NOTCONN => return error.SocketUnconnected,
        .CONNRESET => return error.ConnectionResetByPeer,
        .TIMEDOUT => return error.Timeout,
        .NXIO => return error.Unseekable,
        .SPIPE => return error.Unseekable,
        .OVERFLOW => return error.Unseekable,
        else => |err| return std.posix.unexpectedErrno(err),
    }
}

fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    getSqe(iou).* = .{
        .opcode = .WRITE,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = @bitCast(offset),
        .addr = @intFromPtr(buffer.ptr),
        .len = @min(buffer.len, 0x7ffff000),
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .INVAL => return error.InvalidArgument,
        .FAULT => unreachable,
        .NOENT => return error.ProcessNotFound,
        .AGAIN => return error.WouldBlock,
        .BADF => return error.NotOpenForWriting, // can be a race condition.
        .DESTADDRREQ => unreachable, // `connect` was never called.
        .DQUOT => return error.DiskQuota,
        .FBIG => return error.FileTooBig,
        .IO => return error.InputOutput,
        .NOSPC => return error.NoSpaceLeft,
        .ACCES => return error.AccessDenied,
        .PERM => return error.PermissionDenied,
        .PIPE => return error.BrokenPipe,
        .NXIO => return error.Unseekable,
        .SPIPE => return error.Unseekable,
        .OVERFLOW => return error.Unseekable,
        .BUSY => return error.DeviceBusy,
        .CONNRESET => return error.ConnectionResetByPeer,
        .MSGSIZE => return error.MessageOversize,
        else => |err| return std.posix.unexpectedErrno(err),
    }
}

fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
    _ = userdata;
    const timespec = try std.posix.clock_gettime(clockid);
    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
}

fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const deadline_nanoseconds: i96 = switch (deadline) {
        .duration => |duration| duration.nanoseconds,
        .timestamp => |timestamp| @intFromEnum(timestamp),
    };
    const timespec: std.os.linux.kernel_timespec = .{
        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
    };
    getSqe(iou).* = .{
        .opcode = .TIMEOUT,
        .flags = 0,
        .ioprio = 0,
        .fd = 0,
        .off = 0,
        .addr = @intFromPtr(&timespec),
        .len = 1,
        .rw_flags = @as(u32, switch (deadline) {
            .duration => 0,
            .timestamp => std.os.linux.IORING_TIMEOUT_ABS,
        }) | @as(u32, switch (clockid) {
            .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
            .MONOTONIC => 0,
            .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
            else => return error.UnsupportedClock,
        }),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS, .TIME => return,
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        else => |err| return std.posix.unexpectedErrno(err),
    }
}

fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } });
}
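/// Releases `mutex`; if another fiber is queued in the state word, pops it,
/// switches to it, and reschedules the current fiber.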
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    var maybe_waiting_fiber: ?*Fiber = @ptrFromInt(@intFromEnum(prev_state));
    while (if (maybe_waiting_fiber) |waiting_fiber| @cmpxchgWeak(
        Io.Mutex.State,
        &mutex.state,
        @enumFromInt(@intFromPtr(waiting_fiber)),
        @enumFromInt(@intFromPtr(waiting_fiber.queue_next)),
        .release,
        .acquire,
    ) else @cmpxchgWeak(
        Io.Mutex.State,
        &mutex.state,
        .locked_once,
        .unlocked,
        .release,
        .acquire,
    ) orelse return) |next_state| maybe_waiting_fiber = @ptrFromInt(@intFromEnum(next_state));
    maybe_waiting_fiber.?.queue_next = null;
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(maybe_waiting_fiber.?, .reschedule);
}

const ConditionImpl = struct {
    tail: *Fiber,
    event: union(enum) {
        queued,
        wake: Io.Condition.Wake,
    },
};

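/// Queues the current fiber on the condition (whose state word holds the head
/// of an intrusive wait queue) and releases the mutex as part of the switch.
/// After being woken, reacquires the mutex and propagates the wake to any
/// fibers still queued behind this one.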
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
    const thread = Thread.current();
    const fiber = thread.currentFiber();
    const cond_impl = fiber.resultPointer(ConditionImpl);
    try mutex.lock(el.io());
    switch (cond_impl.event) {
        .queued => {},
        .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
            .one => if (@cmpxchgStrong(
                ?*Fiber,
                @as(*?*Fiber, @ptrCast(&cond.state)),
                null,
                next_fiber,
                .release,
                .acquire,
            )) |old_fiber| {
                const old_cond_impl = old_fiber.?.resultPointer(ConditionImpl);
                assert(old_cond_impl.tail.queue_next == null);
                old_cond_impl.tail.queue_next = next_fiber;
                old_cond_impl.tail = cond_impl.tail;
            },
            .all => el.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
        },
    }
    fiber.queue_next = null;
}

fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
    waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake };
    el.yield(waiting_fiber, .reschedule);
}

fn errno(signed: i32) std.os.linux.E {
    return .init(@bitCast(@as(isize, signed)));
}

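/// Returns a free submission queue entry, flushing already-queued entries to
/// the kernel (without waiting for completions) whenever the ring is full.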
fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
    while (true) return iou.get_sqe() catch {
        _ = iou.submit_and_wait(0) catch |err| switch (err) {
            error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
            else => |e| @panic(@errorName(e)),
        };
        continue;
    };
}