Commit 41070932f8
Changed files (1)
lib/std/Io/IoUring.zig
@@ -10,12 +10,9 @@ const IoUring = std.os.linux.IoUring;
/// Must be a thread-safe allocator.
gpa: Allocator,
+mutex: std.Thread.Mutex,
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
threads: Thread.List,
-detached: struct {
- mutex: std.Io.Mutex,
- list: std.DoublyLinkedList,
-},
/// Empirically saw >128KB being used by the self-hosted backend to panic.
const idle_stack_size = 256 * 1024;
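Note: the fiber-aware std.Io.Mutex that guarded the detached list is replaced by a single std.Thread.Mutex on the event loop. std.Thread.Mutex.lock blocks the OS thread and cannot fail, while std.Io.Mutex.lock suspends the fiber and can return error.Canceled; that difference is why the catch/error.Canceled handling disappears from deinit below. A minimal sketch of the blocking flavor (bump and counter are hypothetical names):

    const std = @import("std");

    var mutex: std.Thread.Mutex = .{};
    var counter: usize = 0;

    fn bump() void {
        mutex.lock(); // blocks the OS thread; no error union, no cancellation
        defer mutex.unlock();
        counter += 1;
    }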
@@ -142,7 +139,6 @@ pub fn io(el: *EventLoop) Io {
.async = async,
.concurrent = concurrent,
.await = await,
- .asyncDetached = asyncDetached,
.select = select,
.cancel = cancel,
.cancelRequested = cancelRequested,
@@ -172,16 +168,13 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
errdefer gpa.free(allocated_slice);
el.* = .{
.gpa = gpa,
+ .mutex = .{},
.main_fiber_buffer = undefined,
.threads = .{
.allocated = @ptrCast(allocated_slice[0..threads_size]),
.reserved = 1,
.active = 1,
},
- .detached = .{
- .mutex = .init,
- .list = .{},
- },
};
const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
main_fiber.* = .{
@@ -223,22 +216,6 @@ pub fn init(el: *EventLoop, gpa: Allocator) !void {
}
pub fn deinit(el: *EventLoop) void {
- while (true) cancel(el, detached_future: {
- el.detached.mutex.lock(el.io()) catch |err| switch (err) {
- error.Canceled => unreachable, // main fiber cannot be canceled
- };
- defer el.detached.mutex.unlock(el.io());
- const detached: *DetachedClosure = @fieldParentPtr(
- "detached_queue_node",
- el.detached.list.pop() orelse break,
- );
- // notify the detached fiber that it is no longer allowed to recycle itself
- detached.detached_queue_node = .{
- .prev = &detached.detached_queue_node,
- .next = &detached.detached_queue_node,
- };
- break :detached_future @ptrCast(detached.fiber);
- }, &.{}, .@"1");
const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
for (el.threads.allocated[0..active_threads]) |*thread| {
const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
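Note: the deleted loop drains the intrusive detached list, recovering each DetachedClosure from its embedded list node with @fieldParentPtr; re-pointing the node at itself is the sentinel telling that fiber it may no longer recycle itself. A self-contained sketch of the recovery pattern, assuming the intrusive std.DoublyLinkedList this file already uses (Closure and value are illustrative names):

    const std = @import("std");

    const Closure = struct {
        value: u32,
        node: std.DoublyLinkedList.Node = .{},
    };

    test "recover the enclosing struct from an intrusive node" {
        var list: std.DoublyLinkedList = .{};
        var c: Closure = .{ .value = 42 };
        list.append(&c.node);
        const closure: *Closure = @fieldParentPtr("node", list.pop().?);
        try std.testing.expectEqual(@as(u32, 42), closure.value);
    }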
@@ -492,7 +469,7 @@ const SwitchMessage = struct {
const PendingTask = union(enum) {
nothing,
reschedule,
- recycle,
+ recycle: *Fiber,
register_awaiter: *?*Fiber,
register_select: []const *Io.AnyFuture,
mutex_lock: struct {
@@ -516,10 +493,8 @@ const SwitchMessage = struct {
assert(prev_fiber.queue_next == null);
el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
},
- .recycle => {
- const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
- assert(prev_fiber.queue_next == null);
- el.recycle(prev_fiber);
+ .recycle => |fiber| {
+ el.recycle(fiber);
},
.register_awaiter => |awaiter| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
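Note: recycle now carries the target fiber in its union payload rather than rederiving it from the previous context, so the recycled fiber no longer has to be the one that just switched away. A minimal sketch of payload capture in a switch (Task and the *u32 payload are stand-ins for the real types):

    const std = @import("std");

    const Task = union(enum) {
        nothing,
        recycle: *u32, // stand-in for *Fiber

        fn run(task: Task) void {
            switch (task) {
                .recycle => |ptr| ptr.* += 1, // payload captured directly
                .nothing => {},
            }
        }
    };

    test "switch captures the union payload" {
        var n: u32 = 0;
        Task.run(.{ .recycle = &n });
        try std.testing.expectEqual(@as(u32, 1), n);
    }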
@@ -829,12 +804,9 @@ fn fiberEntry() callconv(.naked) void {
switch (builtin.cpu.arch) {
.x86_64 => asm volatile (
\\ leaq 8(%%rsp), %%rdi
- \\ jmpq *(%%rsp)
- ),
- .aarch64 => asm volatile (
- \\ mov x0, sp
- \\ ldr x2, [sp, #-8]
- \\ br x2
+ \\ jmp %[AsyncClosure_call:P]
+ :
+ : [AsyncClosure_call] "X" (&AsyncClosure.call),
),
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
}
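Note: instead of loading a function pointer that concurrent() previously stored on the fiber stack, the trampoline now jumps straight to AsyncClosure.call: the "X" input constraint accepts a symbol operand and the :P modifier prints it bare, yielding a direct jmp. A standalone sketch of the same pattern (target and trampoline are hypothetical):

    const builtin = @import("builtin");

    fn target(closure: *anyopaque) callconv(.c) void {
        _ = closure;
    }

    fn trampoline() callconv(.naked) void {
        switch (builtin.cpu.arch) {
            .x86_64 => asm volatile (
                \\ leaq 8(%%rsp), %%rdi
                \\ jmp %[target:P]
                :
                : [target] "X" (&target),
            ),
            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
        }
    }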
@@ -905,18 +877,16 @@ fn concurrent(
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber);
- const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
- (stack_end - 1)[0..1].* = .{@intFromPtr(&AsyncClosure.call)};
fiber.* = .{
.required_align = {},
.context = switch (builtin.cpu.arch) {
.x86_64 => .{
- .rsp = @intFromPtr(stack_end - 1),
+ .rsp = @intFromPtr(closure) - @sizeOf(usize),
.rbp = 0,
.rip = @intFromPtr(&fiberEntry),
},
.aarch64 => .{
- .sp = @intFromPtr(stack_end),
+ .sp = @intFromPtr(closure),
.fp = 0,
.pc = @intFromPtr(&fiberEntry),
},
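Note: with the stored pointer gone, the initial stack pointer comes straight from the closure address. On x86_64, rsp = closure - 8, so the leaq 8(%rsp), %rdi in fiberEntry recovers the closure pointer, and rsp % 16 == 8 reproduces the SysV ABI stack state at function entry, as if a return address had just been pushed. On aarch64 nothing is stored below the closure either, so sp keeps the 16-byte-aligned closure address itself. A worked check with a hypothetical aligned address:

    const std = @import("std");

    test "fiber entry stack layout on x86_64" {
        const closure_addr: usize = 0x1000; // hypothetical, 16-byte aligned
        const rsp = closure_addr - 8; // @sizeOf(usize) on x86_64
        try std.testing.expectEqual(closure_addr, rsp + 8); // leaq 8(%rsp), %rdi
        try std.testing.expectEqual(@as(usize, 8), rsp % 16); // post-call ABI state
    }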
@@ -968,70 +938,6 @@ const DetachedClosure = struct {
}
};
-fn asyncDetached(
- userdata: ?*anyopaque,
- context: []const u8,
- context_alignment: std.mem.Alignment,
- start: *const fn (context: *const anyopaque) void,
-) void {
- assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
- assert(context.len <= Fiber.max_context_size); // TODO
-
- const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
- const fiber = Fiber.allocate(event_loop) catch {
- start(context.ptr);
- return;
- };
- std.log.debug("allocated {*}", .{fiber});
-
- const current_thread: *Thread = .current();
- const closure: *DetachedClosure = @ptrFromInt(Fiber.max_context_align.max(.of(DetachedClosure)).backward(
- @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
- ) - @sizeOf(DetachedClosure));
- const stack_end: [*]align(16) usize = @ptrCast(@alignCast(closure));
- (stack_end - 1)[0..1].* = .{@intFromPtr(&DetachedClosure.call)};
- fiber.* = .{
- .required_align = {},
- .context = switch (builtin.cpu.arch) {
- .x86_64 => .{
- .rsp = @intFromPtr(stack_end - 1),
- .rbp = 0,
- .rip = @intFromPtr(&fiberEntry),
- },
- .aarch64 => .{
- .sp = @intFromPtr(stack_end),
- .fp = 0,
- .pc = @intFromPtr(&fiberEntry),
- },
- else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
- },
- .awaiter = null,
- .queue_next = null,
- .cancel_thread = null,
- .awaiting_completions = .initEmpty(),
- };
- closure.* = .{
- .event_loop = event_loop,
- .fiber = fiber,
- .start = start,
- .detached_queue_node = .{},
- };
- {
- event_loop.detached.mutex.lock(event_loop.io()) catch |err| switch (err) {
- error.Canceled => {
- event_loop.recycle(fiber);
- start(context.ptr);
- return;
- },
- };
- defer event_loop.detached.mutex.unlock(event_loop.io());
- event_loop.detached.list.append(&closure.detached_queue_node);
- }
- @memcpy(closure.contextPointer(), context);
-
- event_loop.schedule(current_thread, .{ .head = fiber, .tail = fiber });
-}
-
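Note: the deleted placement arithmetic positions the DetachedClosure below the context area by taking the larger of two alignments and rounding the address down. A small sketch of the std.mem.Alignment helpers it relied on (the concrete types and address are hypothetical):

    const std = @import("std");

    test "Alignment helpers used by the deleted placement math" {
        const Alignment = std.mem.Alignment;
        const a = Alignment.max(Alignment.of(u64), Alignment.of(u32)); // larger alignment wins
        try std.testing.expectEqual(@as(usize, 0x1008), a.backward(0x100f)); // round down to 8
    }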
fn await(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,