Commit db0dd3a480

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-29 07:31:27
EventLoop: get file operations working
Something is horribly wrong with scheduling, as can be seen in the debug output, but at least it somehow manages to exit cleanly...
1 parent 238de05
Changed files (1)
lib
lib/std/Io/EventLoop.zig
@@ -9,26 +9,25 @@ const IoUring = std.os.linux.IoUring;
 
 gpa: Allocator,
 mutex: std.Thread.Mutex,
-cond: std.Thread.Condition,
 queue: std.DoublyLinkedList(void),
+/// Atomic copy of queue.len
+queue_len: usize,
 free: std.DoublyLinkedList(void),
-main_context: Context,
-exit_awaiter: ?*Fiber,
+main_fiber: Fiber,
+idle_count: usize,
 threads: std.ArrayListUnmanaged(Thread),
-/// 1 bit per thread, same order as `thread_index`.
-idle_iourings: []usize,
+exiting: bool,
 
 threadlocal var thread_index: u32 = undefined;
 
 /// Empirically saw 10KB being used by the self-hosted backend for logging.
-const idle_stack_size = 32 * 1024;
+const idle_stack_size = 64 * 1024;
 
 const io_uring_entries = 64;
 
 const Thread = struct {
     thread: std.Thread,
     idle_context: Context,
-    current_idle_context: *Context,
     current_context: *Context,
     io_uring: IoUring,
 
@@ -103,98 +102,92 @@ pub fn io(el: *EventLoop) Io {
 }
 
 pub fn init(el: *EventLoop, gpa: Allocator) !void {
-    const n_threads: usize = @max((std.Thread.getCpuCount() catch 1), 1);
-    const threads_bytes = n_threads * @sizeOf(Thread);
-    const idle_context_offset = std.mem.alignForward(usize, threads_bytes, @alignOf(Context));
-    const idle_stack_end_offset = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
-    const allocated_slice = try gpa.alignedAlloc(u8, @max(@alignOf(Thread), @alignOf(Context)), idle_stack_end_offset);
+    const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
+    const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
+    const allocated_slice = try gpa.alignedAlloc(u8, @alignOf(Thread), idle_stack_end_offset);
     errdefer gpa.free(allocated_slice);
-    const idle_iourings = try gpa.alloc(usize, (n_threads + @bitSizeOf(usize) - 1) / @bitSizeOf(usize));
-    errdefer gpa.free(idle_iourings);
-    @memset(idle_iourings, 0);
     el.* = .{
         .gpa = gpa,
         .mutex = .{},
-        .cond = .{},
         .queue = .{},
+        .queue_len = 0,
         .free = .{},
-        .main_context = undefined,
-        .exit_awaiter = null,
-        .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_bytes])),
-        .idle_iourings = idle_iourings,
+        .main_fiber = undefined,
+        .idle_count = 0,
+        .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_size])),
+        .exiting = false,
     };
+    thread_index = 0;
     const main_thread = el.threads.addOneAssumeCapacity();
     main_thread.io_uring = try IoUring.init(io_uring_entries, 0);
-    const main_idle_context: *Context = @alignCast(std.mem.bytesAsValue(Context, allocated_slice[idle_context_offset..][0..@sizeOf(Context)]));
-    const idle_stack_end: [*]align(@max(@alignOf(Thread), @alignOf(Context))) usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
+    const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
-    main_idle_context.* = .{
+    main_thread.idle_context = .{
         .rsp = @intFromPtr(idle_stack_end - 1),
         .rbp = 0,
         .rip = @intFromPtr(&mainIdleEntry),
     };
-    std.log.debug("created main idle {*}", .{main_idle_context});
-    main_thread.current_idle_context = main_idle_context;
-    std.log.debug("created main {*}", .{&el.main_context});
-    main_thread.current_context = &el.main_context;
+    std.log.debug("created main idle {*}", .{&main_thread.idle_context});
+    std.log.debug("created main {*}", .{&el.main_fiber});
+    main_thread.current_context = &el.main_fiber.context;
 }
 
 pub fn deinit(el: *EventLoop) void {
     assert(el.queue.len == 0); // pending async
-    el.yield(null, &el.exit_awaiter);
+    el.yield(null, .exit);
     while (el.free.pop()) |free_node| {
         const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
         el.gpa.free(free_fiber.allocatedSlice());
     }
-    const idle_context_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread), @alignOf(Context));
-    const idle_stack_end = std.mem.alignForward(usize, idle_context_offset + idle_stack_size, std.heap.page_size_max);
-    const allocated_ptr: [*]align(@max(@alignOf(Thread), @alignOf(Context))) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
+    const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
+    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
     for (el.threads.items[1..]) |*thread| thread.thread.join();
