const EventLoop = @This();
const builtin = @import("builtin");

const std = @import("../std.zig");
const Io = std.Io;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;
const IoUring = std.os.linux.IoUring;

/// Must be a thread-safe allocator.
gpa: Allocator,
mutex: std.Thread.Mutex,
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
threads: Thread.List,

/// Empirically saw >128KB being used by the self-hosted backend to panic.
const idle_stack_size = 256 * 1024;

const max_idle_search = 4;
const max_steal_ready_search = 4;

const io_uring_entries = 64;

const Thread = struct {
    thread: std.Thread,
    idle_context: Context,
    current_context: *Context,
    ready_queue: ?*Fiber,
    io_uring: IoUring,
    idle_search_index: u32,
    steal_ready_search_index: u32,

    const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));

    threadlocal var self: *Thread = undefined;

    fn current() *Thread {
        return self;
    }

    fn currentFiber(thread: *Thread) *Fiber {
        return @fieldParentPtr("context", thread.current_context);
    }

    const List = struct {
        allocated: []Thread,
        reserved: u32,
        active: u32,
    };
};

const Fiber = struct {
    required_align: void align(4),
    context: Context,
    awaiter: ?*Fiber,
    queue_next: ?*Fiber,
    cancel_thread: ?*Thread,
    awaiting_completions: std.StaticBitSet(3),

    const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));

    const max_result_align: Alignment = .@"16";
    const max_result_size = max_result_align.forward(64);
    /// This includes any stack realignments that need to happen, and also the
    /// initial frame return address slot and argument frame, depending on target.
    const min_stack_size = 4 * 1024 * 1024;
    const max_context_align: Alignment = .@"16";
    const max_context_size = max_context_align.forward(1024);
    const max_closure_size: usize = @sizeOf(AsyncClosure);
    const max_closure_align: Alignment = .of(AsyncClosure);
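    /// A fiber and everything it needs live in one allocation of this size:
    /// the `Fiber` header at the base, then the (aligned) result area, then at
    /// least `min_stack_size` bytes of stack, with the `AsyncClosure` and the
    /// copied context bytes placed at the top so the stack can grow downward
    /// from just below the closure (see `AsyncClosure.fromFiber` and
    /// `concurrent`). Descriptive summary only; the expression below is the
    /// source of truth.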
    const allocation_size = std.mem.alignForward(
        usize,
        max_closure_align.max(max_context_align).forward(
            max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
        ) + max_closure_size + max_context_size,
        std.heap.page_size_max,
    );

    fn allocate(el: *EventLoop) error{OutOfMemory}!*Fiber {
        return @ptrCast(try el.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
    }

    fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
        return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
    }

    fn allocatedEnd(f: *Fiber) [*]u8 {
        const allocated_slice = f.allocatedSlice();
        return allocated_slice[allocated_slice.len..].ptr;
    }

    fn resultPointer(f: *Fiber, comptime Result: type) *Result {
        return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
    }

    fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
        return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
    }

    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            null,
            thread,
            .acq_rel,
            .acquire,
        )) |cancel_thread| {
            assert(cancel_thread == Thread.canceling);
            return error.Canceled;
        }
    }

    fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
        if (@cmpxchgStrong(
            ?*Thread,
            &fiber.cancel_thread,
            thread,
            null,
            .acq_rel,
            .acquire,
        )) |cancel_thread| assert(cancel_thread == Thread.canceling);
    }

    const Queue = struct { head: *Fiber, tail: *Fiber };
};

fn recycle(el: *EventLoop, fiber: *Fiber) void {
    std.log.debug("recycling {*}", .{fiber});
    assert(fiber.queue_next == null);
    el.gpa.free(fiber.allocatedSlice());
}

pub fn io(el: *EventLoop) Io {
    return .{
        .userdata = el,
        .vtable = &.{
            .async = async,
            .concurrent = concurrent,
            .await = await,
            .select = select,
            .cancel = cancel,
            .cancelRequested = cancelRequested,

            .mutexLock = mutexLock,
            .mutexUnlock = mutexUnlock,

            .conditionWait = conditionWait,
            .conditionWake = conditionWake,

            .createFile = createFile,
            .fileOpen = fileOpen,
            .fileClose = fileClose,
            .pread = pread,
            .pwrite = pwrite,

            .now = now,
            .sleep = sleep,
        },
    };
}

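// Illustrative usage sketch based on the declarations in this file (`gpa` is a
// placeholder for any thread-safe allocator, not something defined here):
//
//     var event_loop: EventLoop = undefined;
//     try event_loop.init(gpa);
//     defer event_loop.deinit();
//     const io_interface = event_loop.io();
//     // hand `io_interface` to code that performs std.Io operations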
pub fn init(el: *EventLoop, gpa: Allocator) !void {
    const threads_size = @max(std.Thread.getCpuCount() catch 1, 1) * @sizeOf(Thread);
    const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
    const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
    errdefer gpa.free(allocated_slice);
    el.* = .{
        .gpa = gpa,
        .mutex = .{},
        .main_fiber_buffer = undefined,
        .threads = .{
            .allocated = @ptrCast(allocated_slice[0..threads_size]),
            .reserved = 1,
            .active = 1,
        },
    };
    const main_fiber: *Fiber = @ptrCast(&el.main_fiber_buffer);
    main_fiber.* = .{
        .required_align = {},
        .context = undefined,
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    const main_thread = &el.threads.allocated[0];
    Thread.self = main_thread;
    const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
    (idle_stack_end - 1)[0..1].* = .{@intFromPtr(el)};
    main_thread.* = .{
        .thread = undefined,
        .idle_context = switch (builtin.cpu.arch) {
            .aarch64 => .{
                .sp = @intFromPtr(idle_stack_end),
                .fp = 0,
                .pc = @intFromPtr(&mainIdleEntry),
            },
            .x86_64 => .{
                .rsp = @intFromPtr(idle_stack_end - 1),
                .rbp = 0,
                .rip = @intFromPtr(&mainIdleEntry),
            },
            else => @compileError("unimplemented architecture"),
        },
        .current_context = &main_fiber.context,
        .ready_queue = null,
        .io_uring = try IoUring.init(io_uring_entries, 0),
        .idle_search_index = 1,
        .steal_ready_search_index = 1,
    };
    errdefer main_thread.io_uring.deinit();
    std.log.debug("created main idle {*}", .{&main_thread.idle_context});
    std.log.debug("created main {*}", .{main_fiber});
}

pub fn deinit(el: *EventLoop) void {
    const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
    for (el.threads.allocated[0..active_threads]) |*thread| {
        const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
        assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
    }
    el.yield(null, .exit);
    const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(el.threads.allocated.ptr));
    const idle_stack_end_offset = std.mem.alignForward(usize, el.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
    for (el.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
    el.gpa.free(allocated_ptr[0..idle_stack_end_offset]);
    el.* = undefined;
}

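/// Pops the head of this thread's own ready queue or, failing that, probes up
/// to `max_steal_ready_search` other active threads and takes over one of
/// their ready queues: the stolen head is returned and the rest of that chain
/// becomes this thread's queue. Returns null (publishing an empty queue) when
/// nothing runnable was found.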
fn findReadyFiber(el: *EventLoop, thread: *Thread) ?*Fiber {
    if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    const active_threads = @atomicLoad(u32, &el.threads.active, .acquire);
    for (0..@min(max_steal_ready_search, active_threads)) |_| {
        defer thread.steal_ready_search_index += 1;
        if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
        const steal_ready_search_thread = &el.threads.allocated[0..active_threads][thread.steal_ready_search_index];
        if (steal_ready_search_thread == thread) continue;
        const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
        if (ready_fiber == Fiber.finished) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &steal_ready_search_thread.ready_queue,
            ready_fiber,
            null,
            .acquire,
            .monotonic,
        )) |_| continue;
        @atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
        ready_fiber.queue_next = null;
        return ready_fiber;
    }
    // couldn't find anything to do, so we are now open for business
    @atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
    return null;
}

fn yield(el: *EventLoop, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
    const thread: *Thread = .current();
    const ready_context = if (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber|
        &ready_fiber.context
    else
        &thread.idle_context;
    const message: SwitchMessage = .{
        .contexts = .{
            .prev = thread.current_context,
            .ready = ready_context,
        },
        .pending_task = pending_task,
    };
    std.log.debug("switching from {*} to {*}", .{ message.contexts.prev, message.contexts.ready });
    contextSwitch(&message).handle(el);
}

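/// Hands a queue of ready fibers to some thread: first tries to publish it to
/// an idle thread (waking it with a MSG_RING submission), then tries to
/// reserve and spawn a new worker thread, and otherwise falls back to pushing
/// the queue onto the current thread's own ready queue.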
fn schedule(el: *EventLoop, thread: *Thread, ready_queue: Fiber.Queue) void {
    {
        var fiber = ready_queue.head;
        while (true) {
            std.log.debug("scheduling {*}", .{fiber});
            fiber = fiber.queue_next orelse break;
        }
        assert(fiber == ready_queue.tail);
    }
    // shared fields of previous `Thread` must be initialized before later ones are marked as active
    const new_thread_index = @atomicLoad(u32, &el.threads.active, .acquire);
    for (0..@min(max_idle_search, new_thread_index)) |_| {
        defer thread.idle_search_index += 1;
        if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
        const idle_search_thread = &el.threads.allocated[0..new_thread_index][thread.idle_search_index];
        if (idle_search_thread == thread) continue;
        if (@cmpxchgWeak(
            ?*Fiber,
            &idle_search_thread.ready_queue,
            null,
            ready_queue.head,
            .release,
            .monotonic,
        )) |_| continue;
        getSqe(&thread.io_uring).* = .{
            .opcode = .MSG_RING,
            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
            .ioprio = 0,
            .fd = idle_search_thread.io_uring.fd,
            .off = @intFromEnum(Completion.UserData.wakeup),
            .addr = 0,
            .len = 0,
            .rw_flags = 0,
            .user_data = @intFromEnum(Completion.UserData.wakeup),
            .buf_index = 0,
            .personality = 0,
            .splice_fd_in = 0,
            .addr3 = 0,
            .resv = 0,
        };
        return;
    }
    spawn_thread: {
        // previous failed reservations must have completed before retrying
        if (new_thread_index == el.threads.allocated.len or @cmpxchgWeak(
            u32,
            &el.threads.reserved,
            new_thread_index,
            new_thread_index + 1,
            .acquire,
            .monotonic,
        ) != null) break :spawn_thread;
        const new_thread = &el.threads.allocated[new_thread_index];
        const next_thread_index = new_thread_index + 1;
        new_thread.* = .{
            .thread = undefined,
            .idle_context = undefined,
            .current_context = &new_thread.idle_context,
            .ready_queue = ready_queue.head,
            .io_uring = IoUring.init(io_uring_entries, 0) catch |err| {
                @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
                // no more access to `thread` after giving up reservation
                std.log.warn("unable to create worker thread due to io_uring init failure: {s}", .{@errorName(err)});
                break :spawn_thread;
            },
            .idle_search_index = 0,
            .steal_ready_search_index = 0,
        };
        new_thread.thread = std.Thread.spawn(.{
            .stack_size = idle_stack_size,
            .allocator = el.gpa,
        }, threadEntry, .{ el, new_thread_index }) catch |err| {
            new_thread.io_uring.deinit();
            @atomicStore(u32, &el.threads.reserved, new_thread_index, .release);
            // no more access to `thread` after giving up reservation
            std.log.warn("unable to create worker thread due to spawn failure: {s}", .{@errorName(err)});
            break :spawn_thread;
        };
        // shared fields of `Thread` must be initialized before being marked active
        @atomicStore(u32, &el.threads.active, next_thread_index, .release);
        return;
    }
    // nobody wanted it, so just queue it on ourselves
    while (@cmpxchgWeak(
        ?*Fiber,
        &thread.ready_queue,
        ready_queue.tail.queue_next,
        ready_queue.head,
        .acq_rel,
        .acquire,
    )) |old_head| ready_queue.tail.queue_next = old_head;
}

fn mainIdle(el: *EventLoop, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Context)))) noreturn {
    message.handle(el);
    el.idle(&el.threads.allocated[0]);
    el.yield(@ptrCast(&el.main_fiber_buffer), .nothing);
    unreachable; // switched to dead fiber
}

fn threadEntry(el: *EventLoop, index: u32) void {
    const thread: *Thread = &el.threads.allocated[index];
    Thread.self = thread;
    std.log.debug("created thread idle {*}", .{&thread.idle_context});
    el.idle(thread);
}

const Completion = struct {
    const UserData = enum(usize) {
        unused,
        wakeup,
        cleanup,
        exit,
        /// *Fiber
        _,
    };
    result: i32,
    flags: u32,
};

fn idle(el: *EventLoop, thread: *Thread) void {
    var maybe_ready_fiber: ?*Fiber = null;
    while (true) {
        while (maybe_ready_fiber orelse el.findReadyFiber(thread)) |ready_fiber| {
            el.yield(ready_fiber, .nothing);
            maybe_ready_fiber = null;
        }
        _ = thread.io_uring.submit_and_wait(1) catch |err| switch (err) {
            error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
            else => |e| @panic(@errorName(e)),
        };
        var cqes_buffer: [io_uring_entries]std.os.linux.io_uring_cqe = undefined;
        var maybe_ready_queue: ?Fiber.Queue = null;
        for (cqes_buffer[0 .. thread.io_uring.copy_cqes(&cqes_buffer, 0) catch |err| switch (err) {
            error.SignalInterrupt => cqes_len: {
                std.log.warn("copy_cqes failed with SignalInterrupt", .{});
                break :cqes_len 0;
            },
            else => |e| @panic(@errorName(e)),
        }]) |cqe| switch (@as(Completion.UserData, @enumFromInt(cqe.user_data))) {
            .unused => unreachable, // bad submission queued?
            .wakeup => {},
            .cleanup => @panic("failed to notify other threads that we are exiting"),
            .exit => {
                assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
                return;
            },
            _ => switch (errno(cqe.res)) {
                .INTR => getSqe(&thread.io_uring).* = .{
                    .opcode = .ASYNC_CANCEL,
                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                    .ioprio = 0,
                    .fd = 0,
                    .off = 0,
                    .addr = cqe.user_data,
                    .len = 0,
                    .rw_flags = 0,
                    .user_data = @intFromEnum(Completion.UserData.wakeup),
                    .buf_index = 0,
                    .personality = 0,
                    .splice_fd_in = 0,
                    .addr3 = 0,
                    .resv = 0,
                },
                else => {
                    const fiber: *Fiber = @ptrFromInt(cqe.user_data);
                    assert(fiber.queue_next == null);
                    fiber.resultPointer(Completion).* = .{
                        .result = cqe.res,
                        .flags = cqe.flags,
                    };
                    if (maybe_ready_fiber == null) maybe_ready_fiber = fiber else if (maybe_ready_queue) |*ready_queue| {
                        ready_queue.tail.queue_next = fiber;
                        ready_queue.tail = fiber;
                    } else maybe_ready_queue = .{ .head = fiber, .tail = fiber };
                },
            },
        };
        if (maybe_ready_queue) |ready_queue| el.schedule(thread, ready_queue);
    }
}

const SwitchMessage = struct {
    contexts: extern struct {
        prev: *Context,
        ready: *Context,
    },
    pending_task: PendingTask,

    const PendingTask = union(enum) {
        nothing,
        reschedule,
        recycle: *Fiber,
        register_awaiter: *?*Fiber,
        register_select: []const *Io.AnyFuture,
        mutex_lock: struct {
            prev_state: Io.Mutex.State,
            mutex: *Io.Mutex,
        },
        condition_wait: struct {
            cond: *Io.Condition,
            mutex: *Io.Mutex,
        },
        exit,
    };

    fn handle(message: *const SwitchMessage, el: *EventLoop) void {
        const thread: *Thread = .current();
        thread.current_context = message.contexts.ready;
        switch (message.pending_task) {
            .nothing => {},
            .reschedule => if (message.contexts.prev != &thread.idle_context) {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .recycle => |fiber| {
                el.recycle(fiber);
            },
            .register_awaiter => |awaiter| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
                    el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
            },
            .register_select => |futures| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                for (futures) |any_future| {
                    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
                    if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
                        const closure: *AsyncClosure = .fromFiber(future_fiber);
                        if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
                            el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        }
                    }
                }
            },
            .mutex_lock => |mutex_lock| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                var prev_state = mutex_lock.prev_state;
                while (switch (prev_state) {
                    else => next_state: {
                        prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
                        break :next_state @cmpxchgWeak(
                            Io.Mutex.State,
                            &mutex_lock.mutex.state,
                            prev_state,
                            @enumFromInt(@intFromPtr(prev_fiber)),
                            .release,
                            .acquire,
                        );
                    },
                    .unlocked => @cmpxchgWeak(
                        Io.Mutex.State,
                        &mutex_lock.mutex.state,
                        .unlocked,
                        .locked_once,
                        .acquire,
                        .acquire,
                    ) orelse {
                        prev_fiber.queue_next = null;
                        el.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
                        return;
                    },
                }) |next_state| prev_state = next_state;
            },
            .condition_wait => |condition_wait| {
                const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
                assert(prev_fiber.queue_next == null);
                const cond_impl = prev_fiber.resultPointer(ConditionImpl);
                cond_impl.* = .{
                    .tail = prev_fiber,
                    .event = .queued,
                };
                if (@cmpxchgStrong(
                    ?*Fiber,
                    @as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
                    null,
                    prev_fiber,
                    .release,
                    .acquire,
                )) |waiting_fiber| {
                    const waiting_cond_impl = waiting_fiber.?.resultPointer(ConditionImpl);
                    assert(waiting_cond_impl.tail.queue_next == null);
                    waiting_cond_impl.tail.queue_next = prev_fiber;
                    waiting_cond_impl.tail = prev_fiber;
                }
                condition_wait.mutex.unlock(el.io());
            },
            .exit => for (el.threads.allocated[0..@atomicLoad(u32, &el.threads.active, .acquire)]) |*each_thread| {
                getSqe(&thread.io_uring).* = .{
                    .opcode = .MSG_RING,
                    .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
                    .ioprio = 0,
                    .fd = each_thread.io_uring.fd,
                    .off = @intFromEnum(Completion.UserData.exit),
                    .addr = 0,
                    .len = 0,
                    .rw_flags = 0,
                    .user_data = @intFromEnum(Completion.UserData.cleanup),
                    .buf_index = 0,
                    .personality = 0,
                    .splice_fd_in = 0,
                    .addr3 = 0,
                    .resv = 0,
                };
            },
        }
    }
};

const Context = switch (builtin.cpu.arch) {
    .aarch64 => extern struct {
        sp: u64,
        fp: u64,
        pc: u64,
    },
    .x86_64 => extern struct {
        rsp: u64,
        rbp: u64,
        rip: u64,
    },
    else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
};

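/// Saves the callee-visible state of the current context (stack pointer, frame
/// pointer, and a resume address), loads the same fields from the ready
/// context, and jumps to it. The pointer to `message.contexts` is pinned to a
/// register (x1 on aarch64, rsi on x86_64) so the code resumed on the other
/// side receives the message that was sent to it.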
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
    return @fieldParentPtr("contexts", switch (builtin.cpu.arch) {
        .aarch64 => asm volatile (
            \\ ldp x0, x2, [x1]
            \\ ldr x3, [x2, #16]
            \\ mov x4, sp
            \\ stp x4, fp, [x0]
            \\ adr x5, 0f
            \\ ldp x4, fp, [x2]
            \\ str x5, [x0, #16]
            \\ mov sp, x4
            \\ br x3
            \\0:
            : [received_message] "={x1}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{x1}" (&message.contexts),
            : .{
                .x0 = true,
                .x1 = true,
                .x2 = true,
                .x3 = true,
                .x4 = true,
                .x5 = true,
                .x6 = true,
                .x7 = true,
                .x8 = true,
                .x9 = true,
                .x10 = true,
                .x11 = true,
                .x12 = true,
                .x13 = true,
                .x14 = true,
                .x15 = true,
                .x16 = true,
                .x17 = true,
                .x18 = true,
                .x19 = true,
                .x20 = true,
                .x21 = true,
                .x22 = true,
                .x23 = true,
                .x24 = true,
                .x25 = true,
                .x26 = true,
                .x27 = true,
                .x28 = true,
                .x30 = true,
                .z0 = true,
                .z1 = true,
                .z2 = true,
                .z3 = true,
                .z4 = true,
                .z5 = true,
                .z6 = true,
                .z7 = true,
                .z8 = true,
                .z9 = true,
                .z10 = true,
                .z11 = true,
                .z12 = true,
                .z13 = true,
                .z14 = true,
                .z15 = true,
                .z16 = true,
                .z17 = true,
                .z18 = true,
                .z19 = true,
                .z20 = true,
                .z21 = true,
                .z22 = true,
                .z23 = true,
                .z24 = true,
                .z25 = true,
                .z26 = true,
                .z27 = true,
                .z28 = true,
                .z29 = true,
                .z30 = true,
                .z31 = true,
                .p0 = true,
                .p1 = true,
                .p2 = true,
                .p3 = true,
                .p4 = true,
                .p5 = true,
                .p6 = true,
                .p7 = true,
                .p8 = true,
                .p9 = true,
                .p10 = true,
                .p11 = true,
                .p12 = true,
                .p13 = true,
                .p14 = true,
                .p15 = true,
                .fpcr = true,
                .fpsr = true,
                .ffr = true,
                .memory = true,
            }),
        .x86_64 => asm volatile (
            \\ movq 0(%%rsi), %%rax
            \\ movq 8(%%rsi), %%rcx
            \\ leaq 0f(%%rip), %%rdx
            \\ movq %%rsp, 0(%%rax)
            \\ movq %%rbp, 8(%%rax)
            \\ movq %%rdx, 16(%%rax)
            \\ movq 0(%%rcx), %%rsp
            \\ movq 8(%%rcx), %%rbp
            \\ jmpq *16(%%rcx)
            \\0:
            : [received_message] "={rsi}" (-> *const @FieldType(SwitchMessage, "contexts")),
            : [message_to_send] "{rsi}" (&message.contexts),
            : .{
                .rax = true,
                .rcx = true,
                .rdx = true,
                .rbx = true,
                .rsi = true,
                .rdi = true,
                .r8 = true,
                .r9 = true,
                .r10 = true,
                .r11 = true,
                .r12 = true,
                .r13 = true,
                .r14 = true,
                .r15 = true,
                .mm0 = true,
                .mm1 = true,
                .mm2 = true,
                .mm3 = true,
                .mm4 = true,
                .mm5 = true,
                .mm6 = true,
                .mm7 = true,
                .zmm0 = true,
                .zmm1 = true,
                .zmm2 = true,
                .zmm3 = true,
                .zmm4 = true,
                .zmm5 = true,
                .zmm6 = true,
                .zmm7 = true,
                .zmm8 = true,
                .zmm9 = true,
                .zmm10 = true,
                .zmm11 = true,
                .zmm12 = true,
                .zmm13 = true,
                .zmm14 = true,
                .zmm15 = true,
                .zmm16 = true,
                .zmm17 = true,
                .zmm18 = true,
                .zmm19 = true,
                .zmm20 = true,
                .zmm21 = true,
                .zmm22 = true,
                .zmm23 = true,
                .zmm24 = true,
                .zmm25 = true,
                .zmm26 = true,
                .zmm27 = true,
                .zmm28 = true,
                .zmm29 = true,
                .zmm30 = true,
                .zmm31 = true,
                .fpsr = true,
                .fpcr = true,
                .mxcsr = true,
                .rflags = true,
                .dirflag = true,
                .memory = true,
            }),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    });
}

fn mainIdleEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ movq (%%rsp), %%rdi
            \\ jmp %[mainIdle:P]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        .aarch64 => asm volatile (
            \\ ldr x0, [sp, #-8]
            \\ b %[mainIdle]
            :
            : [mainIdle] "X" (&mainIdle),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

fn fiberEntry() callconv(.naked) void {
    switch (builtin.cpu.arch) {
        .x86_64 => asm volatile (
            \\ leaq 8(%%rsp), %%rdi
            \\ jmp %[AsyncClosure_call:P]
            :
            : [AsyncClosure_call] "X" (&AsyncClosure.call),
        ),
        else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
    }
}

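/// Per-fiber bookkeeping placed near the top of the fiber's allocation (see
/// `fromFiber`): the copied argument context sits directly above the closure,
/// and the fiber's stack grows downward from just below it, as arranged in
/// `concurrent`.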
const AsyncClosure = struct {
    event_loop: *EventLoop,
    fiber: *Fiber,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    result_align: Alignment,
    already_awaited: bool,

    fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
        return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
    }

    fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
        message.handle(closure.event_loop);
        const fiber = closure.fiber;
        std.log.debug("{*} performing async", .{fiber});
        closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
        const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
        const ready_awaiter = r: {
            const a = awaiter orelse break :r null;
            if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
            break :r a;
        };
        closure.event_loop.yield(ready_awaiter, .nothing);
        unreachable; // switched to dead fiber
    }

    fn fromFiber(fiber: *Fiber) *AsyncClosure {
        return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
            @intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
        ) - @sizeOf(AsyncClosure));
    }
};

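/// Attempts to run the task concurrently on its own fiber; if `concurrent`
/// fails (for example, fiber allocation returns OutOfMemory), it degrades to
/// calling `start` synchronously on the current fiber and returns null so the
/// caller knows there is no future to await.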
fn async(
    userdata: ?*anyopaque,
    result: []u8,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
    return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
        start(context.ptr, result.ptr);
        return null;
    };
}

fn concurrent(
    userdata: ?*anyopaque,
    result_len: usize,
    result_alignment: Alignment,
    context: []const u8,
    context_alignment: Alignment,
    start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) Io.ConcurrentError!*std.Io.AnyFuture {
    assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
    assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
    assert(result_len <= Fiber.max_result_size); // TODO
    assert(context.len <= Fiber.max_context_size); // TODO

    const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
    const fiber = try Fiber.allocate(event_loop);
    std.log.debug("allocated {*}", .{fiber});

    const closure: *AsyncClosure = .fromFiber(fiber);
    fiber.* = .{
        .required_align = {},
        .context = switch (builtin.cpu.arch) {
            .x86_64 => .{
                .rsp = @intFromPtr(closure) - @sizeOf(usize),
                .rbp = 0,
                .rip = @intFromPtr(&fiberEntry),
            },
            .aarch64 => .{
                .sp = @intFromPtr(closure),
                .fp = 0,
                .pc = @intFromPtr(&fiberEntry),
            },
            else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
        },
        .awaiter = null,
        .queue_next = null,
        .cancel_thread = null,
        .awaiting_completions = .initEmpty(),
    };
    closure.* = .{
        .event_loop = event_loop,
        .fiber = fiber,
        .start = start,
        .result_align = result_alignment,
        .already_awaited = false,
    };
    @memcpy(closure.contextPointer(), context);

    event_loop.schedule(.current(), .{ .head = fiber, .tail = fiber });
    return @ptrCast(fiber);
}

fn await(
    userdata: ?*anyopaque,
    any_future: *std.Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    const event_loop: *EventLoop = @ptrCast(@alignCast(userdata));
    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
    if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
        event_loop.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
    @memcpy(result, future_fiber.resultBytes(result_alignment));
    event_loop.recycle(future_fiber);
}

fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));

    // Optimization to avoid the yield below.
    for (futures, 0..) |any_future, i| {
        const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
        if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) == Fiber.finished)
            return i;
    }

    el.yield(null, .{ .register_select = futures });

    std.log.debug("back from select yield", .{});

    const my_thread: *Thread = .current();
    const my_fiber = my_thread.currentFiber();
    var result: ?usize = null;

    for (futures, 0..) |any_future, i| {
        const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
        if (@cmpxchgStrong(?*Fiber, &future_fiber.awaiter, my_fiber, null, .seq_cst, .seq_cst)) |awaiter| {
            if (awaiter == Fiber.finished) {
                if (result == null) result = i;
            } else if (awaiter) |a| {
                const closure: *AsyncClosure = .fromFiber(a);
                closure.already_awaited = false;
            }
        } else {
            const closure: *AsyncClosure = .fromFiber(my_fiber);
            closure.already_awaited = false;
        }
    }

    return result.?;
}

fn cancel(
    userdata: ?*anyopaque,
    any_future: *std.Io.AnyFuture,
    result: []u8,
    result_alignment: Alignment,
) void {
    const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
    if (@atomicRmw(
        ?*Thread,
        &future_fiber.cancel_thread,
        .Xchg,
        Thread.canceling,
        .acq_rel,
    )) |cancel_thread| if (cancel_thread != Thread.canceling) {
        getSqe(&Thread.current().io_uring).* = .{
            .opcode = .MSG_RING,
            .flags = std.os.linux.IOSQE_CQE_SKIP_SUCCESS,
            .ioprio = 0,
            .fd = cancel_thread.io_uring.fd,
            .off = @intFromPtr(future_fiber),
            .addr = 0,
            .len = @bitCast(-@as(i32, @intFromEnum(std.os.linux.E.INTR))),
            .rw_flags = 0,
            .user_data = @intFromEnum(Completion.UserData.cleanup),
            .buf_index = 0,
            .personality = 0,
            .splice_fd_in = 0,
            .addr3 = 0,
            .resv = 0,
        };
    };
    await(userdata, any_future, result, result_alignment);
}

fn cancelRequested(userdata: ?*anyopaque) bool {
    _ = userdata;
    return @atomicLoad(?*Thread, &Thread.current().currentFiber().cancel_thread, .acquire) == Thread.canceling;
}

fn createFile(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const posix = std.posix;
    const sub_path_c = try posix.toPosixPath(sub_path);

    var os_flags: posix.O = .{
        .ACCMODE = if (flags.read) .RDWR else .WRONLY,
        .CREAT = true,
        .TRUNC = flags.truncate,
        .EXCL = flags.exclusive,
    };
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically. Note that the NONBLOCK flag is removed after the openat()
    // call is successful.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) switch (flags.lock) {
        .none => {},
        .shared => {
            os_flags.SHLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
        .exclusive => {
            os_flags.EXLOCK = true;
            os_flags.NONBLOCK = flags.lock_nonblocking;
        },
    };
    const have_flock = @TypeOf(posix.system.flock) != void;

    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        @panic("TODO");
    }

    if (has_flock_open_flags and flags.lock_nonblocking) {
        @panic("TODO");
    }

    getSqe(iou).* = .{
        .opcode = .OPENAT,
        .flags = 0,
        .ioprio = 0,
        .fd = dir.handle,
        .off = 0,
        .addr = @intFromPtr(&sub_path_c),
        .len = @intCast(flags.mode),
        .rw_flags = @bitCast(os_flags),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
        .BADF => unreachable,
        .ACCES => return error.AccessDenied,
        .FBIG => return error.FileTooBig,
        .OVERFLOW => return error.FileTooBig,
        .ISDIR => return error.IsDir,
        .LOOP => return error.SymLinkLoop,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NAMETOOLONG => return error.NameTooLong,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NODEV => return error.NoDevice,
        .NOENT => return error.FileNotFound,
        .NOMEM => return error.SystemResources,
        .NOSPC => return error.NoSpaceLeft,
        .NOTDIR => return error.NotDir,
        .PERM => return error.PermissionDenied,
        .EXIST => return error.PathAlreadyExists,
        .BUSY => return error.DeviceBusy,
        .OPNOTSUPP => return error.FileLocksNotSupported,
        .AGAIN => return error.WouldBlock,
        .TXTBSY => return error.FileBusy,
        .NXIO => return error.NoDevice,
        else => |err| return posix.unexpectedErrno(err),
    }
}

fn fileOpen(
    userdata: ?*anyopaque,
    dir: Io.Dir,
    sub_path: []const u8,
    flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const posix = std.posix;
    const sub_path_c = try posix.toPosixPath(sub_path);

    var os_flags: posix.O = .{
        .ACCMODE = switch (flags.mode) {
            .read_only => .RDONLY,
            .write_only => .WRONLY,
            .read_write => .RDWR,
        },
    };

    if (@hasField(posix.O, "CLOEXEC")) os_flags.CLOEXEC = true;
    if (@hasField(posix.O, "LARGEFILE")) os_flags.LARGEFILE = true;
    if (@hasField(posix.O, "NOCTTY")) os_flags.NOCTTY = !flags.allow_ctty;

    // Use the O locking flags if the os supports them to acquire the lock
    // atomically.
    const has_flock_open_flags = @hasField(posix.O, "EXLOCK");
    if (has_flock_open_flags) {
        // Note that the NONBLOCK flag is removed after the openat() call
        // is successful.
        switch (flags.lock) {
            .none => {},
            .shared => {
                os_flags.SHLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
            .exclusive => {
                os_flags.EXLOCK = true;
                os_flags.NONBLOCK = flags.lock_nonblocking;
            },
        }
    }
    const have_flock = @TypeOf(posix.system.flock) != void;

    if (have_flock and !has_flock_open_flags and flags.lock != .none) {
        @panic("TODO");
    }

    if (has_flock_open_flags and flags.lock_nonblocking) {
        @panic("TODO");
    }

    getSqe(iou).* = .{
        .opcode = .OPENAT,
        .flags = 0,
        .ioprio = 0,
        .fd = dir.handle,
        .off = 0,
        .addr = @intFromPtr(&sub_path_c),
        .len = 0,
        .rw_flags = @bitCast(os_flags),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return .{ .handle = completion.result },
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .FAULT => unreachable,
        .INVAL => return error.BadPathName,
        .BADF => unreachable,
        .ACCES => return error.AccessDenied,
        .FBIG => return error.FileTooBig,
        .OVERFLOW => return error.FileTooBig,
        .ISDIR => return error.IsDir,
        .LOOP => return error.SymLinkLoop,
        .MFILE => return error.ProcessFdQuotaExceeded,
        .NAMETOOLONG => return error.NameTooLong,
        .NFILE => return error.SystemFdQuotaExceeded,
        .NODEV => return error.NoDevice,
        .NOENT => return error.FileNotFound,
        .NOMEM => return error.SystemResources,
        .NOSPC => return error.NoSpaceLeft,
        .NOTDIR => return error.NotDir,
        .PERM => return error.PermissionDenied,
        .EXIST => return error.PathAlreadyExists,
        .BUSY => return error.DeviceBusy,
        .OPNOTSUPP => return error.FileLocksNotSupported,
        .AGAIN => return error.WouldBlock,
        .TXTBSY => return error.FileBusy,
        .NXIO => return error.NoDevice,
        else => |err| return posix.unexpectedErrno(err),
    }
}

fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();

    getSqe(iou).* = .{
        .opcode = .CLOSE,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = 0,
        .addr = 0,
        .len = 0,
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return,
        .INTR => unreachable,
        .CANCELED => return,

        .BADF => unreachable, // Always a race condition.
        else => return,
    }
}

fn pread(userdata: ?*anyopaque, file: Io.File, buffer: []u8, offset: std.posix.off_t) Io.File.PReadError!usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    getSqe(iou).* = .{
        .opcode = .READ,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = @bitCast(offset),
        .addr = @intFromPtr(buffer.ptr),
        .len = @min(buffer.len, 0x7ffff000),
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .INVAL => unreachable,
        .FAULT => unreachable,
        .NOENT => return error.ProcessNotFound,
        .AGAIN => return error.WouldBlock,
        .BADF => return error.NotOpenForReading, // Can be a race condition.
        .IO => return error.InputOutput,
        .ISDIR => return error.IsDir,
        .NOBUFS => return error.SystemResources,
        .NOMEM => return error.SystemResources,
        .NOTCONN => return error.SocketUnconnected,
        .CONNRESET => return error.ConnectionResetByPeer,
        .TIMEDOUT => return error.Timeout,
        .NXIO => return error.Unseekable,
        .SPIPE => return error.Unseekable,
        .OVERFLOW => return error.Unseekable,
        else => |err| return std.posix.unexpectedErrno(err),
    }
}

fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: std.posix.off_t) Io.File.PWriteError!usize {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    getSqe(iou).* = .{
        .opcode = .WRITE,
        .flags = 0,
        .ioprio = 0,
        .fd = file.handle,
        .off = @bitCast(offset),
        .addr = @intFromPtr(buffer.ptr),
        .len = @min(buffer.len, 0x7ffff000),
        .rw_flags = 0,
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS => return @as(u32, @bitCast(completion.result)),
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        .INVAL => return error.InvalidArgument,
        .FAULT => unreachable,
        .NOENT => return error.ProcessNotFound,
        .AGAIN => return error.WouldBlock,
        .BADF => return error.NotOpenForWriting, // can be a race condition.
        .DESTADDRREQ => unreachable, // `connect` was never called.
        .DQUOT => return error.DiskQuota,
        .FBIG => return error.FileTooBig,
        .IO => return error.InputOutput,
        .NOSPC => return error.NoSpaceLeft,
        .ACCES => return error.AccessDenied,
        .PERM => return error.PermissionDenied,
        .PIPE => return error.BrokenPipe,
        .NXIO => return error.Unseekable,
        .SPIPE => return error.Unseekable,
        .OVERFLOW => return error.Unseekable,
        .BUSY => return error.DeviceBusy,
        .CONNRESET => return error.ConnectionResetByPeer,
        .MSGSIZE => return error.MessageOversize,
        else => |err| return std.posix.unexpectedErrno(err),
    }
}

fn now(userdata: ?*anyopaque, clockid: std.posix.clockid_t) Io.ClockGetTimeError!Io.Timestamp {
    _ = userdata;
    const timespec = try std.posix.clock_gettime(clockid);
    return @enumFromInt(@as(i128, timespec.sec) * std.time.ns_per_s + timespec.nsec);
}

fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.Deadline) Io.SleepError!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const thread: *Thread = .current();
    const iou = &thread.io_uring;
    const fiber = thread.currentFiber();
    try fiber.enterCancelRegion(thread);

    const deadline_nanoseconds: i96 = switch (deadline) {
        .duration => |duration| duration.nanoseconds,
        .timestamp => |timestamp| @intFromEnum(timestamp),
    };
    const timespec: std.os.linux.kernel_timespec = .{
        .sec = @intCast(@divFloor(deadline_nanoseconds, std.time.ns_per_s)),
        .nsec = @intCast(@mod(deadline_nanoseconds, std.time.ns_per_s)),
    };
    getSqe(iou).* = .{
        .opcode = .TIMEOUT,
        .flags = 0,
        .ioprio = 0,
        .fd = 0,
        .off = 0,
        .addr = @intFromPtr(&timespec),
        .len = 1,
        .rw_flags = @as(u32, switch (deadline) {
            .duration => 0,
            .timestamp => std.os.linux.IORING_TIMEOUT_ABS,
        }) | @as(u32, switch (clockid) {
            .REALTIME => std.os.linux.IORING_TIMEOUT_REALTIME,
            .MONOTONIC => 0,
            .BOOTTIME => std.os.linux.IORING_TIMEOUT_BOOTTIME,
            else => return error.UnsupportedClock,
        }),
        .user_data = @intFromPtr(fiber),
        .buf_index = 0,
        .personality = 0,
        .splice_fd_in = 0,
        .addr3 = 0,
        .resv = 0,
    };

    el.yield(null, .nothing);
    fiber.exitCancelRegion(thread);

    const completion = fiber.resultPointer(Completion);
    switch (errno(completion.result)) {
        .SUCCESS, .TIME => return,
        .INTR => unreachable,
        .CANCELED => return error.Canceled,

        else => |err| return std.posix.unexpectedErrno(err),
    }
}

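// Mutex support. `Io.Mutex.State` doubles as a tagged pointer here: besides
// the `unlocked` and `locked_once` values, a state can hold a `*Fiber` that
// heads an intrusive list of waiting fibers linked through `queue_next` (see
// the `mutex_lock` case of `SwitchMessage.handle` and `mutexUnlock` below).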
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) error{Canceled}!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(null, .{ .mutex_lock = .{ .prev_state = prev_state, .mutex = mutex } });
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
    var maybe_waiting_fiber: ?*Fiber = @ptrFromInt(@intFromEnum(prev_state));
    while (if (maybe_waiting_fiber) |waiting_fiber| @cmpxchgWeak(
        Io.Mutex.State,
        &mutex.state,
        @enumFromInt(@intFromPtr(waiting_fiber)),
        @enumFromInt(@intFromPtr(waiting_fiber.queue_next)),
        .release,
        .acquire,
    ) else @cmpxchgWeak(
        Io.Mutex.State,
        &mutex.state,
        .locked_once,
        .unlocked,
        .release,
        .acquire,
    ) orelse return) |next_state| maybe_waiting_fiber = @ptrFromInt(@intFromEnum(next_state));
    maybe_waiting_fiber.?.queue_next = null;
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(maybe_waiting_fiber.?, .reschedule);
}

const ConditionImpl = struct {
    tail: *Fiber,
    event: union(enum) {
        queued,
        wake: Io.Condition.Wake,
    },
};

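// Condition support: `cond.state` holds the first waiting fiber (or null);
// later waiters are appended through `queue_next`, and the head fiber's
// `ConditionImpl`, stored in its result area, tracks the queue's tail.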
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    el.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
    const thread = Thread.current();
    const fiber = thread.currentFiber();
    const cond_impl = fiber.resultPointer(ConditionImpl);
    try mutex.lock(el.io());
    switch (cond_impl.event) {
        .queued => {},
        .wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
            .one => if (@cmpxchgStrong(
                ?*Fiber,
                @as(*?*Fiber, @ptrCast(&cond.state)),
                null,
                next_fiber,
                .release,
                .acquire,
            )) |old_fiber| {
                const old_cond_impl = old_fiber.?.resultPointer(ConditionImpl);
                assert(old_cond_impl.tail.queue_next == null);
                old_cond_impl.tail.queue_next = next_fiber;
                old_cond_impl.tail = cond_impl.tail;
            },
            .all => el.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
        },
    }
    fiber.queue_next = null;
}

fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
    const el: *EventLoop = @ptrCast(@alignCast(userdata));
    const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
    waiting_fiber.resultPointer(ConditionImpl).event = .{ .wake = wake };
    el.yield(waiting_fiber, .reschedule);
}

fn errno(signed: i32) std.os.linux.E {
    return .init(@bitCast(@as(isize, signed)));
}

fn getSqe(iou: *IoUring) *std.os.linux.io_uring_sqe {
    while (true) return iou.get_sqe() catch {
        _ = iou.submit_and_wait(0) catch |err| switch (err) {
            error.SignalInterrupt => std.log.warn("submit_and_wait failed with SignalInterrupt", .{}),
            else => |e| @panic(@errorName(e)),
        };
        continue;
    };
}