Commit 08b609a79f

Jacob Young <jacobly0@users.noreply.github.com>
2025-03-30 21:13:41
Io: implement sleep and fix cancel bugs
1 parent 5041c9a
Changed files (5)
lib/std/Io/EventLoop.zig
@@ -31,10 +31,12 @@ const Thread = struct {
     idle_search_index: u32,
     steal_ready_search_index: u32,
 
-    threadlocal var index: u32 = undefined;
+    const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
 
-    fn current(el: *EventLoop) *Thread {
-        return &el.threads.allocated[index];
+    threadlocal var self: *Thread = undefined;
+
+    fn current() *Thread {
+        return self;
     }
 
     fn currentFiber(thread: *Thread) *Fiber {
@@ -52,10 +54,9 @@ const Fiber = struct {
     context: Context,
     awaiter: ?*Fiber,
     queue_next: ?*Fiber,
-    can_cancel: bool,
-    canceled: bool,
+    cancel_thread: ?*Thread,
 
-    const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
+    const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
 
     const max_result_align: Alignment = .@"16";
     const max_result_size = max_result_align.forward(64);
@@ -75,7 +76,7 @@ const Fiber = struct {
     );
 
     fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
-        const thread: *Thread = .current(el);
+        const thread: *Thread = .current();
         if (thread.free_queue) |free_fiber| {
             thread.free_queue = free_fiber.queue_next;
             free_fiber.queue_next = null;
@@ -101,6 +102,40 @@ const Fiber = struct {
         return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
     }
 
+    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void {
+        if (@cmpxchgStrong(
+            ?*Thread,
+            &fiber.cancel_thread,
+            null,
+            thread,
+            .acq_rel,
+            .acquire,
+        )) |cancel_thread| {
+            assert(cancel_thread == Thread.canceling);
+            return error.AsyncCancel;
+        }
+    }
+
+    fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
+        if (@cmpxchgStrong(
+            ?*Thread,
+            &fiber.cancel_thread,
+            thread,
+            null,
+            .acq_rel,
+            .acquire,
+        )) |cancel_thread| assert(cancel_thread == Thread.canceling);
+    }
+
+    fn recycle(fiber: *Fiber) void {
+        const thread: *Thread = .current();
+        std.log.debug("recyling {*}", .{fiber});
+        assert(fiber.queue_next == null);
+        @memset(fiber.allocatedSlice(), undefined);
+        fiber.queue_next = thread.free_queue;
+        thread.free_queue = fiber;
+    }
+
     const Queue = struct { head: *Fiber, tail: *Fiber };
 };
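The hunk above replaces the two booleans `can_cancel`/`canceled` with a single atomically updated `cancel_thread` slot with three states: null (the fiber is not blocked in a cancelable operation), a real `*Thread` (the fiber is blocked on that thread's io_uring), or the `Thread.canceling` sentinel (cancellation has been requested). A minimal sketch, not code from this commit, of how the I/O wrappers later in this file use the two helpers, with the actual submission elided:

    fn exampleOp(el: *EventLoop) error{AsyncCancel}!void {
        const thread: *Thread = .current();
        const fiber = thread.currentFiber();
        // Refuses to start if cancel() already stored Thread.canceling.
        try fiber.enterCancelRegion(thread);
        // ... queue an SQE with .user_data = @intFromPtr(fiber) ...
        el.yield(null, .nothing); // suspend until the completion arrives
        // Tolerates a concurrent cancel() swapping in the sentinel.
        fiber.exitCancelRegion(thread);
        // ... interpret fiber.resultPointer(Completion) ...
    }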
 
@@ -110,13 +145,18 @@ pub fn io(el: *EventLoop) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+
             .cancel = cancel,
             .cancelRequested = cancelRequested,
+
             .createFile = createFile,
             .openFile = openFile,
             .closeFile = closeFile,
-            .read = read,
-            .write = write,
+            .pread = pread,
+            .pwrite = pwrite,
+
+            .now = now,
+            .sleep = sleep,
         },
     };
 }
@@ -133,8 +173,7 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
             .context = undefined,
             .awaiter = null,
             .queue_next = null,
-            .can_cancel = false,
-            .canceled = false,
+            .cancel_thread = null,
         },
         .threads = .{
             .allocated = @ptrCast(allocated_slice[0..threads_size]),
@@ -142,8 +181,8 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
             .active = 1,
         },
     };
-    Thread.index = 0;
     const main_thread = &el.threads.allocated[0];
+    Thread.self = main_thread;
     const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
     (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
     main_thread.* = .{
@@ -168,24 +207,22 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
 pub fn deinit(el: *EventLoop) void {
     const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
     for (el.threads.allocated[0..active_threads]) |*thread|
-        assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async
+        assert(@atomicLoad(?*Fiber, &thread.ready_queue, .acquire) == null); // pending async
     el.yield(null, .exit);
+    for (el.threads.allocated[0..active_threads]) |*thread| while (thread.free_queue) |free_fiber| {
+        thread.free_queue = free_fiber.queue_next;
+        free_fiber.queue_next = null;
+        el.gpa.free(free_fiber.allocatedSlice());
+    };
     const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
     const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
-    for (el.threads.allocated[1..active_threads]) |*thread| {
-        thread.thread.join();
-        while (thread.free_queue) |free_fiber| {
-            thread.free_queue = free_fiber.queue_next;
-            free_fiber.queue_next = null;
-            el.gpa.free(free_fiber.allocatedSlice());
-        }
-    }
+    for (el.threads.allocated[1..active_threads]) |thread| thread.thread.join();
     el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
     el.* = undefined;
 }
 
 fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber|
         &ready_fiber.context
     else if (thread.ready_queue) |ready_fiber| ready_context: {
@@ -198,6 +235,7 @@ fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage
             defer thread.steal_ready_search_index += 1;
             if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0;
             const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index];
+            if (steal_ready_search_thread == thread) continue;
             const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
             if (@cmpxchgWeak(
                 ?*Fiber,
@@ -236,6 +274,7 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
         defer thread.idle_search_index += 1;
         if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
         const idle_search_thread = &el.threads.allocated[thread.idle_search_index];
+        if (idle_search_thread == thread) continue;
         if (@cmpxchgWeak(
             ?*Fiber,
             &idle_search_thread.ready_queue,
@@ -249,11 +288,11 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
             .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
             .ioprio = 0,
             .fd = idle_search_thread.io_uring.fd,
-            .off = @intFromEnum(Completion.Key.wakeup),
+            .off = @intFromEnum(Completion.UserData.wakeup),
             .addr = 0,
             .len = 0,
             .rw_flags = 0,
-            .user_data = @intFromEnum(Completion.Key.wakeup),
+            .user_data = @intFromEnum(Completion.UserData.wakeup),
             .buf_index = 0,
             .personality = 0,
             .splice_fd_in = 0,
@@ -314,15 +353,6 @@ fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
     )) |old_head| ready_queue.tail.queue_next = old_head;
 }
 
-fn recycle(el: *EventLoop, fiber: *Fiber) void {
-    const thread: *Thread = .current(el);
-    std.log.debug("recyling {*}", .{fiber});
-    assert(fiber.queue_next == null);
-    @memset(fiber.allocatedSlice(), undefined);
-    fiber.queue_next = thread.free_queue;
-    thread.free_queue = fiber;
-}
-
 fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
     message.handle(el);
     const thread: *Thread = &el.threads.allocated[0];
@@ -332,17 +362,16 @@ fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAl
 }
 
 fn threadEntry(el: *EventLoop, index: u32) void {
-    Thread.index = index;
     const thread: *Thread = &el.threads.allocated[index];
+    Thread.self = thread;
     std.log.debug("created thread idle {*}", .{&thread.idle_context});
     el.idle(thread);
 }
 
 const Completion = struct {
-    const Key = enum(usize) {
+    const UserData = enum(usize) {
         unused,
         wakeup,
-        cancel,
         cleanup,
         exit,
         /// *Fiber
@@ -369,26 +398,43 @@ fn idle(el: *EventLoop, thread: *Thread) void {
                 break :cqes_len 0;
             },
             else => |e| @panic(@errorName(e)),
-        }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) {
+        }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
             .unused => unreachable, // bad submission queued?
             .wakeup => {},
-            .cancel => {},
             .cleanup => @panic("failed to notify other threads that we are exiting"),
             .exit => {
                 assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
                 return;
             },
-            _ => {
-                const fiber: *Fiber = @ptrFromInt(cqe.user_data);
-                assert(fiber.queue_next == null);
-                fiber.resultPointer(Completion).* = .{
-                    .result = cqe.res,
-                    .flags = cqe.flags,
-                };
-                if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
-                    ready_queue.tail.queue_next = fiber;
-                    ready_queue.tail = fiber;
-                } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+            _ => switch (errno(cqe.res)) {
+                .INTR => getSqe(&thread.io_uring).* = .{
+                    .opcode = .ASYNC_CANCEL,
+                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+                    .ioprio = 0,
+                    .fd = 0,
+                    .off = 0,
+                    .addr = cqe.user_data,
+                    .len = 0,
+                    .rw_flags = 0,
+                    .user_data = @intFromEnum(Completion.UserData.wakeup),
+                    .buf_index = 0,
+                    .personality = 0,
+                    .splice_fd_in = 0,
+                    .addr3 = 0,
+                    .resv = 0,
+                },
+                else => {
+                    const fiber: *Fiber = @ptrFromInt(cqe.user_data);
+                    assert(fiber.queue_next == null);
+                    fiber.resultPointer(Completion).* = .{
+                        .result = cqe.res,
+                        .flags = cqe.flags,
+                    };
+                    if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
+                        ready_queue.tail.queue_next = fiber;
+                        ready_queue.tail = fiber;
+                    } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
+                },
             },
         };
         if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
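With the dedicated `cancel` user_data removed, a completion whose result is -EINTR now means "a cancel request was injected into this ring" (see the MSG_RING submission in `cancel` below), so the idle loop submits ASYNC_CANCEL against the fiber's still-pending operation instead of waking the fiber; the real operation then completes with -ECANCELED. This is also why the per-operation switches later in this file replace `.INTR => @panic("TODO is this reachable?")` with `unreachable`: the -EINTR result is consumed here and never reaches the fiber. A condensed restatement, where `submitCancel` and `markReady` are hypothetical stand-ins for the inline SQE construction and ready-queue bookkeeping above:

    fn dispatchCqe(thread: *Thread, cqe: std.os.linux.io_uring_cqe) void {
        switch (errno(cqe.res)) {
            // Cancel request injected via MSG_RING: cancel the pending SQE.
            .INTR => submitCancel(thread, cqe.user_data),
            // Normal completion: hand the result to the waiting fiber.
            else => markReady(@as(*Fiber, @ptrFromInt(cqe.user_data)), cqe),
        }
    }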
@@ -409,7 +455,7 @@ const SwitchMessage = struct {
     };
 
     fn handle(message: *const SwitchMessage, el: *EventLoop) void {
-        const thread: *Thread = .current(el);
+        const thread: *Thread = .current();
         thread.current_context = message.contexts.ready;
         switch (message.pending_task) {
             .nothing => {},
@@ -429,11 +475,11 @@ const SwitchMessage = struct {
                     .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                     .ioprio = 0,
                     .fd = each_thread.io_uring.fd,
-                    .off = @intFromEnum(Completion.Key.exit),
+                    .off = @intFromEnum(Completion.UserData.exit),
                     .addr = 0,
                     .len = 0,
                     .rw_flags = 0,
-                    .user_data = @intFromEnum(Completion.Key.cleanup),
+                    .user_data = @intFromEnum(Completion.UserData.cleanup),
                     .buf_index = 0,
                     .personality = 0,
                     .splice_fd_in = 0,
@@ -544,6 +590,7 @@ fn @"async"(
         start(context.ptr, result.ptr);
         return null;
     };
+    errdefer fiber.recycle();
     std.log.debug("allocated {*}", .{fiber});
 
     const closure: *AsyncClosure = @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
@@ -560,8 +607,7 @@ fn @"async"(
         },
         .awaiter = null,
         .queue_next = null,
-        .can_cancel = false,
-        .canceled = false,
+        .cancel_thread = null,
     };
     closure.* = .{
         .event_loop = event_loop,
@@ -571,7 +617,7 @@ fn @"async"(
     };
     @memcpy(closure.contextPointer(), context);
 
-    event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber });
+    event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
     return @ptrCast(fiber);
 }
 
@@ -585,7 +631,7 @@ fn @"await"(
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
     if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
     @memcpy(result, future_fiber.resultBytes(result_alignment));
-    event_loop.recycle(future_fiber);
+    future_fiber.recycle();
 }
 
 fn cancel(
@@ -594,35 +640,37 @@ fn cancel(
     result: []u8,
     result_alignment: Alignment,
 ) void {
-    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
     const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
-    @atomicStore(bool, &future_fiber.canceled, true, .release);
-    if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) {
-        const thread: *Thread = .current(event_loop);
-        getSqe(&thread.io_uring).* = .{
-            .opcode = .ASYNC_CANCEL,
+    if (@atomicRmw(
+        ?*Thread,
+        &future_fiber.cancel_thread,
+        .Xchg,
+        Thread.canceling,
+        .acq_rel,
+    )) |cancel_thread| if (cancel_thread != Thread.canceling) {
+        getSqe(&Thread.current().io_uring).* = .{
+            .opcode = .MSG_RING,
             .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
             .ioprio = 0,
-            .fd = 0,
-            .off = 0,
-            .addr = @intFromPtr(future_fiber),
-            .len = 0,
+            .fd = cancel_thread.io_uring.fd,
+            .off = @intFromPtr(future_fiber),
+            .addr = 0,
+            .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
             .rw_flags = 0,
-            .user_data = @intFromEnum(Completion.Key.cancel),
+            .user_data = @intFromEnum(Completion.UserData.cleanup),
             .buf_index = 0,
             .personality = 0,
             .splice_fd_in = 0,
             .addr3 = 0,
             .resv = 0,
         };
-    }
+    };
     @"await"(userdata, any_future, result, result_alignment);
 }
 
 fn cancelRequested(userdata: ?*anyopaque) bool {
-    const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(event_loop);
-    return thread.currentFiber().canceled;
+    _ = userdata;
+    return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
 }
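Taken together with the idle-loop change above, cancellation is now a three-step round trip: `cancel` unconditionally swaps `Thread.canceling` into `cancel_thread`; if a real thread was registered there, MSG_RING posts a synthetic CQE onto that thread's ring (user_data = the fiber, res = -EINTR); that ring's idle loop reacts by submitting ASYNC_CANCEL, and the original operation finally completes with -ECANCELED, which the wrappers translate into error.AsyncCancel. A standalone model of the slot transitions, using plain integers in place of ?*Thread (illustrative only, not EventLoop code):

    const std = @import("std");

    test "a cancel that lands before the operation blocks wins the race" {
        const sentinel: u32 = std.math.maxInt(u32); // stands in for Thread.canceling
        var slot: u32 = 0; // stands in for fiber.cancel_thread == null
        // cancel(): unconditionally mark the fiber as canceling.
        const prev = @atomicRmw(u32, &slot, .Xchg, sentinel, .acq_rel);
        try std.testing.expect(prev == 0); // nothing in flight, nothing to interrupt
        // enterCancelRegion(): refuses to start new I/O once the sentinel is set.
        const witnessed = @cmpxchgStrong(u32, &slot, 0, 1234, .acq_rel, .acquire);
        try std.testing.expect(witnessed.? == sentinel); // caller sees error.AsyncCancel
    }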
 
 pub fn createFile(
@@ -632,6 +680,10 @@ pub fn createFile(
     flags: Io.CreateFlags,
 ) Io.FileOpenError!std.fs.File {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current();
+    const iou = &thread.io_uring;
+    const fiber = thread.currentFiber();
+    try fiber.enterCancelRegion(thread);
 
     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -670,23 +722,30 @@ pub fn createFile(
         @panic("TODO");
     }
 
-    const thread: *Thread = .current(el);
-    const iou = &thread.io_uring;
-    const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
-    const sqe = getSqe(iou);
-    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .OPENAT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = dir.fd,
+        .off = 0,
+        .addr = @intFromPtr(&sub_path_c),
+        .len = @intCast(flags.mode),
+        .rw_flags = @bitCast(os_flags),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
 
-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);
 
     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,
         .CANCELED => return error.AsyncCancel,
 
         .FAULT => unreachable,
@@ -723,10 +782,10 @@ pub fn openFile(
     flags: Io.OpenFlags,
 ) Io.FileOpenError!std.fs.File {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+    try fiber.enterCancelRegion(thread);
 
     const posix = std.posix;
     const sub_path_c = try posix.toPosixPath(sub_path);
@@ -771,18 +830,30 @@ pub fn openFile(
         @panic("TODO");
     }
 
-    const sqe = getSqe(iou);
-    sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .OPENAT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = dir.fd,
+        .off = 0,
+        .addr = @intFromPtr(&sub_path_c),
+        .len = 0,
+        .rw_flags = @bitCast(os_flags),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
 
-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);
 
     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,
         .CANCELED => return error.AsyncCancel,
 
         .FAULT => unreachable,
@@ -814,20 +885,33 @@ pub fn openFile(
 
 pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
 
-    const sqe = getSqe(iou);
-    sqe.prep_close(file.handle);
-    sqe.user_data = @intFromPtr(fiber);
+    getSqe(iou).* = .{
+        .opcode = .CLOSE,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = 0,
+        .addr = 0,
+        .len = 0,
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
 
     el.yield(null, .nothing);
 
     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return,
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,
         .CANCELED => return,
 
         .BADF => unreachable, // Always a race condition.
@@ -835,25 +919,37 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     }
 }
 
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
-    const sqe = getSqe(iou);
-    sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
-    sqe.user_data = @intFromPtr(fiber);
+    try fiber.enterCancelRegion(thread);
+
+    getSqe(iou).* = .{
+        .opcode = .READ,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = @bitCast(offset),
+        .addr = @intFromPtr(buffer.ptr),
+        .len = @min(buffer.len, 0x7ffff000),
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
 
-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);
 
     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,
         .CANCELED => return error.AsyncCancel,
 
         .INVAL => unreachable,
@@ -868,30 +964,44 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadE
         .NOTCONN => return error.SocketNotConnected,
         .CONNRESET => return error.ConnectionResetByPeer,
         .TIMEDOUT => return error.ConnectionTimedOut,
+        .NXIO => return error.Unseekable,
+        .SPIPE => return error.Unseekable,
+        .OVERFLOW => return error.Unseekable,
         else => |err| return std.posix.unexpectedErrno(err),
     }
 }
 
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
     const el: *EventLoop = @alignCast(@ptrCast(userdata));
-
-    const thread: *Thread = .current(el);
+    const thread: *Thread = .current();
     const iou = &thread.io_uring;
     const fiber = thread.currentFiber();
-    if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
-
-    const sqe = getSqe(iou);
-    sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
-    sqe.user_data = @intFromPtr(fiber);
+    try fiber.enterCancelRegion(thread);
+
+    getSqe(iou).* = .{
+        .opcode = .WRITE,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = file.handle,
+        .off = @bitCast(offset),
+        .addr = @intFromPtr(buffer.ptr),
+        .len = @min(buffer.len, 0x7ffff000),
+        .rw_flags = 0,
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
 
-    @atomicStore(bool, &fiber.can_cancel, true, .release);
     el.yield(null, .nothing);
-    @atomicStore(bool, &fiber.can_cancel, false, .release);
+    fiber.exitCancelRegion(thread);
 
     const completion = fiber.resultPointer(Completion);
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
-        .INTR => @panic("TODO is this reachable?"),
+        .INTR => unreachable,
         .CANCELED => return error.AsyncCancel,
 
         .INVAL => return error.InvalidArgument,
@@ -907,17 +1017,77 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.Fi
         .ACCES => return error.AccessDenied,
         .PERM => return error.PermissionDenied,
         .PIPE => return error.BrokenPipe,
-        .CONNRESET => return error.ConnectionResetByPeer,
+        .NXIO => return error.Unseekable,
+        .SPIPE => return error.Unseekable,
+        .OVERFLOW => return error.Unseekable,
         .BUSY => return error.DeviceBusy,
-        .NXIO => return error.NoDevice,
+        .CONNRESET => return error.ConnectionResetByPeer,
         .MSGSIZE => return error.MessageTooBig,
         else => |err| return std.posix.unexpectedErrno(err),
     }
 }
 
-fn errno(signed: i32) std.posix.E {
-    const int = if (signed > -4096 and signed < 0) -signed else 0;
-    return @enumFromInt(int);
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+    _ = userdata;
+    const timespec = try std.posix.clock_gettime(clockid);
+    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+    const el: *EventLoop = @alignCast(@ptrCast(userdata));
+    const thread: *Thread = .current();
+    const iou = &thread.io_uring;
+    const fiber = thread.currentFiber();
+    try fiber.enterCancelRegion(thread);
+
+    const deadline_nanoseconds: i96 = switch (deadline) {
+        .nanoseconds => |nanoseconds| nanoseconds,
+        .timestamp => |timestamp| @intFromEnum(timestamp),
+    };
+    const timespec: std.os.linux.kernel_timespec = .{
+        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+    };
+    getSqe(iou).* = .{
+        .opcode = .TIMEOUT,
+        .flags = 0,
+        .ioprio = 0,
+        .fd = 0,
+        .off = 0,
+        .addr = @intFromPtr(&timespec),
+        .len = 1,
+        .rw_flags = @as(u32, switch (deadline) {
+            .nanoseconds => 0,
+            .timestamp => std.os.linux.IORING_TIMEOUT_ABS,
+        }) | @as(u32, switch (clockid) {
+            .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
+            .MONOTONIC => 0,
+            .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
+            else => return error.UnsupportedClock,
+        }),
+        .user_data = @intFromPtr(fiber),
+        .buf_index = 0,
+        .personality = 0,
+        .splice_fd_in = 0,
+        .addr3 = 0,
+        .resv = 0,
+    };
+
+    el.yield(null, .nothing);
+    fiber.exitCancelRegion(thread);
+
+    const completion = fiber.resultPointer(Completion);
+    switch (errno(completion.result)) {
+        .SUCCESS, .TIME => return,
+        .INTR => unreachable,
+        .CANCELED => return error.AsyncCancel,
+
+        else => |err| return std.posix.unexpectedErrno(err),
+    }
+}
+
+fn errno(signed: i32) std.os.linux.E {
+    return .init(@bitCast(@as(isize, signed)));
 }
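The errno helper previously special-cased the range (-4096, 0) by hand; it now defers to std.os.linux.E.init, which applies the same convention for raw syscall-style results. A quick sketch of the expected behavior, written as if exercised from a test in this same file:

    test "negative CQE results map to errno values" {
        try std.testing.expect(errno(0) == .SUCCESS);
        try std.testing.expect(errno(-2) == .NOENT); // -ENOENT
        try std.testing.expect(errno(42) == .SUCCESS); // positive results are payloads
    }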
 
 fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
lib/std/Thread/Pool.zig
@@ -332,13 +332,18 @@ pub fn io(pool: *Pool) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
+
             .cancel = cancel,
             .cancelRequested = cancelRequested,
+
             .createFile = createFile,
             .openFile = openFile,
             .closeFile = closeFile,
-            .read = read,
-            .write = write,
+            .pread = pread,
+            .pwrite = pwrite,
+
+            .now = now,
+            .sleep = sleep,
         },
     };
 }
@@ -347,15 +352,44 @@ const AsyncClosure = struct {
     func: *const fn (context: *anyopaque, result: *anyopaque) void,
     runnable: Runnable = .{ .runFn = runFn },
     reset_event: std.Thread.ResetEvent,
-    cancel_flag: bool,
+    cancel_tid: std.Thread.Id,
     context_offset: usize,
     result_offset: usize,
 
+    const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
+        .int => |int_info| switch (int_info.signedness) {
+            .signed => -1,
+            .unsigned => std.math.maxInt(std.Thread.Id),
+        },
+        .pointer => @ptrFromInt(std.math.maxInt(usize)),
+        else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
+    };
+
     fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
         const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
+        const tid = std.Thread.getCurrentId();
+        if (@cmpxchgStrong(
+            std.Thread.Id,
+            &closure.cancel_tid,
+            0,
+            tid,
+            .acq_rel,
+            .acquire,
+        )) |cancel_tid| {
+            assert(cancel_tid == canceling_tid);
+            return;
+        }
         current_closure = closure;
         closure.func(closure.contextPointer(), closure.resultPointer());
         current_closure = null;
+        if (@cmpxchgStrong(
+            std.Thread.Id,
+            &closure.cancel_tid,
+            tid,
+            0,
+            .acq_rel,
+            .acquire,
+        )) |cancel_tid| assert(cancel_tid == canceling_tid);
         closure.reset_event.set();
     }
 
@@ -414,7 +448,7 @@ fn @"async"(
         .context_offset = context_offset,
         .result_offset = result_offset,
         .reset_event = .{},
-        .cancel_flag = false,
+        .cancel_tid = 0,
     };
     @memcpy(closure.contextPointer()[0..context.len], context);
     pool.run_queue.prepend(&closure.runnable.node);
@@ -456,7 +490,23 @@ fn cancel(
     _ = result_alignment;
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
-    @atomicStore(bool, &closure.cancel_flag, true, .seq_cst);
+    switch (@atomicRmw(
+        std.Thread.Id,
+        &closure.cancel_tid,
+        .Xchg,
+        AsyncClosure.canceling_tid,
+        .acq_rel,
+    )) {
+        0, AsyncClosure.canceling_tid => {},
+        else => |cancel_tid| switch (builtin.os.tag) {
+            .linux => _ = std.os.linux.tgkill(
+                std.os.linux.getpid(),
+                @bitCast(cancel_tid),
+                std.posix.SIG.IO,
+            ),
+            else => {},
+        },
+    }
     closure.waitAndFree(pool.allocator, result);
 }
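The thread pool backend mirrors the event loop's protocol with thread IDs instead of thread pointers: `cancel_tid` is 0 while the closure has not started, the worker's ID while it runs, and `canceling_tid` (all bits set) once cancellation is requested. When a real ID is observed, the worker may be blocked in a syscall, so it is poked with SIG.IO via tgkill; the handler installed in start.zig (see below) is a no-op, so the syscall merely returns EINTR and the worker re-checks the flag. A sketch of how a blocking operation in this backend observes that, modeled on the `sleep` implementation further down (`someBlockingSyscall` is a hypothetical stand-in):

    while (true) {
        try pool.checkCancel(); // error.AsyncCancel once cancel_tid == canceling_tid
        switch (std.os.linux.E.init(someBlockingSyscall())) {
            .SUCCESS => return,
            .INTR => {}, // interrupted by the cancel signal; loop and re-check
            else => |err| return std.posix.unexpectedErrno(err),
        }
    }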
 
@@ -464,7 +514,7 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     _ = pool;
     const closure = current_closure orelse return false;
-    return @atomicLoad(bool, &closure.cancel_flag, .unordered);
+    return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
 }
 
 fn checkCancel(pool: *Pool) error{AsyncCancel}!void {
@@ -499,14 +549,52 @@ pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     return file.close();
 }
 
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
-    return file.read(buffer);
+    return switch (offset) {
+        -1 => file.read(buffer),
+        else => file.pread(buffer, @bitCast(offset)),
+    };
 }
 
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
-    return file.write(buffer);
+    return switch (offset) {
+        -1 => file.write(buffer),
+        else => file.pwrite(buffer, @bitCast(offset)),
+    };
+}
+
+pub fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    try pool.checkCancel();
+    const timespec = try std.posix.clock_gettime(clockid);
+    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
+}
+
+pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    const deadline_nanoseconds: i96 = switch (deadline) {
+        .nanoseconds => |nanoseconds| nanoseconds,
+        .timestamp => |timestamp| @intFromEnum(timestamp),
+    };
+    var timespec: std.posix.timespec = .{
+        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
+        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
+    };
+    while (true) {
+        try pool.checkCancel();
+        switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clockid, .{ .ABSTIME = switch (deadline) {
+            .nanoseconds => false,
+            .timestamp => true,
+        } }, &timespec, &timespec))) {
+            .SUCCESS => return,
+            .FAULT => unreachable,
+            .INTR => {},
+            .INVAL => return error.UnsupportedClock,
+            else => |err| return std.posix.unexpectedErrno(err),
+        }
+    }
 }
lib/std/Io.zig
@@ -579,7 +579,6 @@ pub const VTable = struct {
         context_alignment: std.mem.Alignment,
         start: *const fn (context: *const anyopaque, result: *anyopaque) void,
     ) ?*AnyFuture,
-
     /// This function is only called when `async` returns a non-null value.
     ///
     /// Thread-safe.
@@ -609,7 +608,6 @@ pub const VTable = struct {
         result: []u8,
         result_alignment: std.mem.Alignment,
     ) void,
-
     /// Returns whether the current thread of execution is known to have
     /// been requested to cancel.
     ///
@@ -619,8 +617,11 @@ pub const VTable = struct {
     createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
     openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
     closeFile: *const fn (?*anyopaque, fs.File) void,
-    read: *const fn (?*anyopaque, file: fs.File, buffer: []u8) FileReadError!usize,
-    write: *const fn (?*anyopaque, file: fs.File, buffer: []const u8) FileWriteError!usize,
+    pread: *const fn (?*anyopaque, file: fs.File, buffer: []u8, offset: std.posix.off_t) FilePReadError!usize,
+    pwrite: *const fn (?*anyopaque, file: fs.File, buffer: []const u8, offset: std.posix.off_t) FilePWriteError!usize,
+
+    now: *const fn (?*anyopaque, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp,
+    sleep: *const fn (?*anyopaque, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void,
 };
 
 pub const OpenFlags = fs.File.OpenFlags;
@@ -628,7 +629,27 @@ pub const CreateFlags = fs.File.CreateFlags;
 
 pub const FileOpenError = fs.File.OpenError || error{AsyncCancel};
 pub const FileReadError = fs.File.ReadError || error{AsyncCancel};
+pub const FilePReadError = fs.File.PReadError || error{AsyncCancel};
 pub const FileWriteError = fs.File.WriteError || error{AsyncCancel};
+pub const FilePWriteError = fs.File.PWriteError || error{AsyncCancel};
+
+pub const Timestamp = enum(i96) {
+    _,
+
+    pub fn durationTo(from: Timestamp, to: Timestamp) i96 {
+        return @intFromEnum(to) - @intFromEnum(from);
+    }
+
+    pub fn addDuration(from: Timestamp, duration: i96) Timestamp {
+        return @enumFromInt(@intFromEnum(from) + duration);
+    }
+};
+pub const Deadline = union(enum) {
+    nanoseconds: i96,
+    timestamp: Timestamp,
+};
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{AsyncCancel};
+pub const SleepError = error{ UnsupportedClock, Unexpected, AsyncCancel };
 
 pub const AnyFuture = opaque {};
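The new time types are deliberately small: `Timestamp` is an i96 nanosecond count in a given clock's epoch, and `Deadline` distinguishes a relative duration from an absolute timestamp. A hypothetical fragment, assuming an `io: Io`, showing the arithmetic helpers:

    const t0 = try io.now(.MONOTONIC);
    const deadline = t0.addDuration(std.time.ns_per_s); // one second later
    // durationTo is (to - from): positive while `deadline` is still ahead of `t0`.
    std.debug.assert(t0.durationTo(deadline) == std.time.ns_per_s);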
 
@@ -694,11 +715,19 @@ pub fn closeFile(io: Io, file: fs.File) void {
 }
 
 pub fn read(io: Io, file: fs.File, buffer: []u8) FileReadError!usize {
-    return io.vtable.read(io.userdata, file, buffer);
+    return @errorCast(io.pread(file, buffer, -1));
+}
+
+pub fn pread(io: Io, file: fs.File, buffer: []u8, offset: std.posix.off_t) FilePReadError!usize {
+    return io.vtable.pread(io.userdata, file, buffer, offset);
 }
 
 pub fn write(io: Io, file: fs.File, buffer: []const u8) FileWriteError!usize {
-    return io.vtable.write(io.userdata, file, buffer);
+    return @errorCast(io.pwrite(file, buffer, -1));
+}
+
+pub fn pwrite(io: Io, file: fs.File, buffer: []const u8, offset: std.posix.off_t) FilePWriteError!usize {
+    return io.vtable.pwrite(io.userdata, file, buffer, offset);
 }
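`read` and `write` are now thin wrappers over the positional variants; an offset of -1 is the sentinel for "use the file's current position", which the thread-pool backend translates back into plain read/write calls (see Pool.pread/pwrite above). A hypothetical fragment, assuming an `io: Io` and an open `file`:

    var buf: [256]u8 = undefined;
    const n_positional = try io.pread(file, &buf, 0); // read at offset 0
    const n_streaming = try io.read(file, &buf); // same as io.pread(file, &buf, -1)
    std.log.debug("read {d} positional and {d} streaming bytes", .{ n_positional, n_streaming });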
 
 pub fn writeAll(io: Io, file: fs.File, bytes: []const u8) FileWriteError!void {
@@ -717,3 +746,11 @@ pub fn readAll(io: Io, file: fs.File, buffer: []u8) FileReadError!usize {
     }
     return index;
 }
+
+pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp {
+    return io.vtable.now(io.userdata, clockid);
+}
+
+pub fn sleep(io: Io, clockid: std.posix.clockid_t, deadline: Deadline) SleepError!void {
+    return io.vtable.sleep(io.userdata, clockid, deadline);
+}
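End to end, the new vtable entries let callers wait on the clock without touching a backend directly: the event loop implements `sleep` as an io_uring TIMEOUT SQE (mapping `.timestamp` deadlines to IORING_TIMEOUT_ABS and the clock to the matching IORING_TIMEOUT_* flag), while the thread pool falls back to clock_nanosleep with an EINTR retry loop. A hedged fragment of the public surface, assuming an `io: Io` inside an error-returning function:

    // Relative sleep of five milliseconds.
    try io.sleep(.MONOTONIC, .{ .nanoseconds = 5 * std.time.ns_per_ms });
    // Absolute deadline, treating a cancel request as a normal wakeup.
    const wake_at = (try io.now(.MONOTONIC)).addDuration(250 * std.time.ns_per_ms);
    io.sleep(.MONOTONIC, .{ .timestamp = wake_at }) catch |err| switch (err) {
        error.AsyncCancel => {},
        else => return err,
    };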
lib/std/start.zig
@@ -651,8 +651,8 @@ inline fn callMainWithArgs(argc: usize, argv: [*][*:0]u8, envp: [][*:0]u8) u8 {
     std.os.argv = argv[0..argc];
     std.os.environ = envp;
 
+    maybeIgnoreSignals();
     std.debug.maybeEnableSegfaultHandler();
-    maybeIgnoreSigpipe();
 
     return callMain();
 }
@@ -757,8 +757,8 @@ pub fn call_wWinMain() std.os.windows.INT {
     return root.wWinMain(hInstance, null, lpCmdLine, nCmdShow);
 }
 
-fn maybeIgnoreSigpipe() void {
-    const have_sigpipe_support = switch (builtin.os.tag) {
+fn maybeIgnoreSignals() void {
+    switch (builtin.os.tag) {
         .linux,
         .plan9,
         .illumos,
@@ -773,22 +773,20 @@ fn maybeIgnoreSigpipe() void {
         .dragonfly,
         .freebsd,
         .serenity,
-        => true,
-
-        else => false,
-    };
-
-    if (have_sigpipe_support and !std.options.keep_sigpipe) {
-        const posix = std.posix;
-        const act: posix.Sigaction = .{
-            // Set handler to a noop function instead of `SIG.IGN` to prevent
-            // leaking signal disposition to a child process.
-            .handler = .{ .handler = noopSigHandler },
-            .mask = posix.sigemptyset(),
-            .flags = 0,
-        };
-        posix.sigaction(posix.SIG.PIPE, &act, null);
+        => {},
+        else => return,
     }
+    const posix = std.posix;
+    const act: posix.Sigaction = .{
+        // Set handler to a noop function instead of `SIG.IGN` to prevent
+        // leaking signal disposition to a child process.
+        .handler = .{ .handler = noopSigHandler },
+        .mask = posix.sigemptyset(),
+        .flags = 0,
+    };
+    if (!std.options.keep_sigpoll) posix.sigaction(posix.SIG.POLL, &act, null);
+    if (@hasField(posix.SIG, "IO") and posix.SIG.IO != posix.SIG.POLL and !std.options.keep_sigio) posix.sigaction(posix.SIG.IO, &act, null);
+    if (!std.options.keep_sigpipe) posix.sigaction(posix.SIG.PIPE, &act, null);
 }
 
 fn noopSigHandler(_: i32) callconv(.c) void {}
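Startup signal setup is generalized here: in addition to SIGPIPE, SIGPOLL and (where distinct) SIGIO now get the same no-op handler, which is what lets the thread pool's tgkill(..., SIG.IO) above interrupt a blocked worker with EINTR instead of killing the process; a no-op handler is used rather than SIG_IGN so the disposition does not leak into child processes. Programs that want the default dispositions back can set the new options (added to std.Options below) in their root source file, for example:

    pub const std_options: std.Options = .{
        .keep_sigpoll = true,
        .keep_sigio = true,
        // .keep_sigpipe already existed and is unchanged.
    };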
lib/std/std.zig
@@ -145,6 +145,9 @@ pub const Options = struct {
 
     crypto_fork_safety: bool = true,
 
+    keep_sigpoll: bool = false,
+    keep_sigio: bool = false,
+
     /// By default Zig disables SIGPIPE by setting a "no-op" handler for it.  Set this option
     /// to `true` to prevent that.
     ///