-    el.gpa.free(allocated_ptr[0..idle_stack_end]);
+    el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
 }
 
-const PendingTask = union(enum) {
-    none,
-    register_awaiter: *?*Fiber,
-    io_uring_submit: *IoUring,
-};
-
-fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: PendingTask) void {
+fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
     const thread: *Thread = &el.threads.items[thread_index];
     const ready_context: *Context = ready_context: {
         const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
             el.mutex.lock();
             defer el.mutex.unlock();
-            break :ready_node el.queue.pop();
+            const ready_node = el.queue.pop();
+            @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+            break :ready_node ready_node;
         }) |ready_node|
             @alignCast(@fieldParentPtr("queue_node", ready_node))
         else
-            break :ready_context thread.current_idle_context;
+            break :ready_context &thread.idle_context;
         break :ready_context &ready_fiber.context;
     };
     const message: SwitchMessage = .{
-        .prev_context = thread.current_context,
-        .ready_context = ready_context,
+        .contexts = .{
+            .prev = thread.current_context,
+            .ready = ready_context,
+        },
         .pending_task = pending_task,
     };
-    std.log.debug("switching from {*} to {*}", .{ message.prev_context, message.ready_context });
+    std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
     contextSwitch(&message).handle(el);
 }
 
 fn schedule(el: *EventLoop, fiber: *Fiber) void {
-    el.mutex.lock();
-    el.queue.append(&fiber.queue_node);
-    //for (el.idle_iourings) |*int| {
-    //    const idler_subset = @atomicLoad(usize, int, .unordered);
-    //    if (idler_subset == 0) continue;
-    //
-    //}
-    if (el.idle_count > 0) {
-        el.mutex.unlock();
-        el.cond.signal();
+    if (idle_count: {
+        el.mutex.lock();
+        defer el.mutex.unlock();
+        el.queue.append(&fiber.queue_node);
+        @atomicStore(usize, &el.queue_len, el.queue.len, .unordered);
+        break :idle_count el.idle_count;
+    } > 0) {
+        _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), 1, switch (@bitSizeOf(usize)) {
+            8 => std.os.linux.FUTEX2.SIZE_U8,
+            16 => std.os.linux.FUTEX2.SIZE_U16,
+            32 => std.os.linux.FUTEX2.SIZE_U32,
+            64 => std.os.linux.FUTEX2.SIZE_U64,
+            else => @compileError("unsupported @sizeOf(usize)"),
+        } | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
         return;
     }
-    defer el.mutex.unlock();
     if (el.threads.items.len == el.threads.capacity) return;
     const thread = el.threads.addOneAssumeCapacity();
     thread.thread = std.Thread.spawn(.{
@@ -216,64 +209,101 @@ fn recycle(el: *EventLoop, fiber: *Fiber) void {
 
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
-    el.yield(el.idle(), null);
+    el.idle();
+    el.yield(&el.main_fiber, .nothing);
     unreachable; // switched to dead fiber
 }
 
 fn threadEntry(el: *EventLoop, index: usize) void {
-    thread_index = index;
+    thread_index = @intCast(index);
     const thread: *Thread = &el.threads.items[index];
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
         std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)});
         return;
     };
-    thread.current_idle_context = &thread.idle_context;
     thread.current_context = &thread.idle_context;
-    _ = el.idle();
+    el.idle();
 }
 
