master
1const builtin = @import("builtin");
2const is_windows = builtin.os.tag == .windows;
3
4const std = @import("std.zig");
5const windows = std.os.windows;
6const posix = std.posix;
7const math = std.math;
8const assert = std.debug.assert;
9const Allocator = std.mem.Allocator;
10const Alignment = std.mem.Alignment;
11
/// A byte-count limit stored directly in a `usize`. Zero is `.nothing`,
/// `std.math.maxInt(usize)` is `.unlimited`, and every other value is the
/// literal number of bytes allowed.
pub const Limit = enum(usize) {
    nothing = 0,
    unlimited = std.math.maxInt(usize),
    _,

    /// Wraps `n` as a limit. `std.math.maxInt(usize)` is interpreted to mean
    /// `.unlimited`.
    pub fn limited(n: usize) Limit {
        return @enumFromInt(n);
    }

    /// Same as `limited` for 64-bit counts. Any value greater than
    /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`.
    pub fn limited64(n: u64) Limit {
        const clamped = @min(n, std.math.maxInt(usize));
        return @enumFromInt(clamped);
    }

    /// Returns a limit equal to the summed lengths of all slices in `data`.
    pub fn countVec(data: []const []const u8) Limit {
        var byte_count: usize = 0;
        for (data) |bytes| {
            byte_count += bytes.len;
        }
        return limited(byte_count);
    }

    /// Returns the more restrictive of the two limits.
    pub fn min(a: Limit, b: Limit) Limit {
        const lhs = @intFromEnum(a);
        const rhs = @intFromEnum(b);
        return @enumFromInt(@min(lhs, rhs));
    }

    /// Returns `n` capped to the limit.
    pub fn minInt(l: Limit, n: usize) usize {
        const cap = @intFromEnum(l);
        return @min(n, cap);
    }

    /// Returns the 64-bit count `n` capped to the limit.
    pub fn minInt64(l: Limit, n: u64) usize {
        const cap = @intFromEnum(l);
        return @min(n, cap);
    }

    /// Returns the prefix of `s` that fits within the limit.
    pub fn slice(l: Limit, s: []u8) []u8 {
        const len = l.minInt(s.len);
        return s[0..len];
    }

    /// Returns the prefix of the read-only slice `s` that fits within the
    /// limit.
    pub fn sliceConst(l: Limit, s: []const u8) []const u8 {
        const len = l.minInt(s.len);
        return s[0..len];
    }

    /// Returns the limit as an integer, or `null` when it is `.unlimited`.
    pub fn toInt(l: Limit) ?usize {
        return if (l == .unlimited) null else @intFromEnum(l);
    }

    /// Reduces a slice to account for the limit, leaving room for one extra
    /// byte above the limit, allowing for the use case of differentiating
    /// between end-of-stream and reaching the limit.
    ///
    /// Asserts that `non_empty_buffer` holds at least one byte.
    pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 {
        assert(non_empty_buffer.len >= 1);
        // Saturating add keeps `.unlimited` from wrapping around to zero.
        const wanted = @intFromEnum(l) +| 1;
        const len = @min(wanted, non_empty_buffer.len);
        return non_empty_buffer[0..len];
    }

    /// Returns whether the limit allows at least one byte.
    pub fn nonzero(l: Limit) bool {
        return l != .nothing;
    }

    /// Return a new limit reduced by `amount` or return `null` indicating
    /// limit would be exceeded. `.unlimited` stays `.unlimited`.
    pub fn subtract(l: Limit, amount: usize) ?Limit {
        if (l == .unlimited) return .unlimited;
        const remaining = @intFromEnum(l);
        if (remaining < amount) return null;
        return @enumFromInt(remaining - amount);
    }
};
81
82pub const Reader = @import("Io/Reader.zig");
83pub const Writer = @import("Io/Writer.zig");
84
85pub const tty = @import("Io/tty.zig");
86
/// Builds a `Poller(StreamEnum)` that uses `gpa` and watches the handles in
/// `files`, one per field of `StreamEnum`.
///
/// Every reader starts as `.failing`. On Windows the handles are stored in
/// `windows.active.handles_buf` with an active count of zero; elsewhere each
/// handle becomes a `pollfd` entry waiting for `POLL.IN`.
pub fn poll(
    gpa: Allocator,
    comptime StreamEnum: type,
    files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
    const stream_fields = @typeInfo(StreamEnum).@"enum".fields;

    var poller: Poller(StreamEnum) = .{
        .gpa = gpa,
        .readers = @splat(.failing),
        .poll_fds = undefined,
        .windows = if (is_windows) .{
            .first_read_done = false,
            .overlapped = [1]windows.OVERLAPPED{
                std.mem.zeroes(windows.OVERLAPPED),
            } ** stream_fields.len,
            .small_bufs = undefined,
            .active = .{
                .count = 0,
                .handles_buf = undefined,
                .stream_map = undefined,
            },
        } else {},
    };

    // Record the handle of each stream in the platform-specific table.
    inline for (stream_fields, 0..) |stream_field, stream_index| {
        const handle = @field(files, stream_field.name).handle;
        if (is_windows) {
            poller.windows.active.handles_buf[stream_index] = handle;
        } else {
            poller.poll_fds[stream_index] = .{
                .fd = handle,
                .events = posix.POLL.IN,
                .revents = undefined,
            };
        }
    }

    return poller;
}
125
/// Returns a type that reads from several streams at once, one per field of
/// `StreamEnum`, collecting the bytes of each stream into its own `Reader`
/// buffer allocated with `gpa`.
///
/// On Windows the implementation uses overlapped `ReadFile` calls together
/// with `WaitForMultipleObjects`; on other targets it uses `posix.poll`.
pub fn Poller(comptime StreamEnum: type) type {
    return struct {
        const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
        const PollFd = if (is_windows) void else posix.pollfd;

        /// Allocator that owns every reader buffer.
        gpa: Allocator,
        /// One reader per stream, indexed by the integer value of `StreamEnum`.
        readers: [enum_fields.len]Reader,
        /// One entry per stream; only meaningful on non-Windows targets. An
        /// `fd` of -1 marks a stream that is no longer polled.
        poll_fds: [enum_fields.len]PollFd,
        /// Windows-only state; `void` on other targets.
        windows: if (is_windows) struct {
            /// Set once the initial read of every handle has been issued.
            first_read_done: bool,
            overlapped: [enum_fields.len]windows.OVERLAPPED,
            /// Destination of the small overlapped read queued per stream.
            small_bufs: [enum_fields.len][128]u8,
            /// Handles still being waited on, with `stream_map[i]` naming the
            /// stream that `handles_buf[i]` belongs to.
            active: struct {
                count: math.IntFittingRange(0, enum_fields.len),
                handles_buf: [enum_fields.len]windows.HANDLE,
                stream_map: [enum_fields.len]StreamEnum,

                /// Removes the entry at `index`, shifting later entries down
                /// by one so that their relative order is kept.
                pub fn removeAt(self: *@This(), index: u32) void {
                    assert(index < self.count);
                    for (index + 1..self.count) |i| {
                        self.handles_buf[i - 1] = self.handles_buf[i];
                        self.stream_map[i - 1] = self.stream_map[i];
                    }
                    self.count -= 1;
                }
            },
        } else void,

        const Self = @This();

        /// Frees every reader buffer and invalidates `self`. On Windows,
        /// first cancels I/O on all handles that are still active.
        pub fn deinit(self: *Self) void {
            const gpa = self.gpa;
            if (is_windows) {
                // cancel any pending IO to prevent clobbering OVERLAPPED value
                for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
                    _ = windows.kernel32.CancelIo(h);
                }
            }
            inline for (&self.readers) |*r| gpa.free(r.buffer);
            self.* = undefined;
        }

        /// Polls without a timeout, using the platform implementation.
        ///
        /// Returns `false` when there is nothing left to poll, `true`
        /// otherwise.
        pub fn poll(self: *Self) !bool {
            if (is_windows) {
                return pollWindows(self, null);
            } else {
                return pollPosix(self, null);
            }
        }

        /// Same as `poll`, but waits at most `nanoseconds`. The timeout is
        /// converted to whole milliseconds and clamped to the range the
        /// platform call accepts.
        pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
            if (is_windows) {
                return pollWindows(self, nanoseconds);
            } else {
                return pollPosix(self, nanoseconds);
            }
        }

        /// Returns a pointer to the reader that belongs to stream `which`.
        pub fn reader(self: *Self, which: StreamEnum) *Reader {
            return &self.readers[@intFromEnum(which)];
        }

        /// Hands the bytes currently held by the reader of `which` to the
        /// caller as a slice allocated with `gpa`, and leaves that reader
        /// with an empty buffer.
        pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 {
            const gpa = self.gpa;
            const r = reader(self, which);
            if (r.seek == 0) {
                // Nothing consumed yet: shrink the existing allocation in
                // place instead of copying.
                const new = try gpa.realloc(r.buffer, r.end);
                r.buffer = &.{};
                r.end = 0;
                return new;
            }
            // NOTE(review): `buffered()` presumably yields `buffer[seek..end]`;
            // confirm against `Reader`.
            const new = try gpa.dupe(u8, r.buffered());
            gpa.free(r.buffer);
            r.buffer = &.{};
            r.seek = 0;
            r.end = 0;
            return new;
        }

        /// Windows implementation of `poll` and `pollTimeout`; a `null`
        /// timeout waits indefinitely.
        fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
            const bump_amt = 512;
            const gpa = self.gpa;

            if (!self.windows.first_read_done) {
                // First call: read whatever is immediately available from
                // every handle and register the ones that are still open.
                var already_read_data = false;
                for (0..enum_fields.len) |i| {
                    const handle = self.windows.active.handles_buf[i];
                    switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                        gpa,
                        handle,
                        &self.windows.overlapped[i],
                        &self.readers[i],
                        &self.windows.small_bufs[i],
                        bump_amt,
                    )) {
                        .populated, .empty => |state| {
                            if (state == .populated) already_read_data = true;
                            self.windows.active.handles_buf[self.windows.active.count] = handle;
                            self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
                            self.windows.active.count += 1;
                        },
                        .closed => {}, // don't add to the wait_objects list
                        .closed_populated => {
                            // don't add to the wait_objects list, but we did already get data
                            already_read_data = true;
                        },
                    }
                }
                self.windows.first_read_done = true;
                if (already_read_data) return true;
            }

            while (true) {
                if (self.windows.active.count == 0) return false;

                const status = windows.kernel32.WaitForMultipleObjects(
                    self.windows.active.count,
                    &self.windows.active.handles_buf,
                    0,
                    if (nanoseconds) |ns|
                        @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
                    else
                        windows.INFINITE,
                );
                if (status == windows.WAIT_FAILED)
                    return windows.unexpectedError(windows.GetLastError());
                if (status == windows.WAIT_TIMEOUT)
                    return true;

                if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
                    unreachable;

                const active_idx = status - windows.WAIT_OBJECT_0;

                const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
                const handle = self.windows.active.handles_buf[active_idx];

                const overlapped = &self.windows.overlapped[stream_idx];
                const stream_reader = &self.readers[stream_idx];
                const small_buf = &self.windows.small_bufs[stream_idx];

                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                    .success => |n| n,
                    .closed => {
                        self.windows.active.removeAt(active_idx);
                        continue;
                    },
                    .aborted => unreachable,
                };
                // Move the bytes of the completed small read into the reader.
                const buf = small_buf[0..num_bytes_read];
                const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len);
                @memcpy(dest[0..buf.len], buf);
                advanceBufferEnd(stream_reader, buf.len);

                switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                    gpa,
                    handle,
                    overlapped,
                    stream_reader,
                    small_buf,
                    bump_amt,
                )) {
                    .empty => {}, // irrelevant, we already got data from the small buffer
                    .populated => {},
                    .closed,
                    .closed_populated, // identical, since we already got data from the small buffer
                    => self.windows.active.removeAt(active_idx),
                }
                return true;
            }
        }

        /// POSIX implementation of `poll` and `pollTimeout`; a `null`
        /// timeout waits indefinitely.
        fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
            const gpa = self.gpa;
            // We ask for ensureUnusedCapacity with this much extra space. This
            // has more of an effect on small reads because once the reads
            // start to get larger the amount of space an ArrayList will
            // allocate grows exponentially.
            const bump_amt = 512;

            const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;

            const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
                std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
            else
                -1);
            if (events_len == 0) {
                // No events reported: keep polling only if at least one fd
                // has not been removed yet.
                for (self.poll_fds) |poll_fd| {
                    if (poll_fd.fd != -1) return true;
                } else return false;
            }

            var keep_polling = false;
            for (&self.poll_fds, &self.readers) |*poll_fd, *r| {
                // Try reading whatever is available before checking the error
                // conditions.
                // It's still possible to read after a POLL.HUP is received,
                // always check if there's some data waiting to be read first.
                if (poll_fd.revents & posix.POLL.IN != 0) {
                    const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
                    const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
                        error.BrokenPipe => 0, // Handle the same as EOF.
                        else => |e| return e,
                    };
                    advanceBufferEnd(r, amt);
                    if (amt == 0) {
                        // Remove the fd when the EOF condition is met.
                        poll_fd.fd = -1;
                    } else {
                        keep_polling = true;
                    }
                } else if (poll_fd.revents & err_mask != 0) {
                    // Exclude the fds that signaled an error.
                    poll_fd.fd = -1;
                } else if (poll_fd.fd != -1) {
                    keep_polling = true;
                }
            }
            return keep_polling;
        }

        /// Returns a slice into the unused capacity of `buffer` with at least
        /// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary.
        ///
        /// After calling this function, typically the caller will follow up with a
        /// call to `advanceBufferEnd` to report the actual number of bytes buffered.
        fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 {
            {
                const unused = r.buffer[r.end..];
                if (unused.len >= min_len) return unused;
            }
            if (r.seek > 0) {
                // Move the unread bytes to the front so that the space taken
                // by already-consumed bytes can be reused.
                const data = r.buffer[r.seek..r.end];
                @memmove(r.buffer[0..data.len], data);
                r.seek = 0;
                r.end = data.len;
            }
            {
                // Treat the reader buffer as an ArrayList to reuse its growth
                // logic; the deferred assignment stores the allocation back
                // when this scope exits, including on the error path.
                var list: std.ArrayList(u8) = .{
                    .items = r.buffer[0..r.end],
                    .capacity = r.buffer.len,
                };
                defer r.buffer = list.allocatedSlice();
                try list.ensureUnusedCapacity(allocator, min_len);
            }
            const unused = r.buffer[r.end..];
            assert(unused.len >= min_len);
            return unused;
        }

        /// After writing directly into the unused capacity of `buffer`, this function
        /// updates `end` so that users of `Reader` can receive the data.
        fn advanceBufferEnd(r: *Reader, n: usize) void {
            assert(n <= r.buffer.len - r.end);
            r.end += n;
        }

        /// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
        /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
        /// compatibility, we point it to this dummy variable, which we never otherwise access.
        /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
        var win_dummy_bytes_read: u32 = undefined;

        /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
        /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
        /// is available. `handle` must have no pending asynchronous operation.
        ///
        /// Here "the FIFO" is the buffer of the reader `r`.
        fn windowsAsyncReadToFifoAndQueueSmallRead(
            gpa: Allocator,
            handle: windows.HANDLE,
            overlapped: *windows.OVERLAPPED,
            r: *Reader,
            small_buf: *[128]u8,
            bump_amt: usize,
        ) !enum { empty, populated, closed_populated, closed } {
            var read_any_data = false;
            while (true) {
                const fifo_read_pending = while (true) {
                    const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
                    const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);

                    if (0 == windows.kernel32.ReadFile(
                        handle,
                        buf.ptr,
                        buf_len,
                        &win_dummy_bytes_read,
                        overlapped,
                    )) switch (windows.GetLastError()) {
                        .IO_PENDING => break true,
                        .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
                        else => |err| return windows.unexpectedError(err),
                    };

                    const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                        .success => |n| n,
                        .closed => return if (read_any_data) .closed_populated else .closed,
                        .aborted => unreachable,
                    };

                    read_any_data = true;
                    advanceBufferEnd(r, num_bytes_read);

                    if (num_bytes_read == buf_len) {
                        // We filled the buffer, so there's probably more data available.
                        continue;
                    } else {
                        // We didn't fill the buffer, so assume we're out of data.
                        // There is no pending read.
                        break false;
                    }
                };

                if (fifo_read_pending) cancel_read: {
                    // Cancel the pending read into the FIFO.
                    _ = windows.kernel32.CancelIo(handle);

                    // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
                    switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
                        windows.WAIT_OBJECT_0 => {},
                        windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
                        else => unreachable,
                    }

                    // If it completed before we canceled, make sure to tell the FIFO!
                    const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
                        .success => |n| n,
                        .closed => return if (read_any_data) .closed_populated else .closed,
                        .aborted => break :cancel_read,
                    };
                    read_any_data = true;
                    advanceBufferEnd(r, num_bytes_read);
                }

                // Try to queue the small read (up to `small_buf.len` bytes).
                if (0 == windows.kernel32.ReadFile(
                    handle,
                    small_buf,
                    small_buf.len,
                    &win_dummy_bytes_read,
                    overlapped,
                )) switch (windows.GetLastError()) {
                    .IO_PENDING => {
                        // small read pending as intended
                        return if (read_any_data) .populated else .empty;
                    },
                    .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
                    else => |err| return windows.unexpectedError(err),
                };

                // We got data back this time. Write it to the FIFO and run the main loop again.
                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                    .success => |n| n,
                    .closed => return if (read_any_data) .closed_populated else .closed,
                    .aborted => unreachable,
                };
                const buf = small_buf[0..num_bytes_read];
                const dest = try writableSliceGreedyAlloc(r, gpa, buf.len);
                @memcpy(dest[0..buf.len], buf);
                advanceBufferEnd(r, buf.len);
                read_any_data = true;
            }
        }

        /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
        /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
        ///
        /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
        /// operation immediately returns data:
        /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
        /// erroneous results."
        /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
        /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
        /// get the actual number of bytes read."
        /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
        fn windowsGetReadResult(
            handle: windows.HANDLE,
            overlapped: *windows.OVERLAPPED,
            allow_aborted: bool,
        ) !union(enum) {
            success: u32,
            closed,
            aborted,
        } {
            var num_bytes_read: u32 = undefined;
            // The final argument 0 is `bWait = FALSE`: do not block here.
            if (0 == windows.kernel32.GetOverlappedResult(
                handle,
                overlapped,
                &num_bytes_read,
                0,
            )) switch (windows.GetLastError()) {
                .BROKEN_PIPE => return .closed,
                .OPERATION_ABORTED => |err| if (allow_aborted) {
                    return .aborted;
                } else {
                    return windows.unexpectedError(err);
                },
                else => |err| return windows.unexpectedError(err),
            };
            return .{ .success = num_bytes_read };
        }
    };
}
527
/// Given an enum, returns a struct with fields of that enum, each field
/// representing an I/O stream for polling.
///
/// The field names are taken from `std.meta.fieldNames(StreamEnum)` and every
/// field has type `std.fs.File`.
pub fn PollFiles(comptime StreamEnum: type) type {
    // NOTE(review): the last argument presumably supplies per-field attributes
    // (left at their defaults) — confirm against the `@Struct` builtin docs.
    return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(std.fs.File), &@splat(.{}));
}
533
test {
    // Reference these declarations so that they are analyzed when this
    // file's tests are built.
    _ = net;
    _ = Reader;
    _ = Writer;
    _ = tty;
    _ = Evented;
    _ = Threaded;
    _ = @import("Io/test.zig");
}
543
544const Io = @This();
545
/// Resolved at compile time from the target: `Io/IoUring.zig` on Linux and
/// `Io/Kqueue.zig` on the listed BSD and Apple operating systems, in both
/// cases only for x86_64 and aarch64. It is `void` on every other target.
pub const Evented = switch (builtin.os.tag) {
    .linux => switch (builtin.cpu.arch) {
        .x86_64, .aarch64 => @import("Io/IoUring.zig"),
        else => void, // context-switching code not implemented yet
    },
    .dragonfly, .freebsd, .netbsd, .openbsd, .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => switch (builtin.cpu.arch) {
        .x86_64, .aarch64 => @import("Io/Kqueue.zig"),
        else => void, // context-switching code not implemented yet
    },
    else => void,
};
557pub const Threaded = @import("Io/Threaded.zig");
558pub const net = @import("Io/net.zig");
559
560userdata: ?*anyopaque,
561vtable: *const VTable,
562
/// Table of function pointers through which an `Io` value performs its
/// operations. Callers in this file invoke an entry as
/// `io.vtable.<name>(io.userdata, ...)`, so the first parameter of every
/// function is the `Io.userdata` pointer.
pub const VTable = struct {
    // --- Futures ---

    /// If it returns `null` it means `result` has been already populated and
    /// `await` will be a no-op.
    ///
    /// When this function returns non-null, the implementation guarantees that
    /// a unit of concurrency has been assigned to the returned task.
    ///
    /// Thread-safe.
    async: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The pointer of this slice is an "eager" result value.
        /// The length is the size in bytes of the result type.
        /// This pointer's lifetime expires directly after the call to this function.
        result: []u8,
        result_alignment: std.mem.Alignment,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    ) ?*AnyFuture,
    /// Thread-safe.
    concurrent: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        result_len: usize,
        result_alignment: std.mem.Alignment,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    ) ConcurrentError!*AnyFuture,
    /// This function is only called when `async` returns a non-null value.
    ///
    /// Thread-safe.
    await: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The same value that was returned from `async`.
        any_future: *AnyFuture,
        /// Points to a buffer where the result is written.
        /// The length is equal to size in bytes of result type.
        result: []u8,
        result_alignment: std.mem.Alignment,
    ) void,
    /// Equivalent to `await` but initiates cancel request.
    ///
    /// This function is only called when `async` returns a non-null value.
    ///
    /// Thread-safe.
    cancel: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The same value that was returned from `async`.
        any_future: *AnyFuture,
        /// Points to a buffer where the result is written.
        /// The length is equal to size in bytes of result type.
        result: []u8,
        result_alignment: std.mem.Alignment,
    ) void,
    /// Returns whether the current thread of execution is known to have
    /// been requested to cancel.
    ///
    /// Thread-safe.
    cancelRequested: *const fn (?*anyopaque) bool,

    // --- Groups ---

    /// When this function returns, implementation guarantees that `start` has
    /// either already been called, or a unit of concurrency has been assigned
    /// to the task of calling the function.
    ///
    /// Thread-safe.
    groupAsync: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// Owner of the spawned async task.
        group: *Group,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) void,
    /// Thread-safe.
    groupConcurrent: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// Owner of the spawned async task.
        group: *Group,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) ConcurrentError!void,
    groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
    groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,

    /// Blocks until one of the futures from the list has a result ready, such
    /// that awaiting it will not block. Returns that index.
    select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,

    // --- Synchronization primitives ---

    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
    mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
    mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,

    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void,
    conditionWaitUncancelable: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) void,
    conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void,

    // --- Directories and files ---

    dirMake: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void,
    dirMakePath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void,
    dirMakeOpenPath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.OpenOptions) Dir.MakeOpenPathError!Dir,
    dirStat: *const fn (?*anyopaque, Dir) Dir.StatError!Dir.Stat,
    dirStatPath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.StatPathOptions) Dir.StatPathError!File.Stat,
    dirAccess: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.AccessOptions) Dir.AccessError!void,
    dirCreateFile: *const fn (?*anyopaque, Dir, sub_path: []const u8, File.CreateFlags) File.OpenError!File,
    dirOpenFile: *const fn (?*anyopaque, Dir, sub_path: []const u8, File.OpenFlags) File.OpenError!File,
    dirOpenDir: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.OpenOptions) Dir.OpenError!Dir,
    dirClose: *const fn (?*anyopaque, Dir) void,
    fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
    fileClose: *const fn (?*anyopaque, File) void,
    fileWriteStreaming: *const fn (?*anyopaque, File, buffer: [][]const u8) File.WriteStreamingError!usize,
    fileWritePositional: *const fn (?*anyopaque, File, buffer: [][]const u8, offset: u64) File.WritePositionalError!usize,
    /// Returns 0 on end of stream.
    fileReadStreaming: *const fn (?*anyopaque, File, data: [][]u8) File.Reader.Error!usize,
    /// Returns 0 on end of stream.
    fileReadPositional: *const fn (?*anyopaque, File, data: [][]u8, offset: u64) File.ReadPositionalError!usize,
    fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
    fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
    openSelfExe: *const fn (?*anyopaque, File.OpenFlags) File.OpenSelfExeError!File,

    // --- Time ---

    now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp,
    sleep: *const fn (?*anyopaque, Timeout) SleepError!void,

    // --- Networking ---

    netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
    netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream,
    netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
    netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
    netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
    netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
    netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
    netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
    /// Returns 0 on end of stream.
    netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
    netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
    netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
    netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
    netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
    netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void,
};
711
/// Error set of operations that can be interrupted by a cancel request; used
/// in this file by `VTable` entries such as `select`, `mutexLock` and
/// `conditionWait`.
pub const Cancelable = error{
    /// Caller has requested the async operation to stop.
    Canceled,
};
716
/// Error set holding the single catch-all error for error codes the standard
/// library did not anticipate; `Clock.Error` in this file includes it.
pub const UnexpectedError = error{
    /// The Operating System returned an undocumented error code.
    ///
    /// This error is in theory not possible, but it would be better
    /// to handle this error than to invoke undefined behavior.
    ///
    /// When this error code is observed, it usually means the Zig Standard
    /// Library needs a small patch to add the error code to the error set for
    /// the respective function.
    Unexpected,
};
728
729pub const Dir = @import("Io/Dir.zig");
730pub const File = @import("Io/File.zig");
731
732pub const Clock = enum {
733 /// A settable system-wide clock that measures real (i.e. wall-clock)
734 /// time. This clock is affected by discontinuous jumps in the system
735 /// time (e.g., if the system administrator manually changes the
736 /// clock), and by frequency adjustments performed by NTP and similar
737 /// applications.
738 ///
739 /// This clock normally counts the number of seconds since 1970-01-01
740 /// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
741 /// leap seconds; near a leap second it is typically adjusted by NTP to
742 /// stay roughly in sync with UTC.
743 ///
744 /// Timestamps returned by implementations of this clock represent time
745 /// elapsed since 1970-01-01T00:00:00Z, the POSIX/Unix epoch, ignoring
746 /// leap seconds. This is colloquially known as "Unix time". If the
747 /// underlying OS uses a different epoch for native timestamps (e.g.,
748 /// Windows, which uses 1601-01-01) they are translated accordingly.
749 real,
750 /// A nonsettable system-wide clock that represents time since some
751 /// unspecified point in the past.
752 ///
753 /// Monotonic: Guarantees that the time returned by consecutive calls
754 /// will not go backwards, but successive calls may return identical
755 /// (not-increased) time values.
756 ///
757 /// Not affected by discontinuous jumps in the system time (e.g., if
758 /// the system administrator manually changes the clock), but may be
759 /// affected by frequency adjustments.
760 ///
761 /// This clock expresses intent to **exclude time that the system is
762 /// suspended**. However, implementations may be unable to satisify
763 /// this, and may include that time.
764 ///
765 /// * On Linux, corresponds `CLOCK_MONOTONIC`.
766 /// * On macOS, corresponds to `CLOCK_UPTIME_RAW`.
767 awake,
768 /// Identical to `awake` except it expresses intent to **include time
769 /// that the system is suspended**, however, due to limitations it may
770 /// behave identically to `awake`.
771 ///
772 /// * On Linux, corresponds `CLOCK_BOOTTIME`.
773 /// * On macOS, corresponds to `CLOCK_MONOTONIC_RAW`.
774 boot,
775 /// Tracks the amount of CPU in user or kernel mode used by the calling
776 /// process.
777 cpu_process,
778 /// Tracks the amount of CPU in user or kernel mode used by the calling
779 /// thread.
780 cpu_thread,
781
782 pub const Error = error{UnsupportedClock} || UnexpectedError;
783
784 /// This function is not cancelable because first of all it does not block,
785 /// but more importantly, the cancelation logic itself may want to check
786 /// the time.
787 pub fn now(clock: Clock, io: Io) Error!Io.Timestamp {
788 return io.vtable.now(io.userdata, clock);
789 }
790
791 pub const Timestamp = struct {
792 raw: Io.Timestamp,
793 clock: Clock,
794
795 /// This function is not cancelable because first of all it does not block,
796 /// but more importantly, the cancelation logic itself may want to check
797 /// the time.
798 pub fn now(io: Io, clock: Clock) Error!Clock.Timestamp {
799 return .{
800 .raw = try io.vtable.now(io.userdata, clock),
801 .clock = clock,
802 };
803 }
804
805 pub fn wait(t: Clock.Timestamp, io: Io) SleepError!void {
806 return io.vtable.sleep(io.userdata, .{ .deadline = t });
807 }
808
809 pub fn durationTo(from: Clock.Timestamp, to: Clock.Timestamp) Clock.Duration {
810 assert(from.clock == to.clock);
811 return .{
812 .raw = from.raw.durationTo(to.raw),
813 .clock = from.clock,
814 };
815 }
816
817 pub fn addDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
818 assert(from.clock == duration.clock);
819 return .{
820 .raw = from.raw.addDuration(duration.raw),
821 .clock = from.clock,
822 };
823 }
824
825 pub fn subDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
826 assert(from.clock == duration.clock);
827 return .{
828 .raw = from.raw.subDuration(duration.raw),
829 .clock = from.clock,
830 };
831 }
832
833 pub fn fromNow(io: Io, duration: Clock.Duration) Error!Clock.Timestamp {
834 return .{
835 .clock = duration.clock,
836 .raw = (try duration.clock.now(io)).addDuration(duration.raw),
837 };
838 }
839
840 pub fn untilNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
841 const now_ts = try Clock.Timestamp.now(io, timestamp.clock);
842 return timestamp.durationTo(now_ts);
843 }
844
845 pub fn durationFromNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
846 const now_ts = try timestamp.clock.now(io);
847 return .{
848 .clock = timestamp.clock,
849 .raw = now_ts.durationTo(timestamp.raw),
850 };
851 }
852
853 pub fn toClock(t: Clock.Timestamp, io: Io, clock: Clock) Error!Clock.Timestamp {
854 if (t.clock == clock) return t;
855 const now_old = try t.clock.now(io);
856 const now_new = try clock.now(io);
857 const duration = now_old.durationTo(t);
858 return .{
859 .clock = clock,
860 .raw = now_new.addDuration(duration),
861 };
862 }
863
/// Evaluates `lhs op rhs` on the two nanosecond counts.
///
/// Asserts that both timestamps use the same clock.
pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool {
    assert(lhs.clock == rhs.clock);
    const left_ns = lhs.raw.nanoseconds;
    const right_ns = rhs.raw.nanoseconds;
    return std.math.compare(left_ns, op, right_ns);
}
868 };
869
/// A span of time together with the clock it is measured against.
pub const Duration = struct {
    raw: Io.Duration,
    clock: Clock,

    /// Sleeps for `duration` by passing it to the `Io` implementation's
    /// `sleep` as a `.duration` timeout.
    pub fn sleep(duration: Clock.Duration, io: Io) SleepError!void {
        const timeout: Io.Timeout = .{ .duration = duration };
        return io.vtable.sleep(io.userdata, timeout);
    }
};
878};
879
/// A point in time stored as a signed count of nanoseconds. The reference
/// point of the count is not recorded in this type; use `withClock` to pair
/// the value with the clock it was read from.
pub const Timestamp = struct {
    nanoseconds: i96,

    pub const zero: Timestamp = .{ .nanoseconds = 0 };

    /// Returns the signed span `to - from`; negative when `to` is earlier
    /// than `from`.
    pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
        return .{ .nanoseconds = to.nanoseconds - from.nanoseconds };
    }

    /// Returns the timestamp that lies `duration` after `from`.
    pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
        return .{ .nanoseconds = from.nanoseconds + duration.nanoseconds };
    }

    /// Returns the timestamp that lies `duration` before `from`.
    pub fn subDuration(from: Timestamp, duration: Duration) Timestamp {
        return .{ .nanoseconds = from.nanoseconds - duration.nanoseconds };
    }

    /// Pairs this raw timestamp with `clock`.
    pub fn withClock(t: Timestamp, clock: Clock) Clock.Timestamp {
        // `Clock.Timestamp` is initialized from `raw` and `clock` everywhere
        // else in this file, so wrap the whole value instead of copying the
        // `nanoseconds` field into it.
        return .{ .raw = t, .clock = clock };
    }

    pub fn fromNanoseconds(x: i96) Timestamp {
        return .{ .nanoseconds = x };
    }

    /// Converts to whole milliseconds, truncating toward zero.
    ///
    /// The narrowing to `i64` is a checked `@intCast`; the quotient must fit.
    pub fn toMilliseconds(t: Timestamp) i64 {
        return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_ms));
    }

    /// Converts to whole seconds, truncating toward zero.
    ///
    /// The narrowing to `i64` is a checked `@intCast`; the quotient must fit.
    pub fn toSeconds(t: Timestamp) i64 {
        return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_s));
    }

    pub fn toNanoseconds(t: Timestamp) i96 {
        return t.nanoseconds;
    }

    /// Prints the nanosecond count as an integer using the options in `n`.
    /// Base 10 is used when `n.mode` does not specify a base.
    pub fn formatNumber(t: Timestamp, w: *std.Io.Writer, n: std.fmt.Number) std.Io.Writer.Error!void {
        return w.printInt(t.nanoseconds, n.mode.base() orelse 10, n.case, .{
            .precision = n.precision,
            .width = n.width,
            .alignment = n.alignment,
            .fill = n.fill,
        });
    }
};
926
/// A signed span of time stored as a count of nanoseconds.
pub const Duration = struct {
    nanoseconds: i96,

    pub const zero: Duration = .{ .nanoseconds = 0 };
    /// The largest representable duration.
    pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };

    pub fn fromNanoseconds(x: i96) Duration {
        return .{ .nanoseconds = x };
    }

    /// Widens `x` to `i96` before scaling, so the multiplication cannot
    /// overflow for any `i64` input.
    pub fn fromMilliseconds(x: i64) Duration {
        const wide: i96 = x;
        return .{ .nanoseconds = wide * std.time.ns_per_ms };
    }

    /// Widens `x` to `i96` before scaling, so the multiplication cannot
    /// overflow for any `i64` input.
    pub fn fromSeconds(x: i64) Duration {
        const wide: i96 = x;
        return .{ .nanoseconds = wide * std.time.ns_per_s };
    }

    /// Converts to whole milliseconds, truncating toward zero.
    ///
    /// The narrowing to `i64` is a checked `@intCast`; the quotient must fit.
    pub fn toMilliseconds(d: Duration) i64 {
        const whole_ms = @divTrunc(d.nanoseconds, std.time.ns_per_ms);
        return @intCast(whole_ms);
    }

    /// Converts to whole seconds, truncating toward zero.
    ///
    /// The narrowing to `i64` is a checked `@intCast`; the quotient must fit.
    pub fn toSeconds(d: Duration) i64 {
        const whole_s = @divTrunc(d.nanoseconds, std.time.ns_per_s);
        return @intCast(whole_s);
    }

    pub fn toNanoseconds(d: Duration) i96 {
        return d.nanoseconds;
    }
};
957
/// Describes when an operation should give up and return `error.Timeout`:
/// never, after a duration has elapsed, or once a deadline is reached.
pub const Timeout = union(enum) {
    none,
    duration: Clock.Duration,
    deadline: Clock.Timestamp,

    pub const Error = error{ Timeout, UnsupportedClock };

    /// Converts to an absolute deadline. Returns `null` for `.none`; a
    /// `.duration` is measured from the current reading of its clock.
    pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
        switch (t) {
            .none => return null,
            .deadline => |timestamp| return timestamp,
            .duration => |duration| return try Clock.Timestamp.fromNow(io, duration),
        }
    }

    /// Converts to a duration relative to now. Returns `null` for `.none`; a
    /// `.deadline` is measured from the current reading of its clock.
    pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
        switch (t) {
            .none => return null,
            .duration => |duration| return duration,
            .deadline => |timestamp| return try timestamp.durationFromNow(io),
        }
    }

    /// Passes this timeout unchanged to the `Io` implementation's `sleep`.
    pub fn sleep(timeout: Timeout, io: Io) SleepError!void {
        const vtable = io.vtable;
        return vtable.sleep(io.userdata, timeout);
    }
};
986
/// Opaque handle type. `Future` stores a `?*AnyFuture` and hands it back to
/// the `Io` vtable (`await`, `cancel`, `select`); this file never looks
/// through the pointer.
pub const AnyFuture = opaque {};
988
/// Handle to the outcome of a task started with `async` or `concurrent`.
///
/// While `any_future` is non-null the task is still owned by the `Io`
/// implementation; once it is null, `result` holds the task's return value.
pub fn Future(Result: type) type {
    return struct {
        any_future: ?*AnyFuture,
        result: Result,

        /// Equivalent to `await` but places a cancellation request.
        ///
        /// Idempotent. Not threadsafe.
        pub fn cancel(f: *@This(), io: Io) Result {
            if (f.any_future) |pending| {
                io.vtable.cancel(io.userdata, pending, @ptrCast(&f.result), .of(Result));
                // Clearing the handle is what makes repeated calls return the
                // stored result without touching the vtable again.
                f.any_future = null;
            }
            return f.result;
        }

        /// Obtains the task's result through the `Io` implementation, or
        /// returns the stored result when that has already happened.
        ///
        /// Idempotent. Not threadsafe.
        pub fn await(f: *@This(), io: Io) Result {
            if (f.any_future) |pending| {
                io.vtable.await(io.userdata, pending, @ptrCast(&f.result), .of(Result));
                f.any_future = null;
            }
            return f.result;
        }
    };
}
1013
/// A set of tasks started through `async` or `concurrent` that are waited on
/// or canceled together.
pub const Group = struct {
    state: usize,
    context: ?*anyopaque,
    token: ?*anyopaque,

    pub const init: Group = .{ .state = 0, .context = null, .token = null };

    /// Runs `function` with `args` asynchronously. The spawned resource is
    /// owned by the group.
    ///
    /// `function` *may* be called immediately, before `async` returns.
    ///
    /// By the time this function returns, `function` has either already been
    /// called and completed, or has successfully been assigned a unit of
    /// concurrency.
    ///
    /// Once this has been called, `wait` or `cancel` must be called before
    /// the group is deinitialized.
    ///
    /// Threadsafe.
    ///
    /// See also:
    /// * `concurrent`
    /// * `Io.async`
    pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
            // Recovers the typed argument tuple from the opaque context and
            // invokes `function`; the group pointer is not needed here.
            fn start(group: *Group, context: *const anyopaque) void {
                const typed_args: *const Args = @ptrCast(@alignCast(context));
                _ = group;
                @call(.auto, function, typed_args.*);
            }
        };
        const vtable = io.vtable;
        vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Runs `function` with `args` such that the function is not guaranteed
    /// to have returned until `wait` is called, allowing the caller to
    /// progress while waiting for any `Io` operations.
    ///
    /// The spawned resource is owned by the group; once this has been called,
    /// `wait` or `cancel` must be called before the group is deinitialized.
    ///
    /// This has a stronger guarantee than `async`, placing restrictions on
    /// what kind of `Io` implementations are supported. By calling `async`
    /// instead, one allows, for example, stackful single-threaded blocking
    /// I/O.
    ///
    /// Threadsafe.
    ///
    /// See also:
    /// * `async`
    /// * `Io.concurrent`
    pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
            // Recovers the typed argument tuple from the opaque context and
            // invokes `function`; the group pointer is not needed here.
            fn start(group: *Group, context: *const anyopaque) void {
                const typed_args: *const Args = @ptrCast(@alignCast(context));
                _ = group;
                @call(.auto, function, typed_args.*);
            }
        };
        const vtable = io.vtable;
        return vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Blocks until all tasks of the group finish. During this time,
    /// cancellation requests propagate to all members of the group.
    ///
    /// Returns immediately when `token` is null.
    ///
    /// Idempotent. Not threadsafe.
    pub fn wait(g: *Group, io: Io) void {
        if (g.token) |token| {
            // The token is cleared before the vtable call, which is what
            // turns a repeated `wait`/`cancel` into a no-op.
            g.token = null;
            io.vtable.groupWait(io.userdata, g, token);
        }
    }

    /// Equivalent to `wait` but immediately requests cancellation on all
    /// members of the group.
    ///
    /// Idempotent. Not threadsafe.
    pub fn cancel(g: *Group, io: Io) void {
        if (g.token) |token| {
            g.token = null;
            io.vtable.groupCancel(io.userdata, g, token);
        }
    }
};
1098
/// Runs several tasks in a `Group` and delivers their results one at a time
/// through a `Queue` of `U`, where each task's return value is stored in the
/// union field chosen when the task was started.
pub fn Select(comptime U: type) type {
    return struct {
        io: Io,
        group: Group,
        queue: Queue(U),
        outstanding: usize,

        const S = @This();

        pub const Union = U;

        pub const Field = std.meta.FieldEnum(U);

        /// `buffer` becomes the storage of the result queue.
        pub fn init(io: Io, buffer: []U) S {
            return .{
                .io = io,
                .group = .init,
                .queue = .init(buffer),
                .outstanding = 0,
            };
        }

        /// Runs `function` with `args` asynchronously. The spawned resource
        /// is owned by the select.
        ///
        /// `function` must have return type matching the `field` field of
        /// `Union`.
        ///
        /// `function` *may* be called immediately, before `async` returns.
        ///
        /// By the time this function returns, `function` has either already
        /// been called and completed, or has successfully been assigned a
        /// unit of concurrency.
        ///
        /// Once this has been called, `wait` or `cancel` must be called
        /// before the select is deinitialized.
        ///
        /// Threadsafe.
        ///
        /// Related:
        /// * `Io.async`
        /// * `Group.async`
        pub fn async(
            s: *S,
            comptime field: Field,
            function: anytype,
            args: std.meta.ArgsTuple(@TypeOf(function)),
        ) void {
            const Args = @TypeOf(args);
            const TypeErased = struct {
                // Runs `function`, wraps its return value in the `field`
                // variant of `U`, and enqueues it on the owning select, which
                // is recovered from the embedded `group` field.
                fn start(group: *Group, context: *const anyopaque) void {
                    const typed_args: *const Args = @ptrCast(@alignCast(context));
                    const owner: *S = @fieldParentPtr("group", group);
                    const outcome = @unionInit(U, @tagName(field), @call(.auto, function, typed_args.*));
                    owner.queue.putOneUncancelable(owner.io, outcome);
                }
            };
            // Count the task before spawning it; the increment is atomic
            // because this function may be called from several threads.
            _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
            const io = s.io;
            io.vtable.groupAsync(io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start);
        }

        /// Blocks until another task of the select finishes.
        ///
        /// Asserts there is at least one more `outstanding` task.
        ///
        /// Not threadsafe.
        pub fn wait(s: *S) Cancelable!U {
            // The counter is decremented before blocking on the queue.
            s.outstanding -= 1;
            const io = s.io;
            return s.queue.getOne(io);
        }

        /// Equivalent to `wait` but requests cancellation on all remaining
        /// tasks owned by the select.
        ///
        /// It is illegal to call `wait` after this.
        ///
        /// Idempotent. Not threadsafe.
        pub fn cancel(s: *S) void {
            s.outstanding = 0;
            const io = s.io;
            s.group.cancel(io);
        }
    };
}
1181
/// Mutual-exclusion lock whose uncontended paths are a single atomic
/// operation on `state`; every other case is delegated to the `Io`
/// implementation.
pub const Mutex = struct {
    state: State,

    pub const State = enum(usize) {
        locked_once = 0b00,
        unlocked = 0b01,
        contended = 0b10,
        /// contended
        _,

        /// Reports whether the `unlocked` bit is set in `state`.
        pub fn isUnlocked(state: State) bool {
            const unlocked_bit = @intFromEnum(State.unlocked);
            return (@intFromEnum(state) & unlocked_bit) != 0;
        }
    };

    pub const init: Mutex = .{ .state = .unlocked };

    /// Atomically clears the `unlocked` bit with acquire ordering and returns
    /// the state that was stored before the clear.
    fn clearUnlockedBit(mutex: *Mutex) State {
        const raw_state: *usize = @ptrCast(&mutex.state);
        const mask = ~@intFromEnum(State.unlocked);
        return @enumFromInt(@atomicRmw(usize, raw_state, .And, mask, .acquire));
    }

    /// Attempts to take the lock without waiting. Returns `true` when the
    /// mutex was unlocked before the attempt.
    pub fn tryLock(mutex: *Mutex) bool {
        return mutex.clearUnlockedBit().isUnlocked();
    }

    /// Takes the lock. When the mutex was not unlocked, the previously
    /// observed state is handed to the `Io` implementation's `mutexLock`.
    pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void {
        const prev_state = mutex.clearUnlockedBit();
        if (!prev_state.isUnlocked()) {
            @branchHint(.unlikely);
            return io.vtable.mutexLock(io.userdata, prev_state, mutex);
        }
    }

    /// Same as `lock` but cannot be canceled.
    pub fn lockUncancelable(mutex: *Mutex, io: std.Io) void {
        const prev_state = mutex.clearUnlockedBit();
        if (!prev_state.isUnlocked()) {
            @branchHint(.unlikely);
            return io.vtable.mutexLockUncancelable(io.userdata, prev_state, mutex);
        }
    }

    /// Releases the lock. Tries to swap `locked_once` for `unlocked`; when
    /// the exchange does not succeed, the observed state is handed to the
    /// `Io` implementation's `mutexUnlock`.
    ///
    /// Asserts that the mutex was not already unlocked.
    pub fn unlock(mutex: *Mutex, io: std.Io) void {
        // `@cmpxchgWeak` yields null on success and the observed value
        // otherwise.
        if (@cmpxchgWeak(State, &mutex.state, .locked_once, .unlocked, .release, .acquire)) |observed| {
            @branchHint(.unlikely);
            assert(observed != .unlocked); // mutex not locked
            io.vtable.mutexUnlock(io.userdata, observed, mutex);
        }
    }
};
1250
/// Condition variable used together with a `Mutex`. Every operation is
/// forwarded to the `Io` implementation.
pub const Condition = struct {
    state: u64 = 0,

    /// Selects how many waiters a wake-up targets.
    pub const Wake = enum {
        /// Wake up only one thread.
        one,
        /// Wake up all threads.
        all,
    };

    /// Forwards to the implementation's `conditionWait` with `mutex`.
    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
        const vtable = io.vtable;
        return vtable.conditionWait(io.userdata, cond, mutex);
    }

    /// Same as `wait` but without the `Cancelable` error set.
    pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
        const vtable = io.vtable;
        return vtable.conditionWaitUncancelable(io.userdata, cond, mutex);
    }

    /// Requests a wake-up of kind `.one`.
    pub fn signal(cond: *Condition, io: Io) void {
        const vtable = io.vtable;
        vtable.conditionWake(io.userdata, cond, .one);
    }

    /// Requests a wake-up of kind `.all`.
    pub fn broadcast(cond: *Condition, io: Io) void {
        const vtable = io.vtable;
        vtable.conditionWake(io.userdata, cond, .all);
    }
};
1277
/// Byte-level queue that `Queue` wraps. Its state is a ring buffer plus two
/// lists of parked operations, and every access to that state happens with
/// `mutex` held.
pub const TypeErasedQueue = struct {
    mutex: Mutex,

    /// Ring buffer. This data is logically *after* queued getters.
    buffer: []u8,
    /// Index in `buffer` of the oldest stored byte.
    start: usize,
    /// Number of stored bytes, beginning at `start` and wrapping past the end
    /// of `buffer`.
    len: usize,

    /// Parked `Put` operations, linked through `Put.node`.
    putters: std.DoublyLinkedList,
    /// Parked `Get` operations, linked through `Get.node`.
    getters: std.DoublyLinkedList,

    /// A producer parked inside `putLocked`.
    const Put = struct {
        /// Bytes not yet handed to a consumer or copied into the ring buffer.
        remaining: []const u8,
        /// Signaled by whoever empties `remaining`.
        condition: Condition,
        node: std.DoublyLinkedList.Node,
    };

    /// A consumer parked inside `getLocked`.
    const Get = struct {
        /// Destination bytes that have not been filled yet.
        remaining: []u8,
        /// Signaled by whoever fills `remaining` completely.
        condition: Condition,
        node: std.DoublyLinkedList.Node,
    };

    /// Creates an empty queue that uses `buffer` as its ring buffer.
    pub fn init(buffer: []u8) TypeErasedQueue {
        return .{
            .mutex = .init,
            .buffer = buffer,
            .start = 0,
            .len = 0,
            .putters = .{},
            .getters = .{},
        };
    }

    /// Enqueues bytes from `elements`, returning once at least `min` of them
    /// have been accepted. Returns the number of bytes accepted.
    ///
    /// Asserts that `elements.len >= min`.
    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
        assert(min <= elements.len);
        if (elements.len == 0) return 0;
        try q.mutex.lock(io);
        defer q.mutex.unlock(io);
        return q.putLocked(io, elements, min, false);
    }

    /// Same as `put` but cannot be canceled.
    pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
        assert(min <= elements.len);
        if (elements.len == 0) return 0;
        q.mutex.lockUncancelable(io);
        defer q.mutex.unlock(io);
        // With `uncancelable` set, `putLocked` only uses the uncancelable
        // wait, so `error.Canceled` cannot be produced.
        return q.putLocked(io, elements, min, true) catch |err| switch (err) {
            error.Canceled => unreachable,
        };
    }

    /// Returns the next contiguous run of free ring-buffer space, or null
    /// when the ring buffer is full.
    fn puttableSlice(q: *const TypeErasedQueue) ?[]u8 {
        const end_index = q.start + q.len;
        const free_space = if (end_index < q.buffer.len)
            // Stored data does not reach the end of the buffer: the free run
            // extends from there to the end.
            q.buffer[end_index..]
        else
            // Stored data reaches or wraps past the end: the free run lies
            // between the wrapped end and `start`.
            q.buffer[end_index - q.buffer.len .. q.start];
        return if (free_space.len == 0) null else free_space;
    }

    fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize {
        // Getters have first priority on the data, and only when the getters
        // queue is empty do we start populating the buffer.

        var unsent = elements;
        while (q.getters.popFirst()) |node| {
            const getter: *Get = @alignCast(@fieldParentPtr("node", node));
            const chunk_len = @min(getter.remaining.len, unsent.len);
            assert(chunk_len > 0);
            @memcpy(getter.remaining[0..chunk_len], unsent[0..chunk_len]);
            unsent = unsent[chunk_len..];
            getter.remaining = getter.remaining[chunk_len..];
            if (getter.remaining.len > 0) {
                // This getter still wants more, so it goes back to the front.
                q.getters.prepend(node);
            } else {
                getter.condition.signal(io);
                if (unsent.len > 0) continue;
            }
            assert(unsent.len == 0);
            return elements.len;
        }

        // No getter is waiting any more; store what fits in the ring buffer.
        while (q.puttableSlice()) |free_space| {
            const chunk_len = @min(free_space.len, unsent.len);
            assert(chunk_len > 0);
            @memcpy(free_space[0..chunk_len], unsent[0..chunk_len]);
            q.len += chunk_len;
            unsent = unsent[chunk_len..];
            if (unsent.len == 0) return elements.len;
        }

        const accepted = elements.len - unsent.len;
        if (accepted >= min) return accepted;

        // Park until the rest has been taken. The node is unlinked on exit
        // only if bytes are still pending, i.e. when leaving early.
        var parked: Put = .{ .remaining = unsent, .condition = .{}, .node = .{} };
        q.putters.append(&parked.node);
        defer if (parked.remaining.len > 0) q.putters.remove(&parked.node);
        while (parked.remaining.len > 0) {
            if (uncancelable) {
                parked.condition.waitUncancelable(io, &q.mutex);
            } else {
                try parked.condition.wait(io, &q.mutex);
            }
        }
        return elements.len;
    }

    /// Dequeues bytes into `buffer`, returning once at least `min` of them
    /// have been received. Returns the number of bytes received.
    ///
    /// Asserts that `buffer.len >= min`.
    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
        assert(min <= buffer.len);
        if (buffer.len == 0) return 0;
        try q.mutex.lock(io);
        defer q.mutex.unlock(io);
        return q.getLocked(io, buffer, min, false);
    }

    /// Same as `get` but cannot be canceled.
    pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
        assert(min <= buffer.len);
        if (buffer.len == 0) return 0;
        q.mutex.lockUncancelable(io);
        defer q.mutex.unlock(io);
        // With `uncancelable` set, `getLocked` only uses the uncancelable
        // wait, so `error.Canceled` cannot be produced.
        return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
            error.Canceled => unreachable,
        };
    }

    /// Returns the next contiguous run of stored bytes, or null when the
    /// ring buffer holds no data.
    fn gettableSlice(q: *const TypeErasedQueue) ?[]const u8 {
        const tail = q.buffer[q.start..];
        const stored = tail[0..@min(tail.len, q.len)];
        return if (stored.len == 0) null else stored;
    }

    fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
        // The ring buffer gets first priority, then data should come from any
        // queued putters, then finally the ring buffer should be filled with
        // data from putters so they can be resumed.

        var unfilled = buffer;
        while (q.gettableSlice()) |stored| {
            const chunk_len = @min(stored.len, unfilled.len);
            assert(chunk_len > 0);
            @memcpy(unfilled[0..chunk_len], stored[0..chunk_len]);
            q.start += chunk_len;
            // Wrap the read position once it reaches the end of the buffer.
            if (q.buffer.len - q.start == 0) q.start = 0;
            q.len -= chunk_len;
            unfilled = unfilled[chunk_len..];
            if (unfilled.len == 0) {
                q.fillRingBufferFromPutters(io);
                return buffer.len;
            }
        }

        // Copy directly from putters into buffer.
        while (q.putters.popFirst()) |node| {
            const putter: *Put = @alignCast(@fieldParentPtr("node", node));
            const chunk_len = @min(putter.remaining.len, unfilled.len);
            assert(chunk_len > 0);
            @memcpy(unfilled[0..chunk_len], putter.remaining[0..chunk_len]);
            putter.remaining = putter.remaining[chunk_len..];
            unfilled = unfilled[chunk_len..];
            if (putter.remaining.len > 0) {
                // This putter still has bytes left, so it goes back to the
                // front.
                q.putters.prepend(node);
            } else {
                putter.condition.signal(io);
                if (unfilled.len > 0) continue;
            }
            assert(unfilled.len == 0);
            q.fillRingBufferFromPutters(io);
            return buffer.len;
        }

        // Both the ring buffer and the putters queue are empty.
        const received = buffer.len - unfilled.len;
        if (received >= min) return received;

        // Park until the rest has been delivered. The node is unlinked on
        // exit only if bytes are still missing, i.e. when leaving early.
        var parked: Get = .{ .remaining = unfilled, .condition = .{}, .node = .{} };
        q.getters.append(&parked.node);
        defer if (parked.remaining.len > 0) q.getters.remove(&parked.node);
        while (parked.remaining.len > 0) {
            if (uncancelable) {
                parked.condition.waitUncancelable(io, &q.mutex);
            } else {
                try parked.condition.wait(io, &q.mutex);
            }
        }
        q.fillRingBufferFromPutters(io);
        return buffer.len;
    }

    /// Called when there is nonzero space available in the ring buffer and
    /// potentially putters waiting. The mutex is already held and the task is
    /// to copy putter data to the ring buffer and signal any putters whose
    /// buffers have been fully copied.
    fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
        while (q.putters.popFirst()) |node| {
            const putter: *Put = @alignCast(@fieldParentPtr("node", node));
            while (q.puttableSlice()) |free_space| {
                const chunk_len = @min(free_space.len, putter.remaining.len);
                assert(chunk_len > 0);
                @memcpy(free_space[0..chunk_len], putter.remaining[0..chunk_len]);
                q.len += chunk_len;
                putter.remaining = putter.remaining[chunk_len..];
                if (putter.remaining.len == 0) {
                    putter.condition.signal(io);
                    break;
                }
            } else {
                // The ring buffer filled up before this putter was drained:
                // put it back at the front and stop.
                q.putters.prepend(node);
                break;
            }
        }
    }
};
1483
/// Many producer, many consumer, thread-safe queue with a buffer size chosen
/// at runtime.
/// When the buffer is empty, consumers suspend and are resumed by producers.
/// When the buffer is full, producers suspend and are resumed by consumers.
pub fn Queue(Elem: type) type {
    return struct {
        type_erased: TypeErasedQueue,

        /// Size of one element in bytes; converts between element counts and
        /// the byte counts used by `TypeErasedQueue`.
        const elem_size = @sizeOf(Elem);

        /// `buffer` becomes the queue's storage.
        pub fn init(buffer: []Elem) @This() {
            return .{ .type_erased = .init(@ptrCast(buffer)) };
        }

        /// Appends elements to the end of the queue. The function returns when
        /// at least `min` elements have been added to the buffer or sent
        /// directly to a consumer.
        ///
        /// Returns how many elements have been added to the queue.
        ///
        /// Asserts that `elements.len >= min`.
        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
            const bytes_put = try q.type_erased.put(io, @ptrCast(elements), min * elem_size);
            return @divExact(bytes_put, elem_size);
        }

        /// Same as `put` but blocks until all elements have been added to the queue.
        pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void {
            const added = try q.put(io, elements, elements.len);
            assert(added == elements.len);
        }

        /// Same as `put` but cannot be interrupted.
        pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
            const bytes_put = q.type_erased.putUncancelable(io, @ptrCast(elements), min * elem_size);
            return @divExact(bytes_put, elem_size);
        }

        /// Adds exactly one element via `put`.
        pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
            const added = try q.put(io, &.{item}, 1);
            assert(added == 1);
        }

        /// Same as `putOne` but cannot be interrupted.
        pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
            const added = q.putUncancelable(io, &.{item}, 1);
            assert(added == 1);
        }

        /// Receives elements from the beginning of the queue. The function
        /// returns when at least `min` elements have been populated inside
        /// `buffer`.
        ///
        /// Returns how many elements of `buffer` have been populated.
        ///
        /// Asserts that `buffer.len >= min`.
        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
            const bytes_got = try q.type_erased.get(io, @ptrCast(buffer), min * elem_size);
            return @divExact(bytes_got, elem_size);
        }

        /// Same as `get` but cannot be interrupted.
        pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
            const bytes_got = q.type_erased.getUncancelable(io, @ptrCast(buffer), min * elem_size);
            return @divExact(bytes_got, elem_size);
        }

        /// Receives exactly one element via `get`.
        pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
            var slot: [1]Elem = undefined;
            const received = try q.get(io, &slot, 1);
            assert(received == 1);
            return slot[0];
        }

        /// Same as `getOne` but cannot be interrupted.
        pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
            var slot: [1]Elem = undefined;
            const received = q.getUncancelable(io, &slot, 1);
            assert(received == 1);
            return slot[0];
        }

        /// Returns buffer length in `Elem` units.
        pub fn capacity(q: *const @This()) usize {
            const byte_capacity = q.type_erased.buffer.len;
            return @divExact(byte_capacity, elem_size);
        }
    };
}
1557
/// Runs `function` with `args` such that the function's return value is not
/// guaranteed to be available until `await` is called on the returned future.
///
/// `function` *may* be called immediately, before `async` returns. This has
/// weaker guarantees than `concurrent`, making it more portable and reusable.
///
/// By the time this function returns, `function` has either already been
/// called and completed, or has successfully been assigned a unit of
/// concurrency.
///
/// See also:
/// * `Group`
pub fn async(
    io: Io,
    function: anytype,
    args: std.meta.ArgsTuple(@TypeOf(function)),
) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
    const Function = @TypeOf(function);
    const Result = @typeInfo(Function).@"fn".return_type.?;
    const Args = @TypeOf(args);
    const TypeErased = struct {
        // Recovers the typed argument tuple and result slot from the opaque
        // pointers and stores the return value of `function` in the slot.
        fn start(context: *const anyopaque, result: *anyopaque) void {
            const typed_result: *Result = @ptrCast(@alignCast(result));
            const typed_args: *const Args = @ptrCast(@alignCast(context));
            typed_result.* = @call(.auto, function, typed_args.*);
        }
    };
    // `result` is left undefined on purpose: the implementation is handed a
    // pointer to it as the place for the result.
    var future: Future(Result) = undefined;
    const handle = io.vtable.async(
        io.userdata,
        @ptrCast(&future.result),
        .of(Result),
        @ptrCast(&args),
        .of(Args),
        TypeErased.start,
    );
    future.any_future = handle;
    return future;
}
1595
/// Error set returned by `concurrent` and `Group.concurrent`.
pub const ConcurrentError = error{
    /// Concurrency could not be provided, either because of a temporary
    /// condition such as resource exhaustion or because the Io
    /// implementation does not support concurrency.
    ConcurrencyUnavailable,
};
1601
/// Runs `function` with `args` such that the function's return value is not
/// guaranteed to be available until `await` is called, allowing the caller to
/// progress while waiting for any `Io` operations.
///
/// This has a stronger guarantee than `async`, placing restrictions on what
/// kind of `Io` implementations are supported. By calling `async` instead,
/// one allows, for example, stackful single-threaded blocking I/O.
pub fn concurrent(
    io: Io,
    function: anytype,
    args: std.meta.ArgsTuple(@TypeOf(function)),
) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
    const Function = @TypeOf(function);
    const Result = @typeInfo(Function).@"fn".return_type.?;
    const Args = @TypeOf(args);
    const TypeErased = struct {
        // Recovers the typed argument tuple and result slot from the opaque
        // pointers and stores the return value of `function` in the slot.
        fn start(context: *const anyopaque, result: *anyopaque) void {
            const typed_result: *Result = @ptrCast(@alignCast(result));
            const typed_args: *const Args = @ptrCast(@alignCast(context));
            typed_result.* = @call(.auto, function, typed_args.*);
        }
    };
    // Unlike `async`, only the size and alignment of the result are passed
    // here; `future.result` stays undefined when this function returns.
    var future: Future(Result) = undefined;
    const handle = try io.vtable.concurrent(
        io.userdata,
        @sizeOf(Result),
        .of(Result),
        @ptrCast(&args),
        .of(Args),
        TypeErased.start,
    );
    future.any_future = handle;
    return future;
}
1634
/// Returns the answer of the `Io` implementation's `cancelRequested` hook.
pub fn cancelRequested(io: Io) bool {
    const vtable = io.vtable;
    return vtable.cancelRequested(io.userdata);
}
1638
/// Errors that the sleep operations in this file can return: cancellation,
/// unexpected errors, and `error.UnsupportedClock`.
pub const SleepError = Cancelable || UnexpectedError || error{UnsupportedClock};
1640
/// Sleeps for `duration` as measured by `clock`, by passing a `.duration`
/// timeout to the `Io` implementation's `sleep`.
pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {
    const timeout: Timeout = .{
        .duration = .{ .raw = duration, .clock = clock },
    };
    return io.vtable.sleep(io.userdata, timeout);
}
1647
/// Given a struct in which every field is a `*Future`, returns a union with
/// the same field names whose field types are the corresponding futures'
/// `result` types. The union is tagged by `std.meta.FieldEnum(S)`.
pub fn SelectUnion(S: type) type {
    const future_fields = @typeInfo(S).@"struct".fields;
    var field_names: [future_fields.len][]const u8 = undefined;
    var field_types: [future_fields.len]type = undefined;
    for (future_fields, 0..) |future_field, i| {
        // Each field is a pointer; the pointee's `result` field supplies the
        // union field type.
        const PointeeFuture = @typeInfo(future_field.type).pointer.child;
        field_names[i] = future_field.name;
        field_types[i] = @FieldType(PointeeFuture, "result");
    }
    return @Union(.auto, std.meta.FieldEnum(S), &field_names, &field_types, &@splat(.{}));
}
1661
/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
/// and can be different for each field.
///
/// If any future already has its result (its `any_future` is null), the
/// first such field in declaration order is returned without consulting the
/// `Io` implementation. Otherwise the implementation picks an index and that
/// future is awaited.
pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {
    const S = @TypeOf(s);
    const U = SelectUnion(S);
    const fields = @typeInfo(S).@"struct".fields;
    var pending: [fields.len]*AnyFuture = undefined;
    inline for (fields, 0..) |field, i| {
        const future = @field(s, field.name);
        pending[i] = future.any_future orelse return @unionInit(U, field.name, future.result);
    }
    const selected = try io.vtable.select(io.userdata, &pending);
    switch (selected) {
        // The index must be comptime-known to name the union field, hence
        // the inline prong over every valid index.
        inline 0...(fields.len - 1) => |selected_index| {
            const name = fields[selected_index].name;
            return @unionInit(U, name, @field(s, name).await(io));
        },
        else => unreachable,
    }
}