Commit 5041c9ad9c
Changed files (3)
lib
std
lib/std/Io/EventLoop.zig
@@ -10,38 +10,50 @@ const IoUring = std.os.linux.IoUring;
/// Must be a thread-safe allocator.
gpa: Allocator,
mutex: std.Thread.Mutex,
-queue: std.DoublyLinkedList,
-/// Atomic copy of queue.len
-queue_len: u32,
-free: std.DoublyLinkedList,
main_fiber: Fiber,
-idle_count: usize,
-threads: std.ArrayListUnmanaged(Thread),
-exiting: bool,
-
-threadlocal var thread_index: u32 = undefined;
+threads: Thread.List,
/// Empirically saw >128KB being used by the self-hosted backend to panic.
const idle_stack_size = 256 * 1024;
+const max_idle_search = 4;
+const max_steal_ready_search = 4;
+
const io_uring_entries = 64;
const Thread = struct {
thread: std.Thread,
idle_context: Context,
current_context: *Context,
+ ready_queue: ?*Fiber,
+ free_queue: ?*Fiber,
io_uring: IoUring,
+ idle_search_index: u32,
+ steal_ready_search_index: u32,
+
+ threadlocal var index: u32 = undefined;
+
+ fn current(el: *EventLoop) *Thread {
+ return &el.threads.allocated[index];
+ }
fn currentFiber(thread: *Thread) *Fiber {
return @fieldParentPtr("context", thread.current_context);
}
+
+ const List = struct {
+ allocated: []Thread,
+ reserved: u32,
+ active: u32,
+ };
};
const Fiber = struct {
context: Context,
awaiter: ?*Fiber,
- queue_node: std.DoublyLinkedList.Node,
- result_align: Alignment,
+ queue_next: ?*Fiber,
+ can_cancel: bool,
+ canceled: bool,
const finished: ?*Fiber = @ptrFromInt(std.mem.alignBackward(usize, std.math.maxInt(usize), @alignOf(Fiber)));
@@ -63,14 +75,13 @@ const Fiber = struct {
);
fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
- return if (free_node: {
- el.mutex.lock();
- defer el.mutex.unlock();
- break :free_node el.free.pop();
- }) |free_node|
- @alignCast(@fieldParentPtr("queue_node", free_node))
- else
- @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
+ const thread: *Thread = .current(el);
+ if (thread.free_queue) |free_fiber| {
+ thread.free_queue = free_fiber.queue_next;
+ free_fiber.queue_next = null;
+ return free_fiber;
+ }
+ return @ptrCast(try el.gpa.alignedAlloc(u8, @alignOf(Fiber), allocation_size));
}
fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
@@ -82,9 +93,15 @@ const Fiber = struct {
return allocated_slice[allocated_slice.len..].ptr;
}
- fn resultPointer(f: *Fiber) [*]u8 {
- return @ptrFromInt(f.result_align.forward(@intFromPtr(f) + @sizeOf(Fiber)));
+ fn resultPointer(f: *Fiber, comptime Result: type) *Result {
+ return @alignCast(@ptrCast(f.resultBytes(.of(Result))));
+ }
+
+ fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
+ return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
}
+
+ const Queue = struct { head: *Fiber, tail: *Fiber };
};
pub fn io(el: *EventLoop) Io {
@@ -93,6 +110,8 @@ pub fn io(el: *EventLoop) Io {
.vtable = &.{
.@"async" = @"async",
.@"await" = @"await",
+ .cancel = cancel,
+ .cancelRequested = cancelRequested,
.createFile = createFile,
.openFile = openFile,
.closeFile = closeFile,
@@ -110,58 +129,86 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
el.* = .{
.gpa = gpa,
.mutex = .{},
- .queue = .{},
- .queue_len = 0,
- .free = .{},
- .main_fiber = undefined,
- .idle_count = 0,
- .threads = .initBuffer(@ptrCast(allocated_slice[0..threads_size])),
- .exiting = false,
+ .main_fiber = .{
+ .context = undefined,
+ .awaiter = null,
+ .queue_next = null,
+ .can_cancel = false,
+ .canceled = false,
+ },
+ .threads = .{
+ .allocated = @ptrCast(allocated_slice[0..threads_size]),
+ .reserved = 1,
+ .active = 1,
+ },
};
- thread_index = 0;
- const main_thread = el.threads.addOneAssumeCapacity();
- main_thread.io_uring = try IoUring.init(io_uring_entries, 0);
+ Thread.index = 0;
+ const main_thread = &el.threads.allocated[0];
const idle_stack_end: [*]usize = @alignCast(@ptrCast(allocated_slice[idle_stack_end_offset..].ptr));
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
- main_thread.idle_context = .{
- .rsp = @intFromPtr(idle_stack_end - 1),
- .rbp = 0,
- .rip = @intFromPtr(&mainIdleEntry),
+ main_thread.* = .{
+ .thread = undefined,
+ .idle_context = .{
+ .rsp = @intFromPtr(idle_stack_end - 1),
+ .rbp = 0,
+ .rip = @intFromPtr(&mainIdleEntry),
+ },
+ .current_context = &el.main_fiber.context,
+ .ready_queue = null,
+ .free_queue = null,
+ .io_uring = try IoUring.init(io_uring_entries, 0),
+ .idle_search_index = 1,
+ .steal_ready_search_index = 1,
};
+ errdefer main_thread.io_uring.deinit();
std.log.debug("created main idle {*}", .{&main_thread.idle_context});
std.log.debug("created main {*}", .{&el.main_fiber});
- main_thread.current_context = &el.main_fiber.context;
}
pub fn deinit(el: *EventLoop) void {
- assert(el.queue.len == 0); // pending async
+ const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+ for (el.threads.allocated[0..active_threads]) |*thread|
+ assert(@atomicLoad(?*Fiber, &thread.ready_queue, .unordered) == null); // pending async
el.yield(null, .exit);
- while (el.free.pop()) |free_node| {
- const free_fiber: *Fiber = @alignCast(@fieldParentPtr("queue_node", free_node));
- el.gpa.free(free_fiber.allocatedSlice());
+ const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.allocated.ptr));
+ const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
+ for (el.threads.allocated[1..active_threads]) |*thread| {
+ thread.thread.join();
+ while (thread.free_queue) |free_fiber| {
+ thread.free_queue = free_fiber.queue_next;
+ free_fiber.queue_next = null;
+ el.gpa.free(free_fiber.allocatedSlice());
+ }
}
- const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.capacity * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
- const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @alignCast(@ptrCast(el.threads.items.ptr));
- for (el.threads.items[1..]) |*thread| thread.thread.join();
el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
el.* = undefined;
}
-fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
- const thread: *Thread = &el.threads.items[thread_index];
- const ready_context: *Context = ready_context: {
- const ready_fiber: *Fiber = optional_fiber orelse if (ready_node: {
- el.mutex.lock();
- defer el.mutex.unlock();
- const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
- const ready_node = el.queue.pop();
- _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
- break :ready_node ready_node;
- }) |ready_node|
- @alignCast(@fieldParentPtr("queue_node", ready_node))
- else
- break :ready_context &thread.idle_context;
+fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
+ const thread: *Thread = .current(el);
+ const ready_context: *Context = if (maybe_ready_fiber) |ready_fiber|
+ &ready_fiber.context
+ else if (thread.ready_queue) |ready_fiber| ready_context: {
+ thread.ready_queue = ready_fiber.queue_next;
+ ready_fiber.queue_next = null;
break :ready_context &ready_fiber.context;
+ } else ready_context: {
+ const ready_threads = @atomicLoad(u32, &el.threads.active, .acquire);
+ break :ready_context for (0..max_steal_ready_search) |_| {
+ defer thread.steal_ready_search_index += 1;
+ if (thread.steal_ready_search_index == ready_threads) thread.steal_ready_search_index = 0;
+ const steal_ready_search_thread = &el.threads.allocated[thread.steal_ready_search_index];
+ const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
+ if (@cmpxchgWeak(
+ ?*Fiber,
+ &steal_ready_search_thread.ready_queue,
+ ready_fiber,
+ @atomicLoad(?*Fiber, &ready_fiber.queue_next, .acquire),
+ .acq_rel,
+ .monotonic,
+ )) |_| continue;
+ break &ready_fiber.context;
+ } else &thread.idle_context;
};
const message: SwitchMessage = .{
.contexts = .{
@@ -174,111 +221,177 @@ fn yield(el: *EventLoop, optional_fiber: ?*Fiber, pending_task: SwitchMessage.Pe
contextSwitch(&message).handle(el);
}
-fn schedule(el: *EventLoop, fiber: *Fiber) void {
- std.log.debug("scheduling {*}", .{fiber});
- if (idle_count: {
- el.mutex.lock();
- defer el.mutex.unlock();
- const expected_queue_len = std.math.lossyCast(u32, el.queue.len);
- el.queue.append(&fiber.queue_node);
- _ = @cmpxchgStrong(u32, &el.queue_len, expected_queue_len, std.math.lossyCast(u32, el.queue.len), .monotonic, .monotonic);
- break :idle_count el.idle_count;
- } > 0) {
- _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), 1, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: io_uring
+fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
+ {
+ var fiber = ready_queue.head;
+ while (true) {
+ std.log.debug("scheduling {*}", .{fiber});
+ fiber = fiber.queue_next orelse break;
+ }
+ assert(fiber == ready_queue.tail);
+ }
+ // shared fields of previous `Thread` must be initialized before later ones are marked as active
+ const new_thread_index = @atomicLoad(u32, &el.threads.active, .acquire);
+ for (0..max_idle_search) |_| {
+ defer thread.idle_search_index += 1;
+ if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
+ const idle_search_thread = &el.threads.allocated[thread.idle_search_index];
+ if (@cmpxchgWeak(
+ ?*Fiber,
+ &idle_search_thread.ready_queue,
+ null,
+ ready_queue.head,
+ .acq_rel,
+ .monotonic,
+ )) |_| continue;
+ getSqe(&thread.io_uring).* = .{
+ .opcode = .MSG_RING,
+ .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+ .ioprio = 0,
+ .fd = idle_search_thread.io_uring.fd,
+ .off = @intFromEnum(Completion.Key.wakeup),
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromEnum(Completion.Key.wakeup),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
return;
}
- if (el.threads.items.len == el.threads.capacity) return;
- const thread = el.threads.addOneAssumeCapacity();
- thread.thread = std.Thread.spawn(.{
- .stack_size = idle_stack_size,
- .allocator = el.gpa,
- }, threadEntry, .{ el, el.threads.items.len - 1 }) catch {
- el.threads.items.len -= 1;
+ spawn_thread: {
+ // previous failed reservations must have completed before retrying
+ if (new_thread_index == el.threads.allocated.len or @cmpxchgWeak(
+ u32,
+ &el.threads.reserved,
+ new_thread_index,
+ new_thread_index + 1,
+ .acquire,
+ .monotonic,
+ ) != null) break :spawn_thread;
+ const new_thread = &el.threads.allocated[new_thread_index];
+ const next_thread_index = new_thread_index + 1;
+ new_thread.* = .{
+ .thread = undefined,
+ .idle_context = undefined,
+ .current_context = &new_thread.idle_context,
+ .ready_queue = ready_queue.head,
+ .free_queue = null,
+ .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
+ @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
+ // no more access to `thread` after giving up reservation
+ std.log.warn("unable to create worker thread due to io_uring init failure: {s}", .{@errorName(err)});
+ break :spawn_thread;
+ },
+ .idle_search_index = next_thread_index,
+ .steal_ready_search_index = next_thread_index,
+ };
+ new_thread.thread = std.Thread.spawn(.{
+ .stack_size = idle_stack_size,
+ .allocator = el.gpa,
+ }, threadEntry, .{ el, new_thread_index }) catch |err| {
+ new_thread.io_uring.deinit();
+ @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
+ // no more access to `thread` after giving up reservation
+ std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)});
+ break :spawn_thread;
+ };
+ // shared fields of `Thread` must be initialized before being marked active
+ @atomicStore(u32, &el.threads.active, next_thread_index, .release);
return;
- };
+ }
+ // nobody wanted it, so just queue it on ourselves
+ while (@cmpxchgWeak(
+ ?*Fiber,
+ &thread.ready_queue,
+ ready_queue.tail.queue_next,
+ ready_queue.head,
+ .acq_rel,
+ .acquire,
+ )) |old_head| ready_queue.tail.queue_next = old_head;
}
fn recycle(el: *EventLoop, fiber: *Fiber) void {
+ const thread: *Thread = .current(el);
std.log.debug("recyling {*}", .{fiber});
+ assert(fiber.queue_next == null);
@memset(fiber.allocatedSlice(), undefined);
- el.mutex.lock();
- defer el.mutex.unlock();
- el.free.append(&fiber.queue_node);
+ fiber.queue_next = thread.free_queue;
+ thread.free_queue = fiber;
}
fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
message.handle(el);
- el.idle();
+ const thread: *Thread = &el.threads.allocated[0];
+ el.idle(thread);
el.yield(&el.main_fiber, .nothing);
unreachable; // switched to dead fiber
}
-fn threadEntry(el: *EventLoop, index: usize) void {
- thread_index = @intCast(index);
- const thread: *Thread = &el.threads.items[index];
+fn threadEntry(el: *EventLoop, index: u32) void {
+ Thread.index = index;
+ const thread: *Thread = &el.threads.allocated[index];
std.log.debug("created thread idle {*}", .{&thread.idle_context});
- thread.io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
- std.log.warn("exiting worker thread during init due to io_uring init failure: {s}", .{@errorName(err)});
- return;
- };
- thread.current_context = &thread.idle_context;
- el.idle();
+ el.idle(thread);
}
-const CompletionKey = enum(u64) {
- queue_len_futex_wait = 1,
- _,
+const Completion = struct {
+ const Key = enum(usize) {
+ unused,
+ wakeup,
+ cancel,
+ cleanup,
+ exit,
+ /// *Fiber
+ _,
+ };
+ result: i32,
+ flags: u32,
};
-fn idle(el: *EventLoop) void {
- const thread: *Thread = &el.threads.items[thread_index];
- const iou = &thread.io_uring;
- var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
- var queue_len_futex_is_scheduled: bool = false;
-
+fn idle(el: *EventLoop, thread: *Thread) void {
+ var maybe_ready_fiber: ?*Fiber = null;
while (true) {
- el.yield(null, .nothing);
- if (@atomicLoad(bool, &el.exiting, .acquire)) return;
- if (!queue_len_futex_is_scheduled) {
- const sqe = getSqe(&thread.io_uring);
- sqe.prep_rw(.FUTEX_WAIT, std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE, @intFromPtr(&el.queue_len), 0, 0);
- sqe.addr3 = std.math.maxInt(u32);
- sqe.user_data = @intFromEnum(CompletionKey.queue_len_futex_wait);
- queue_len_futex_is_scheduled = true;
- }
- _ = iou.submit_and_wait(1) catch |err| switch (err) {
- error.SignalInterrupt => std.log.debug("submit_and_wait: SignalInterrupt", .{}),
- else => @panic(@errorName(err)),
+ el.yield(maybe_ready_fiber, .nothing);
+ maybe_ready_fiber = null;
+ _ = thread.io_uring.submit_and_wait(1) catch |err| switch (err) {
+ error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
+ else => |e| @panic(@errorName(e)),
};
- for (cqes_buffer[0 .. iou.copy_cqes(&cqes_buffer, 1) catch |err| switch (err) {
+ var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
+ var maybe_ready_queue: ?Fiber.Queue = null;
+ for (cqes_buffer[0 .. thread.io_uring.copy_cqes(&cqes_buffer, 0) catch |err| switch (err) {
error.SignalInterrupt => cqes_len: {
- std.log.debug("copy_cqes: SignalInterrupt", .{});
+ std.log.warn("copy_cqes failed with SignalInterrupt", .{});
break :cqes_len 0;
},
- else => @panic(@errorName(err)),
- }]) |cqe| switch (@as(CompletionKey, @enumFromInt(cqe.user_data))) {
- .queue_len_futex_wait => {
- switch (errno(cqe.res)) {
- .SUCCESS, .AGAIN => {},
- .INVAL => unreachable,
- else => |err| {
- std.posix.unexpectedErrno(err) catch {};
- @panic("unexpected");
- },
- }
- std.log.debug("{*} woken up with queue size of {d}", .{
- &thread.idle_context,
- @atomicLoad(u32, &el.queue_len, .unordered),
- });
- queue_len_futex_is_scheduled = false;
+ else => |e| @panic(@errorName(e)),
+ }]) |cqe| switch (@as(Completion.Key, @enumFromInt(cqe.user_data))) {
+ .unused => unreachable, // bad submission queued?
+ .wakeup => {},
+ .cancel => {},
+ .cleanup => @panic("failed to notify other threads that we are exiting"),
+ .exit => {
+ assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
+ return;
},
_ => {
const fiber: *Fiber = @ptrFromInt(cqe.user_data);
- const res: *i32 = @ptrCast(@alignCast(fiber.resultPointer()));
- res.* = cqe.res;
- el.schedule(fiber);
+ assert(fiber.queue_next == null);
+ fiber.resultPointer(Completion).* = .{
+ .result = cqe.res,
+ .flags = cqe.flags,
+ };
+ if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
+ ready_queue.tail.queue_next = fiber;
+ ready_queue.tail = fiber;
+ } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
},
};
+ if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
}
}
@@ -296,18 +409,37 @@ const SwitchMessage = struct {
};
fn handle(message: *const SwitchMessage, el: *EventLoop) void {
- const thread: *Thread = &el.threads.items[thread_index];
+ const thread: *Thread = .current(el);
thread.current_context = message.contexts.ready;
switch (message.pending_task) {
.nothing => {},
.register_awaiter => |awaiter| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
- if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) el.schedule(prev_fiber);
+ if (@atomicRmw(
+ ?*Fiber,
+ awaiter,
+ .Xchg,
+ prev_fiber,
+ .acq_rel,
+ ) == Fiber.finished) el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
},
- .exit => {
- @atomicStore(bool, &el.exiting, true, .unordered);
- @atomicStore(u32, &el.queue_len, std.math.maxInt(u32), .release);
- _ = std.os.linux.futex2_wake(&el.queue_len, std.math.maxInt(u32), std.math.maxInt(i32), std.os.linux.FUTEX2.SIZE_U32 | std.os.linux.FUTEX2.PRIVATE); // TODO: use io_uring
+ .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
+ getSqe(&thread.io_uring).* = .{
+ .opcode = .MSG_RING,
+ .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+ .ioprio = 0,
+ .fd = each_thread.io_uring.fd,
+ .off = @intFromEnum(Completion.Key.exit),
+ .addr = 0,
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromEnum(Completion.Key.cleanup),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
},
}
}
@@ -374,7 +506,27 @@ fn fiberEntry() callconv(.naked) void {
}
}
-pub fn @"async"(
+const AsyncClosure = struct {
+ event_loop: *EventLoop,
+ fiber: *Fiber,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+ result_align: Alignment,
+
+ fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
+ return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
+ }
+
+ fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
+ message.handle(closure.event_loop);
+ std.log.debug("{*} performing async", .{closure.fiber});
+ closure.start(closure.contextPointer(), closure.fiber.resultBytes(closure.result_align));
+ const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
+ closure.event_loop.yield(awaiter, .nothing);
+ unreachable; // switched to dead fiber
+ }
+};
+
+fn @"async"(
userdata: ?*anyopaque,
result: []u8,
result_alignment: Alignment,
@@ -407,58 +559,79 @@ pub fn @"async"(
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
},
.awaiter = null,
- .queue_node = undefined,
- .result_align = result_alignment,
+ .queue_next = null,
+ .can_cancel = false,
+ .canceled = false,
};
closure.* = .{
.event_loop = event_loop,
.fiber = fiber,
.start = start,
+ .result_align = result_alignment,
};
@memcpy(closure.contextPointer(), context);
- event_loop.schedule(fiber);
+ event_loop.schedule(.current(event_loop), .{ .head = fiber, .tail = fiber });
return @ptrCast(fiber);
}
-const AsyncClosure = struct {
- event_loop: *EventLoop,
- fiber: *Fiber,
- start: *const fn (context: *const anyopaque, result: *anyopaque) void,
-
- fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
- return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
- }
-
- fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
- message.handle(closure.event_loop);
- std.log.debug("{*} performing async", .{closure.fiber});
- closure.start(closure.contextPointer(), closure.fiber.resultPointer());
- const awaiter = @atomicRmw(?*Fiber, &closure.fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
- closure.event_loop.yield(awaiter, .nothing);
- unreachable; // switched to dead fiber
- }
-};
-
-pub fn @"await"(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
+fn @"await"(
+ userdata: ?*anyopaque,
+ any_future: *std.Io.AnyFuture,
+ result: []u8,
+ result_alignment: Alignment,
+) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
- @memcpy(result, future_fiber.resultPointer());
+ @memcpy(result, future_fiber.resultBytes(result_alignment));
event_loop.recycle(future_fiber);
}
-pub fn cancel(userdata: ?*anyopaque, any_future: *std.Io.AnyFuture, result: []u8) void {
+fn cancel(
+ userdata: ?*anyopaque,
+ any_future: *std.Io.AnyFuture,
+ result: []u8,
+ result_alignment: Alignment,
+) void {
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
const future_fiber: *Fiber = @alignCast(@ptrCast(any_future));
- // TODO set a flag that makes all IO operations for this fiber return error.Canceled
- if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished) event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
- @memcpy(result, future_fiber.resultPointer());
- event_loop.recycle(future_fiber);
+ @atomicStore(bool, &future_fiber.canceled, true, .release);
+ if (@atomicLoad(bool, &future_fiber.can_cancel, .acquire)) {
+ const thread: *Thread = .current(event_loop);
+ getSqe(&thread.io_uring).* = .{
+ .opcode = .ASYNC_CANCEL,
+ .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
+ .ioprio = 0,
+ .fd = 0,
+ .off = 0,
+ .addr = @intFromPtr(future_fiber),
+ .len = 0,
+ .rw_flags = 0,
+ .user_data = @intFromEnum(Completion.Key.cancel),
+ .buf_index = 0,
+ .personality = 0,
+ .splice_fd_in = 0,
+ .addr3 = 0,
+ .resv = 0,
+ };
+ }
+ @"await"(userdata, any_future, result, result_alignment);
+}
+
+fn cancelRequested(userdata: ?*anyopaque) bool {
+ const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current(event_loop);
+ return thread.currentFiber().canceled;
}
-pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.CreateFlags) std.fs.File.OpenError!std.fs.File {
- const el: *EventLoop = @ptrCast(@alignCast(userdata));
+pub fn createFile(
+ userdata: ?*anyopaque,
+ dir: std.fs.Dir,
+ sub_path: []const u8,
+ flags: Io.CreateFlags,
+) Io.FileOpenError!std.fs.File {
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
@@ -497,22 +670,24 @@ pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8,
@panic("TODO");
}
- const thread: *Thread = &el.threads.items[thread_index];
+ const thread: *Thread = .current(el);
const iou = &thread.io_uring;
- const sqe = getSqe(iou);
const fiber = thread.currentFiber();
+ if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+ const sqe = getSqe(iou);
sqe.prep_openat(dir.fd, &sub_path_c, os_flags, flags.mode);
sqe.user_data = @intFromPtr(fiber);
+ @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
+ @atomicStore(bool, &fiber.can_cancel, false, .release);
- const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
- const rc = result.*;
- switch (errno(rc)) {
- .SUCCESS => return .{ .handle = rc },
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
+ .SUCCESS => return .{ .handle = completion.result },
.INTR => @panic("TODO is this reachable?"),
- .CANCELED => @panic("TODO figure out how this error code fits into things"),
+ .CANCELED => return error.AsyncCancel,
.FAULT => unreachable,
.INVAL => return error.BadPathName,
@@ -541,8 +716,17 @@ pub fn createFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8,
}
}
-pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, flags: std.fs.File.OpenFlags) std.fs.File.OpenError!std.fs.File {
- const el: *EventLoop = @ptrCast(@alignCast(userdata));
+pub fn openFile(
+ userdata: ?*anyopaque,
+ dir: std.fs.Dir,
+ sub_path: []const u8,
+ flags: Io.OpenFlags,
+) Io.FileOpenError!std.fs.File {
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current(el);
+ const iou = &thread.io_uring;
+ const fiber = thread.currentFiber();
+ if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
const posix = std.posix;
const sub_path_c = try posix.toPosixPath(sub_path);
@@ -587,22 +771,19 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
@panic("TODO");
}
- const thread: *Thread = &el.threads.items[thread_index];
- const iou = &thread.io_uring;
const sqe = getSqe(iou);
- const fiber = thread.currentFiber();
-
sqe.prep_openat(dir.fd, &sub_path_c, os_flags, 0);
sqe.user_data = @intFromPtr(fiber);
+ @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
+ @atomicStore(bool, &fiber.can_cancel, false, .release);
- const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
- const rc = result.*;
- switch (errno(rc)) {
- .SUCCESS => return .{ .handle = rc },
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
+ .SUCCESS => return .{ .handle = completion.result },
.INTR => @panic("TODO is this reachable?"),
- .CANCELED => @panic("TODO figure out how this error code fits into things"),
+ .CANCELED => return error.AsyncCancel,
.FAULT => unreachable,
.INVAL => return error.BadPathName,
@@ -631,63 +812,49 @@ pub fn openFile(userdata: ?*anyopaque, dir: std.fs.Dir, sub_path: []const u8, fl
}
}
-fn errno(signed: i32) std.posix.E {
- const int = if (signed > -4096 and signed < 0) -signed else 0;
- return @enumFromInt(int);
-}
-
-fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
- return iou.get_sqe() catch @panic("TODO: handle submission queue full");
-}
-
pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
- const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
- const posix = std.posix;
-
- const thread: *Thread = &el.threads.items[thread_index];
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current(el);
const iou = &thread.io_uring;
- const sqe = getSqe(iou);
const fiber = thread.currentFiber();
+ const sqe = getSqe(iou);
sqe.prep_close(file.handle);
sqe.user_data = @intFromPtr(fiber);
el.yield(null, .nothing);
- const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
- const rc = result.*;
- switch (errno(rc)) {
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
.SUCCESS => return,
.INTR => @panic("TODO is this reachable?"),
- .CANCELED => @panic("TODO figure out how this error code fits into things"),
+ .CANCELED => return,
.BADF => unreachable, // Always a race condition.
else => return,
}
}
-pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.ReadError!usize {
- const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
- const posix = std.posix;
-
- const thread: *Thread = &el.threads.items[thread_index];
+pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) Io.FileReadError!usize {
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
+ const thread: *Thread = .current(el);
const iou = &thread.io_uring;
- const sqe = getSqe(iou);
const fiber = thread.currentFiber();
+ if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+ const sqe = getSqe(iou);
sqe.prep_read(file.handle, buffer, std.math.maxInt(u64));
sqe.user_data = @intFromPtr(fiber);
+ @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
+ @atomicStore(bool, &fiber.can_cancel, false, .release);
- const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
- const rc = result.*;
- switch (errno(rc)) {
- .SUCCESS => return @as(u32, @bitCast(rc)),
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
+ .SUCCESS => return @as(u32, @bitCast(completion.result)),
.INTR => @panic("TODO is this reachable?"),
- .CANCELED => @panic("TODO figure out how this error code fits into things"),
+ .CANCELED => return error.AsyncCancel,
.INVAL => unreachable,
.FAULT => unreachable,
@@ -701,31 +868,31 @@ pub fn read(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8) std.fs.File.
.NOTCONN => return error.SocketNotConnected,
.CONNRESET => return error.ConnectionResetByPeer,
.TIMEDOUT => return error.ConnectionTimedOut,
- else => |err| return posix.unexpectedErrno(err),
+ else => |err| return std.posix.unexpectedErrno(err),
}
}
-pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.fs.File.WriteError!usize {
- const el: *EventLoop = @ptrCast(@alignCast(userdata));
-
- const posix = std.posix;
+pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) Io.FileWriteError!usize {
+ const el: *EventLoop = @alignCast(@ptrCast(userdata));
- const thread: *Thread = &el.threads.items[thread_index];
+ const thread: *Thread = .current(el);
const iou = &thread.io_uring;
- const sqe = getSqe(iou);
const fiber = thread.currentFiber();
+ if (@atomicLoad(bool, &fiber.canceled, .acquire)) return error.AsyncCancel;
+ const sqe = getSqe(iou);
sqe.prep_write(file.handle, buffer, std.math.maxInt(u64));
sqe.user_data = @intFromPtr(fiber);
+ @atomicStore(bool, &fiber.can_cancel, true, .release);
el.yield(null, .nothing);
+ @atomicStore(bool, &fiber.can_cancel, false, .release);
- const result: *i32 = @alignCast(@ptrCast(fiber.resultPointer()[0..@sizeOf(posix.fd_t)]));
- const rc = result.*;
- switch (errno(rc)) {
- .SUCCESS => return @as(u32, @bitCast(rc)),
+ const completion = fiber.resultPointer(Completion);
+ switch (errno(completion.result)) {
+ .SUCCESS => return @as(u32, @bitCast(completion.result)),
.INTR => @panic("TODO is this reachable?"),
- .CANCELED => @panic("TODO figure out how this error code fits into things"),
+ .CANCELED => return error.AsyncCancel,
.INVAL => return error.InvalidArgument,
.FAULT => unreachable,
@@ -744,6 +911,15 @@ pub fn write(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8) std.f
.BUSY => return error.DeviceBusy,
.NXIO => return error.NoDevice,
.MSGSIZE => return error.MessageTooBig,
- else => |err| return posix.unexpectedErrno(err),
+ else => |err| return std.posix.unexpectedErrno(err),
}
}
+
+fn errno(signed: i32) std.posix.E {
+ const int = if (signed > -4096 and signed < 0) -signed else 0;
+ return @enumFromInt(int);
+}
+
+fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
+ return iou.get_sqe() catch @panic("TODO: handle submission queue full");
+}
lib/std/Thread/Pool.zig
@@ -435,13 +435,25 @@ fn @"async"(
return @ptrCast(closure);
}
-fn @"await"(userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8) void {
+fn @"await"(
+ userdata: ?*anyopaque,
+ any_future: *std.Io.AnyFuture,
+ result: []u8,
+ result_alignment: std.mem.Alignment,
+) void {
+ _ = result_alignment;
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
closure.waitAndFree(pool.allocator, result);
}
-fn cancel(userdata: ?*anyopaque, any_future: *Io.AnyFuture, result: []u8) void {
+fn cancel(
+ userdata: ?*anyopaque,
+ any_future: *Io.AnyFuture,
+ result: []u8,
+ result_alignment: std.mem.Alignment,
+) void {
+ _ = result_alignment;
const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
@atomicStore(bool, &closure.cancel_flag, true, .seq_cst);
lib/std/Io.zig
@@ -591,6 +591,7 @@ pub const VTable = struct {
/// Points to a buffer where the result is written.
/// The length is equal to size in bytes of result type.
result: []u8,
+ result_alignment: std.mem.Alignment,
) void,
/// Equivalent to `await` but initiates cancel request.
@@ -606,6 +607,7 @@ pub const VTable = struct {
/// Points to a buffer where the result is written.
/// The length is equal to size in bytes of result type.
result: []u8,
+ result_alignment: std.mem.Alignment,
) void,
/// Returns whether the current thread of execution is known to have
@@ -641,14 +643,14 @@ pub fn Future(Result: type) type {
/// Idempotent.
pub fn cancel(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]));
+ io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
f.any_future = null;
return f.result;
}
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]));
+ io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
f.any_future = null;
return f.result;
}
@@ -671,9 +673,9 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(
future.any_future = io.vtable.async(
io.userdata,
@ptrCast((&future.result)[0..1]),
- .fromByteUnits(@alignOf(Result)),
+ .of(Result),
if (@sizeOf(Args) == 0) &.{} else @ptrCast((&args)[0..1]), // work around compiler bug
- .fromByteUnits(@alignOf(Args)),
+ .of(Args),
TypeErased.start,
);
return future;