-fn idle(el: *EventLoop) *Fiber {
+const UserData = enum(u64) {
+    queue_len_futex_wait,
+    _,
+};
+
+fn idle(el: *EventLoop) void {
     const thread: *Thread = &el.threads.items[thread_index];
-    // The idle fiber only runs on one thread.
     const iou = &thread.io_uring;
     var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
+    var futex_is_scheduled: bool = false;
 
     while (true) {
-        el.yield(null, null);
-        if (@atomicLoad(?*Fiber, &el.exit_awaiter, .acquire)) |exit_awaiter| {
-            el.cond.broadcast();
-            return exit_awaiter;
-        }
-        // TODO add uring to bit set
-        const n = iou.copy_cqes(&cqes_buffer, 1) catch @panic("TODO handle copy_cqes error");
-        const cqes = cqes_buffer[0..n];
-        for (cqes) |cqe| {
-            const fiber: *Fiber = @ptrFromInt(cqe.user_data);
-            const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
-            res.* = cqe.res;
-            el.schedule(fiber);
+        el.yield(null, .nothing);
+        if (@atomicLoad(bool, &el.exiting, .acquire)) return;
+        if (!futex_is_scheduled) {
+            const sqe = getSqe(&thread.io_uring);
+            sqe.prep_rw(.FUTEX_WAIT, switch (@bitSizeOf(usize)) {
+                8 => std.os.linux.FUTEX2.SIZE_U8,
+                16 => std.os.linux.FUTEX2.SIZE_U16,
+                32 => std.os.linux.FUTEX2.SIZE_U32,
+                64 => std.os.linux.FUTEX2.SIZE_U64,
+                else => @compileError("unsupported @sizeOf(usize)"),
+            } | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
+            sqe.addr3 = std.math.maxInt(u64);
+            sqe.user_data = @intFromEnum(UserData.queue_len_futex_wait);
+            futex_is_scheduled = true;
         }
+        _ = iou.submit_and_wait(1) catch |err| switch (err) {
+            error.SignalInterrupt => 0,
+            else => @panic(@errorName(err)),
+        };
+        for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
+            error.SignalInterrupt => 0,
+            else => @panic(@errorName(err)),
+        }]) |cqe| switch (@as(UserData, @enumFromInt(cqe.user_data))) {
+            .queue_len_futex_wait => futex_is_scheduled = false,
+            _ => {
+                const fiber: *Fiber = @ptrFromInt(cqe.user_data);
+                const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
+                res.* = cqe.res;
+                el.schedule(fiber);
+            },
+        };
     }
 }
 
-const SwitchMessage = extern struct {
-    prev_context: *Context,
-    ready_context: *Context,
+const SwitchMessage = struct {
+    contexts: extern struct {
+        prev: *Context,
+        ready: *Context,
+    },
     pending_task: PendingTask,
 
+    const PendingTask = union(enum) {
+        nothing,
+        register_awaiter: *?*Fiber,
+        exit,
+    };
+
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
         const thread: *Thread = &el.threads.items[thread_index];
-        thread.current_context = message.ready_context;
+        thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
-            .none => {},
+            .nothing => {},
             .register_awaiter => |awaiter| {
-                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.prev_context));
+                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                 if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
             },
-            .io_uring_submit => |iou| {
-                _ = iou.flush_sq();
-                // TODO: determine whether this return value should be used
+            .exit => {
+                @atomicStore(bool, &el.exiting, true, .unordered);
+                @atomicStore(usize, &el.queue_len, std.math.maxInt(usize), .release);
+                _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(usize), std.math.maxInt(i32), switch (@bitSizeOf(usize)) {
+                    8 => std.os.linux.FUTEX2.SIZE_U8,
+                    16 => std.os.linux.FUTEX2.SIZE_U16,
+                    32 => std.os.linux.FUTEX2.SIZE_U32,
+                    64 => std.os.linux.FUTEX2.SIZE_U64,
+                    else => @compileError("unsupported @sizeOf(usize)"),
+                } | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
             },
         }
     }
@@ -289,7 +319,7 @@ const Context = switch (builtin.cpu.arch) {
 };
 
 inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
-    return switch (builtin.cpu.arch) {
+    return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
         .x86_64 => asm volatile (
             \\ movq 0(%%rsi), %%rax
             \\ movq 8(%%rsi), %%rcx
@@ -301,8 +331,8 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
             \\ movq 8(%%rcx), %%rbp
             \\ jmpq *16(%%rcx)
             \\0:
-            : [received_message] "={rsi}" (-> *const SwitchMessage),
-            : [message_to_send] "{rsi}" (message),
+            : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
+            : [message_to_send] "{rsi}" (&message.contexts),
             : "rax", "rcx", "rdx", "rbx", "rdi", //
             "r8", "r9", "r10", "r11", "r12", "r13", "r14", "r15", //
             "mm0", "mm1", "mm2", "mm3", "mm4", "mm5", "mm6", "mm7", //
@@ -313,7 +343,7 @@ inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
             "fpsr", "fpcr", "mxcsr", "rflags", "dirflag", "memory"
         ),
         else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
-    };
+    });
 }
 
 fn mainIdleEntry() callconv(.naked) void {
@@ -401,7 +431,7 @@ const AsyncClosure = struct {
         std.log.debug("{*} performing async", .{closure.fiber});
         closure.start(closure.contextPointer(), closure.fiber.resultPointer());
         const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
-        closure.event_loop.yield(awaiter, null);
+        closure.event_loop.yield(awaiter, .nothing);
         unreachable; // switched to dead fiber
     }
 };
@@ -409,17 +439,93 @@ const AsyncClosure = struct {
 pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
     const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, &future_fiber.awaiter);
+    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultPointer());
     event_loop.recycle(future_fiber);
 }
 
 pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File {
-    _ = userdata;
-    _ = dir;
-    _ = sub_path;
-    _ = flags;
-    @panic("TODO");
+    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+
+    const posix = std.posix;
+    const sub_path_c = try posix.toPosixPath(sub_path);
+
+    var os_flags: posix.O = .{
+        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
+        .CREAT = true,
+        .TRUNC = flags.truncate,
+        .EXCL = flags.exclusive,
+    };
+    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
+    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
+
+    // Use the O locking flags if the os supports them to acquire the lock
+    // atomically. Note that the NONBLOCK flag is removed after the openat()
+    // call is successful.
+    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
+    if (has_flock_open_flags) switch (flags.lock) {
+        .none => {},
+        .shared => {
+            os_flags.SHLOCK = true;
+            os_flags.NONBLOCK = flags.lock_nonblocking;
+        },
+        .exclusive => {
+            os_flags.EXLOCK = true;
+            os_flags.NONBLOCK = flags.lock_nonblocking;
+        },
+    };
+    const have_flock = @TypeOf(posix.system.flock) != void;
+
+    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
+        @panic("TODO");
+    }
+
+    if (has_flock_open_flags and flags.lock_nonblocking) {
+        @panic("TODO");
+    }
+
+    const thread: *Thread = &el.threads.items[thread_index];
+    const iou = &thread.io_uring;
+    const sqe = getSqe(iou);
+    const fiber = thread.currentFiber();
+
+    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
+    sqe.user_data = @intFromPtr(fiber);
+
+    el.yield(null, .nothing);
+
+    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
+    const rc = result.*;
+    switch (errno(rc)) {
+        .SUCCESS => return .{ .handle = rc },
+        .INTR => @panic("TODO is this reachable?"),
+        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+
+        .FAULT => unreachable,
+        .INVAL => return error.BadPathName,
+        .BADF => unreachable,
+        .ACCES => return error.AccessDenied,
+        .FBIG => return error.FileTooBig,
+        .OVERFLOW => return error.FileTooBig,
+        .ISDIR => return error.IsDir,
+        .LOOP => return error.SymLinkLoop,
+        .MFILE => return error.ProcessFdQuotaExceeded,
+        .NAMETOOLONG => return error.NameTooLong,
+        .NFILE => return error.SystemFdQuotaExceeded,
+        .NODEV => return error.NoDevice,
+        .NOENT => return error.FileNotFound,
+        .NOMEM => return error.SystemResources,
+        .NOSPC => return error.NoSpaceLeft,
+        .NOTDIR => return error.NotDir,
+        .PERM => return error.PermissionDenied,
+        .EXIST => return error.PathAlreadyExists,
+        .BUSY => return error.DeviceBusy,
+        .OPNOTSUPP => return error.FileLocksNotSupported,
+        .AGAIN => return error.WouldBlock,
+        .TXTBSY => return error.FileBusy,
+        .NXIO => return error.NoDevice,
+        else => |err| return posix.unexpectedErrno(err),
+    }
 }
 
 pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File {
@@ -476,7 +582,7 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
     sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
     sqe.user_data = @intFromPtr(fiber);
 
-    el.yield(null, .{ .io_uring_submit = iou });
+    el.yield(null, .nothing);
 
     const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
     const rc = result.*;
@@ -510,8 +616,6 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
         .NXIO => return error.NoDevice,
         else => |err| return posix.unexpectedErrno(err),
     }
-
-    return .{ .handle = result.* };
 }
 
 fn errno(signed: i32) std.posix.E {
@@ -524,21 +628,109 @@ fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
 }
 
 pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
-    _ = userdata;
-    _ = file;
-    @panic("TODO");
+    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+
+    const posix = std.posix;
+
+    const thread: *Thread = &el.threads.items[thread_index];
+    const iou = &thread.io_uring;
+    const sqe = getSqe(iou);
+    const fiber = thread.currentFiber();
+
+    sqe.prep_close(file.handle);
+    sqe.user_data = @intFromPtr(fiber);
+
+    el.yield(null, .nothing);
+
+    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
+    const rc = result.*;
+    switch (errno(rc)) {
+        .SUCCESS => return,
+        .INTR => @panic("TODO is this reachable?"),
+        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+
+        .BADF => unreachable, // Always a race condition.
+        else => return,
+    }
 }
 
 pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize {
-    _ = userdata;
-    _ = file;
-    _ = buffer;
-    @panic("TODO");
+    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+
+    const posix = std.posix;
+
+    const thread: *Thread = &el.threads.items[thread_index];
+    const iou = &thread.io_uring;
+    const sqe = getSqe(iou);
+    const fiber = thread.currentFiber();
+
+    sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
+    sqe.user_data = @intFromPtr(fiber);
+
+    el.yield(null, .nothing);
+
+    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
+    const rc = result.*;
+    switch (errno(rc)) {
+        .SUCCESS => return @as(u32, @bitCast(rc)),
+        .INTR => @panic("TODO is this reachable?"),
+        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+
+        .INVAL => unreachable,
+        .FAULT => unreachable,
+        .NOENT => return error.ProcessNotFound,
+        .AGAIN => return error.WouldBlock,
+        .BADF => return error.NotOpenForReading, // Can be a race condition.
+        .IO => return error.InputOutput,
+        .ISDIR => return error.IsDir,
+        .NOBUFS => return error.SystemResources,
+        .NOMEM => return error.SystemResources,
+        .NOTCONN => return error.SocketNotConnected,
+        .CONNRESET => return error.ConnectionResetByPeer,
+        .TIMEDOUT => return error.ConnectionTimedOut,
+        else => |err| return posix.unexpectedErrno(err),
+    }
 }
 
 pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize {
-    _ = userdata;
-    _ = file;
-    _ = buffer;
-    @panic("TODO");
+    const el: *EventLoop = @ptrCast(@alignCast(userdata));
+
+    const posix = std.posix;
+
+    const thread: *Thread = &el.threads.items[thread_index];
+    const iou = &thread.io_uring;
+    const sqe = getSqe(iou);
+    const fiber = thread.currentFiber();
+
+    sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
+    sqe.user_data = @intFromPtr(fiber);
+
+    el.yield(null, .nothing);
+
+    const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
+    const rc = result.*;
+    switch (errno(rc)) {
+        .SUCCESS => return @as(u32, @bitCast(rc)),
+        .INTR => @panic("TODO is this reachable?"),
+        .CANCELED => @panic("TODO figure out how this error code fits into things"),
+
+        .INVAL => return error.InvalidArgument,
+        .FAULT => unreachable,
+        .NOENT => return error.ProcessNotFound,
+        .AGAIN => return error.WouldBlock,
+        .BADF => return error.NotOpenForWriting, // can be a race condition.
+        .DESTADDRREQ => unreachable, // `connect` was never called.
+        .DQUOT => return error.DiskQuota,
+        .FBIG => return error.FileTooBig,
+        .IO => return error.InputOutput,
+        .NOSPC => return error.NoSpaceLeft,
+        .ACCES => return error.AccessDenied,
+        .PERM => return error.PermissionDenied,
+        .PIPE => return error.BrokenPipe,
+        .CONNRESET => return error.ConnectionResetByPeer,
+        .BUSY => return error.DeviceBusy,
+        .NXIO => return error.NoDevice,
+        .MSGSIZE => return error.MessageTooBig,
+        else => |err| return posix.unexpectedErrno(err),
+    }
 }