const builtin = @import("builtin");
const is_windows = builtin.os.tag == .windows;

const std = @import("std.zig");
const windows = std.os.windows;
const posix = std.posix;
const math = std.math;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const Alignment = std.mem.Alignment;

pub const Limit = enum(usize) {
    nothing = 0,
    unlimited = std.math.maxInt(usize),
    _,

    /// `std.math.maxInt(usize)` is interpreted to mean `.unlimited`.
    pub fn limited(n: usize) Limit {
        return @enumFromInt(n);
    }

    /// Any value greater than `std.math.maxInt(usize)` is interpreted to mean
    /// `.unlimited`.
    pub fn limited64(n: u64) Limit {
        return @enumFromInt(@min(n, std.math.maxInt(usize)));
    }

    pub fn countVec(data: []const []const u8) Limit {
        var total: usize = 0;
        for (data) |d| total += d.len;
        return .limited(total);
    }

    pub fn min(a: Limit, b: Limit) Limit {
        return @enumFromInt(@min(@intFromEnum(a), @intFromEnum(b)));
    }

    pub fn minInt(l: Limit, n: usize) usize {
        return @min(n, @intFromEnum(l));
    }

    pub fn minInt64(l: Limit, n: u64) usize {
        return @min(n, @intFromEnum(l));
    }

    pub fn slice(l: Limit, s: []u8) []u8 {
        return s[0..l.minInt(s.len)];
    }

    pub fn sliceConst(l: Limit, s: []const u8) []const u8 {
        return s[0..l.minInt(s.len)];
    }

    pub fn toInt(l: Limit) ?usize {
        return switch (l) {
            else => @intFromEnum(l),
            .unlimited => null,
        };
    }

    /// Reduces a slice to account for the limit, leaving room for one extra
    /// byte above the limit, allowing for the use case of differentiating
    /// between end-of-stream and reaching the limit.
    pub fn slice1(l: Limit, non_empty_buffer: []u8) []u8 {
        assert(non_empty_buffer.len >= 1);
        return non_empty_buffer[0..@min(@intFromEnum(l) +| 1, non_empty_buffer.len)];
    }

    pub fn nonzero(l: Limit) bool {
        return @intFromEnum(l) > 0;
    }

    /// Return a new limit reduced by `amount`, or return `null` indicating the
    /// limit would be exceeded.
    pub fn subtract(l: Limit, amount: usize) ?Limit {
        if (l == .unlimited) return .unlimited;
        if (amount > @intFromEnum(l)) return null;
        return @enumFromInt(@intFromEnum(l) - amount);
    }
};
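
// Minimal illustrative test (an added sketch, not part of the public API
// surface): exercises only the pure `Limit` helpers defined above.
test Limit {
    var buf: [10]u8 = undefined;
    try std.testing.expectEqual(@as(usize, 3), Limit.limited(5).minInt(3));
    try std.testing.expectEqual(@as(usize, 5), Limit.limited(5).minInt(9));
    try std.testing.expectEqual(Limit.limited(3), Limit.min(.limited(3), .unlimited));
    try std.testing.expectEqual(@as(?usize, null), Limit.unlimited.toInt());
    try std.testing.expect(!Limit.nothing.nonzero());
    try std.testing.expectEqual(Limit.limited(2), Limit.limited(5).subtract(3).?);
    try std.testing.expectEqual(@as(?Limit, null), Limit.limited(5).subtract(6));
    try std.testing.expectEqual(@as(usize, 4), Limit.limited(4).slice(&buf).len);
}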

pub const Reader = @import("Io/Reader.zig");
pub const Writer = @import("Io/Writer.zig");

pub const tty = @import("Io/tty.zig");

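/// Example (an illustrative sketch; assumes a `gpa` allocator and a spawned
/// `child` process with piped stdout and stderr, which are not defined here):
///
///     var poller = std.Io.poll(gpa, enum { stdout, stderr }, .{
///         .stdout = child.stdout.?,
///         .stderr = child.stderr.?,
///     });
///     defer poller.deinit();
///     while (try poller.poll()) {
///         // Newly read bytes accumulate in poller.reader(.stdout) and
///         // poller.reader(.stderr).
///     }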
pub fn poll(
    gpa: Allocator,
    comptime StreamEnum: type,
    files: PollFiles(StreamEnum),
) Poller(StreamEnum) {
    const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
    var result: Poller(StreamEnum) = .{
        .gpa = gpa,
        .readers = @splat(.failing),
        .poll_fds = undefined,
        .windows = if (is_windows) .{
            .first_read_done = false,
            .overlapped = [1]windows.OVERLAPPED{
                std.mem.zeroes(windows.OVERLAPPED),
            } ** enum_fields.len,
            .small_bufs = undefined,
            .active = .{
                .count = 0,
                .handles_buf = undefined,
                .stream_map = undefined,
            },
        } else {},
    };

    inline for (enum_fields, 0..) |field, i| {
        if (is_windows) {
            result.windows.active.handles_buf[i] = @field(files, field.name).handle;
        } else {
            result.poll_fds[i] = .{
                .fd = @field(files, field.name).handle,
                .events = posix.POLL.IN,
                .revents = undefined,
            };
        }
    }

    return result;
}

pub fn Poller(comptime StreamEnum: type) type {
    return struct {
        const enum_fields = @typeInfo(StreamEnum).@"enum".fields;
        const PollFd = if (is_windows) void else posix.pollfd;

        gpa: Allocator,
        readers: [enum_fields.len]Reader,
        poll_fds: [enum_fields.len]PollFd,
        windows: if (is_windows) struct {
            first_read_done: bool,
            overlapped: [enum_fields.len]windows.OVERLAPPED,
            small_bufs: [enum_fields.len][128]u8,
            active: struct {
                count: math.IntFittingRange(0, enum_fields.len),
                handles_buf: [enum_fields.len]windows.HANDLE,
                stream_map: [enum_fields.len]StreamEnum,

                pub fn removeAt(self: *@This(), index: u32) void {
                    assert(index < self.count);
                    for (index + 1..self.count) |i| {
                        self.handles_buf[i - 1] = self.handles_buf[i];
                        self.stream_map[i - 1] = self.stream_map[i];
                    }
                    self.count -= 1;
                }
            },
        } else void,

        const Self = @This();

        pub fn deinit(self: *Self) void {
            const gpa = self.gpa;
            if (is_windows) {
                // cancel any pending IO to prevent clobbering OVERLAPPED value
                for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| {
                    _ = windows.kernel32.CancelIo(h);
                }
            }
            inline for (&self.readers) |*r| gpa.free(r.buffer);
            self.* = undefined;
        }

        pub fn poll(self: *Self) !bool {
            if (is_windows) {
                return pollWindows(self, null);
            } else {
                return pollPosix(self, null);
            }
        }

        pub fn pollTimeout(self: *Self, nanoseconds: u64) !bool {
            if (is_windows) {
                return pollWindows(self, nanoseconds);
            } else {
                return pollPosix(self, nanoseconds);
            }
        }

        pub fn reader(self: *Self, which: StreamEnum) *Reader {
            return &self.readers[@intFromEnum(which)];
        }

        pub fn toOwnedSlice(self: *Self, which: StreamEnum) error{OutOfMemory}![]u8 {
            const gpa = self.gpa;
            const r = reader(self, which);
            if (r.seek == 0) {
                const new = try gpa.realloc(r.buffer, r.end);
                r.buffer = &.{};
                r.end = 0;
                return new;
            }
            const new = try gpa.dupe(u8, r.buffered());
            gpa.free(r.buffer);
            r.buffer = &.{};
            r.seek = 0;
            r.end = 0;
            return new;
        }

        fn pollWindows(self: *Self, nanoseconds: ?u64) !bool {
            const bump_amt = 512;
            const gpa = self.gpa;

            if (!self.windows.first_read_done) {
                var already_read_data = false;
                for (0..enum_fields.len) |i| {
                    const handle = self.windows.active.handles_buf[i];
                    switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                        gpa,
                        handle,
                        &self.windows.overlapped[i],
                        &self.readers[i],
                        &self.windows.small_bufs[i],
                        bump_amt,
                    )) {
                        .populated, .empty => |state| {
                            if (state == .populated) already_read_data = true;
                            self.windows.active.handles_buf[self.windows.active.count] = handle;
                            self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i));
                            self.windows.active.count += 1;
                        },
                        .closed => {}, // don't add to the wait_objects list
                        .closed_populated => {
                            // don't add to the wait_objects list, but we did already get data
                            already_read_data = true;
                        },
                    }
                }
                self.windows.first_read_done = true;
                if (already_read_data) return true;
            }

            while (true) {
                if (self.windows.active.count == 0) return false;

                const status = windows.kernel32.WaitForMultipleObjects(
                    self.windows.active.count,
                    &self.windows.active.handles_buf,
                    0,
                    if (nanoseconds) |ns|
                        @min(std.math.cast(u32, ns / std.time.ns_per_ms) orelse (windows.INFINITE - 1), windows.INFINITE - 1)
                    else
                        windows.INFINITE,
                );
                if (status == windows.WAIT_FAILED)
                    return windows.unexpectedError(windows.GetLastError());
                if (status == windows.WAIT_TIMEOUT)
                    return true;

                if (status < windows.WAIT_OBJECT_0 or status > windows.WAIT_OBJECT_0 + enum_fields.len - 1)
                    unreachable;

                const active_idx = status - windows.WAIT_OBJECT_0;

                const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]);
                const handle = self.windows.active.handles_buf[active_idx];

                const overlapped = &self.windows.overlapped[stream_idx];
                const stream_reader = &self.readers[stream_idx];
                const small_buf = &self.windows.small_bufs[stream_idx];

                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                    .success => |n| n,
                    .closed => {
                        self.windows.active.removeAt(active_idx);
                        continue;
                    },
                    .aborted => unreachable,
                };
                const buf = small_buf[0..num_bytes_read];
                const dest = try writableSliceGreedyAlloc(stream_reader, gpa, buf.len);
                @memcpy(dest[0..buf.len], buf);
                advanceBufferEnd(stream_reader, buf.len);

                switch (try windowsAsyncReadToFifoAndQueueSmallRead(
                    gpa,
                    handle,
                    overlapped,
                    stream_reader,
                    small_buf,
                    bump_amt,
                )) {
                    .empty => {}, // irrelevant, we already got data from the small buffer
                    .populated => {},
                    .closed,
                    .closed_populated, // identical, since we already got data from the small buffer
                    => self.windows.active.removeAt(active_idx),
                }
                return true;
            }
        }

        fn pollPosix(self: *Self, nanoseconds: ?u64) !bool {
            const gpa = self.gpa;
            // We ask for ensureUnusedCapacity with this much extra space. This
            // has more of an effect on small reads because once the reads
            // start to get larger the amount of space an ArrayList will
            // allocate grows exponentially.
            const bump_amt = 512;

            const err_mask = posix.POLL.ERR | posix.POLL.NVAL | posix.POLL.HUP;

            const events_len = try posix.poll(&self.poll_fds, if (nanoseconds) |ns|
                std.math.cast(i32, ns / std.time.ns_per_ms) orelse std.math.maxInt(i32)
            else
                -1);
            if (events_len == 0) {
                for (self.poll_fds) |poll_fd| {
                    if (poll_fd.fd != -1) return true;
                } else return false;
            }

            var keep_polling = false;
            for (&self.poll_fds, &self.readers) |*poll_fd, *r| {
                // Try reading whatever is available before checking the error
                // conditions.
                // It's still possible to read after a POLL.HUP is received,
                // always check if there's some data waiting to be read first.
                if (poll_fd.revents & posix.POLL.IN != 0) {
                    const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
                    const amt = posix.read(poll_fd.fd, buf) catch |err| switch (err) {
                        error.BrokenPipe => 0, // Handle the same as EOF.
                        else => |e| return e,
                    };
                    advanceBufferEnd(r, amt);
                    if (amt == 0) {
                        // Remove the fd when the EOF condition is met.
                        poll_fd.fd = -1;
                    } else {
                        keep_polling = true;
                    }
                } else if (poll_fd.revents & err_mask != 0) {
                    // Exclude the fds that signaled an error.
                    poll_fd.fd = -1;
                } else if (poll_fd.fd != -1) {
                    keep_polling = true;
                }
            }
            return keep_polling;
        }

        /// Returns a slice into the unused capacity of `buffer` with at least
        /// `min_len` bytes, extending `buffer` by resizing it with `gpa` as necessary.
        ///
        /// After calling this function, typically the caller will follow up with a
        /// call to `advanceBufferEnd` to report the actual number of bytes buffered.
        fn writableSliceGreedyAlloc(r: *Reader, allocator: Allocator, min_len: usize) Allocator.Error![]u8 {
            {
                const unused = r.buffer[r.end..];
                if (unused.len >= min_len) return unused;
            }
            if (r.seek > 0) {
                const data = r.buffer[r.seek..r.end];
                @memmove(r.buffer[0..data.len], data);
                r.seek = 0;
                r.end = data.len;
            }
            {
                var list: std.ArrayList(u8) = .{
                    .items = r.buffer[0..r.end],
                    .capacity = r.buffer.len,
                };
                defer r.buffer = list.allocatedSlice();
                try list.ensureUnusedCapacity(allocator, min_len);
            }
            const unused = r.buffer[r.end..];
            assert(unused.len >= min_len);
            return unused;
        }

        /// After writing directly into the unused capacity of `buffer`, this function
        /// updates `end` so that users of `Reader` can receive the data.
        fn advanceBufferEnd(r: *Reader, n: usize) void {
            assert(n <= r.buffer.len - r.end);
            r.end += n;
        }

        /// The `ReadFile` documentation states that `lpNumberOfBytesRead` does not have a meaningful
        /// result when using overlapped I/O, but also that it cannot be `null` on Windows 7. For
        /// compatibility, we point it to this dummy variable, which we never otherwise access.
        /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
        var win_dummy_bytes_read: u32 = undefined;

        /// Read as much data as possible from `handle` with `overlapped`, and write it to the FIFO. Before
        /// returning, queue a read into `small_buf` so that `WaitForMultipleObjects` returns when more data
        /// is available. `handle` must have no pending asynchronous operation.
        fn windowsAsyncReadToFifoAndQueueSmallRead(
            gpa: Allocator,
            handle: windows.HANDLE,
            overlapped: *windows.OVERLAPPED,
            r: *Reader,
            small_buf: *[128]u8,
            bump_amt: usize,
        ) !enum { empty, populated, closed_populated, closed } {
            var read_any_data = false;
            while (true) {
                const fifo_read_pending = while (true) {
                    const buf = try writableSliceGreedyAlloc(r, gpa, bump_amt);
                    const buf_len = math.cast(u32, buf.len) orelse math.maxInt(u32);

                    if (0 == windows.kernel32.ReadFile(
                        handle,
                        buf.ptr,
                        buf_len,
                        &win_dummy_bytes_read,
                        overlapped,
                    )) switch (windows.GetLastError()) {
                        .IO_PENDING => break true,
                        .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
                        else => |err| return windows.unexpectedError(err),
                    };

                    const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                        .success => |n| n,
                        .closed => return if (read_any_data) .closed_populated else .closed,
                        .aborted => unreachable,
                    };

                    read_any_data = true;
                    advanceBufferEnd(r, num_bytes_read);

                    if (num_bytes_read == buf_len) {
                        // We filled the buffer, so there's probably more data available.
                        continue;
                    } else {
                        // We didn't fill the buffer, so assume we're out of data.
                        // There is no pending read.
                        break false;
                    }
                };

                if (fifo_read_pending) cancel_read: {
                    // Cancel the pending read into the FIFO.
                    _ = windows.kernel32.CancelIo(handle);

                    // We have to wait for the handle to be signalled, i.e. for the cancellation to complete.
                    switch (windows.kernel32.WaitForSingleObject(handle, windows.INFINITE)) {
                        windows.WAIT_OBJECT_0 => {},
                        windows.WAIT_FAILED => return windows.unexpectedError(windows.GetLastError()),
                        else => unreachable,
                    }

                    // If it completed before we canceled, make sure to tell the FIFO!
                    const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, true)) {
                        .success => |n| n,
                        .closed => return if (read_any_data) .closed_populated else .closed,
                        .aborted => break :cancel_read,
                    };
                    read_any_data = true;
                    advanceBufferEnd(r, num_bytes_read);
                }

                // Try to queue the 1-byte read.
                if (0 == windows.kernel32.ReadFile(
                    handle,
                    small_buf,
                    small_buf.len,
                    &win_dummy_bytes_read,
                    overlapped,
                )) switch (windows.GetLastError()) {
                    .IO_PENDING => {
                        // 1-byte read pending as intended
                        return if (read_any_data) .populated else .empty;
                    },
                    .BROKEN_PIPE => return if (read_any_data) .closed_populated else .closed,
                    else => |err| return windows.unexpectedError(err),
                };

                // We got data back this time. Write it to the FIFO and run the main loop again.
                const num_bytes_read = switch (try windowsGetReadResult(handle, overlapped, false)) {
                    .success => |n| n,
                    .closed => return if (read_any_data) .closed_populated else .closed,
                    .aborted => unreachable,
                };
                const buf = small_buf[0..num_bytes_read];
                const dest = try writableSliceGreedyAlloc(r, gpa, buf.len);
                @memcpy(dest[0..buf.len], buf);
                advanceBufferEnd(r, buf.len);
                read_any_data = true;
            }
        }

        /// Simple wrapper around `GetOverlappedResult` to determine the result of a `ReadFile` operation.
        /// If `!allow_aborted`, then `aborted` is never returned (`OPERATION_ABORTED` is considered unexpected).
        ///
        /// The `ReadFile` documentation states that the number of bytes read by an overlapped `ReadFile` must be determined using `GetOverlappedResult`, even if the
        /// operation immediately returns data:
        /// "Use NULL for [lpNumberOfBytesRead] if this is an asynchronous operation to avoid potentially
        /// erroneous results."
        /// "If `hFile` was opened with `FILE_FLAG_OVERLAPPED`, the following conditions are in effect: [...]
        /// The lpNumberOfBytesRead parameter should be set to NULL. Use the GetOverlappedResult function to
        /// get the actual number of bytes read."
        /// See: https://learn.microsoft.com/en-us/windows/win32/api/fileapi/nf-fileapi-readfile
        fn windowsGetReadResult(
            handle: windows.HANDLE,
            overlapped: *windows.OVERLAPPED,
            allow_aborted: bool,
        ) !union(enum) {
            success: u32,
            closed,
            aborted,
        } {
            var num_bytes_read: u32 = undefined;
            if (0 == windows.kernel32.GetOverlappedResult(
                handle,
                overlapped,
                &num_bytes_read,
                0,
            )) switch (windows.GetLastError()) {
                .BROKEN_PIPE => return .closed,
                .OPERATION_ABORTED => |err| if (allow_aborted) {
                    return .aborted;
                } else {
                    return windows.unexpectedError(err);
                },
                else => |err| return windows.unexpectedError(err),
            };
            return .{ .success = num_bytes_read };
        }
    };
}

/// Given an enum, returns a struct with a field named after each enum tag,
/// each field representing an I/O stream for polling.
pub fn PollFiles(comptime StreamEnum: type) type {
    return @Struct(.auto, null, std.meta.fieldNames(StreamEnum), &@splat(std.fs.File), &@splat(.{}));
}

test {
    _ = net;
    _ = Reader;
    _ = Writer;
    _ = tty;
    _ = Evented;
    _ = Threaded;
    _ = @import("Io/test.zig");
}

const Io = @This();

pub const Evented = switch (builtin.os.tag) {
    .linux => switch (builtin.cpu.arch) {
        .x86_64, .aarch64 => @import("Io/IoUring.zig"),
        else => void, // context-switching code not implemented yet
    },
    .dragonfly, .freebsd, .netbsd, .openbsd, .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => switch (builtin.cpu.arch) {
        .x86_64, .aarch64 => @import("Io/Kqueue.zig"),
        else => void, // context-switching code not implemented yet
    },
    else => void,
};
pub const Threaded = @import("Io/Threaded.zig");
pub const net = @import("Io/net.zig");

userdata: ?*anyopaque,
vtable: *const VTable,

pub const VTable = struct {
    /// If it returns `null` it means `result` has already been populated and
    /// `await` will be a no-op.
    ///
    /// When this function returns non-null, the implementation guarantees that
    /// a unit of concurrency has been assigned to the returned task.
    ///
    /// Thread-safe.
    async: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The pointer of this slice is an "eager" result value.
        /// The length is the size in bytes of the result type.
        /// This pointer's lifetime expires directly after the call to this function.
        result: []u8,
        result_alignment: std.mem.Alignment,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    ) ?*AnyFuture,
    /// Thread-safe.
    concurrent: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        result_len: usize,
        result_alignment: std.mem.Alignment,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (context: *const anyopaque, result: *anyopaque) void,
    ) ConcurrentError!*AnyFuture,
    /// This function is only called when `async` returns a non-null value.
    ///
    /// Thread-safe.
    await: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The same value that was returned from `async`.
        any_future: *AnyFuture,
        /// Points to a buffer where the result is written.
        /// The length is equal to the size in bytes of the result type.
        result: []u8,
        result_alignment: std.mem.Alignment,
    ) void,
    /// Equivalent to `await` but initiates a cancel request.
    ///
    /// This function is only called when `async` returns a non-null value.
    ///
    /// Thread-safe.
    cancel: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// The same value that was returned from `async`.
        any_future: *AnyFuture,
        /// Points to a buffer where the result is written.
        /// The length is equal to the size in bytes of the result type.
        result: []u8,
        result_alignment: std.mem.Alignment,
    ) void,
    /// Returns whether the current thread of execution is known to have
    /// been requested to cancel.
    ///
    /// Thread-safe.
    cancelRequested: *const fn (?*anyopaque) bool,

    /// When this function returns, the implementation guarantees that `start`
    /// has either already been called, or a unit of concurrency has been
    /// assigned to the task of calling the function.
    ///
    /// Thread-safe.
    groupAsync: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// Owner of the spawned async task.
        group: *Group,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) void,
    /// Thread-safe.
    groupConcurrent: *const fn (
        /// Corresponds to `Io.userdata`.
        userdata: ?*anyopaque,
        /// Owner of the spawned async task.
        group: *Group,
        /// Copied and then passed to `start`.
        context: []const u8,
        context_alignment: std.mem.Alignment,
        start: *const fn (*Group, context: *const anyopaque) void,
    ) ConcurrentError!void,
    groupWait: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
    groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,

    /// Blocks until one of the futures from the list has a result ready, such
    /// that awaiting it will not block. Returns that index.
    select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,

    mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
    mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
    mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,

    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) Cancelable!void,
    conditionWaitUncancelable: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex) void,
    conditionWake: *const fn (?*anyopaque, cond: *Condition, wake: Condition.Wake) void,

    dirMake: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void,
    dirMakePath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.Mode) Dir.MakeError!void,
    dirMakeOpenPath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.OpenOptions) Dir.MakeOpenPathError!Dir,
    dirStat: *const fn (?*anyopaque, Dir) Dir.StatError!Dir.Stat,
    dirStatPath: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.StatPathOptions) Dir.StatPathError!File.Stat,
    dirAccess: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.AccessOptions) Dir.AccessError!void,
    dirCreateFile: *const fn (?*anyopaque, Dir, sub_path: []const u8, File.CreateFlags) File.OpenError!File,
    dirOpenFile: *const fn (?*anyopaque, Dir, sub_path: []const u8, File.OpenFlags) File.OpenError!File,
    dirOpenDir: *const fn (?*anyopaque, Dir, sub_path: []const u8, Dir.OpenOptions) Dir.OpenError!Dir,
    dirClose: *const fn (?*anyopaque, Dir) void,
    fileStat: *const fn (?*anyopaque, File) File.StatError!File.Stat,
    fileClose: *const fn (?*anyopaque, File) void,
    fileWriteStreaming: *const fn (?*anyopaque, File, buffer: [][]const u8) File.WriteStreamingError!usize,
    fileWritePositional: *const fn (?*anyopaque, File, buffer: [][]const u8, offset: u64) File.WritePositionalError!usize,
    /// Returns 0 on end of stream.
    fileReadStreaming: *const fn (?*anyopaque, File, data: [][]u8) File.Reader.Error!usize,
    /// Returns 0 on end of stream.
    fileReadPositional: *const fn (?*anyopaque, File, data: [][]u8, offset: u64) File.ReadPositionalError!usize,
    fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
    fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
    openSelfExe: *const fn (?*anyopaque, File.OpenFlags) File.OpenSelfExeError!File,

    now: *const fn (?*anyopaque, Clock) Clock.Error!Timestamp,
    sleep: *const fn (?*anyopaque, Timeout) SleepError!void,

    netListenIp: *const fn (?*anyopaque, address: net.IpAddress, net.IpAddress.ListenOptions) net.IpAddress.ListenError!net.Server,
    netAccept: *const fn (?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream,
    netBindIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.BindOptions) net.IpAddress.BindError!net.Socket,
    netConnectIp: *const fn (?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream,
    netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
    netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
    netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
    netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
    /// Returns 0 on end of stream.
    netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
    netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
    netClose: *const fn (?*anyopaque, handle: net.Socket.Handle) void,
    netInterfaceNameResolve: *const fn (?*anyopaque, *const net.Interface.Name) net.Interface.Name.ResolveError!net.Interface,
    netInterfaceName: *const fn (?*anyopaque, net.Interface) net.Interface.NameError!net.Interface.Name,
    netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) void,
};

pub const Cancelable = error{
    /// Caller has requested the async operation to stop.
    Canceled,
};

pub const UnexpectedError = error{
    /// The Operating System returned an undocumented error code.
    ///
    /// This error is in theory not possible, but it would be better
    /// to handle this error than to invoke undefined behavior.
    ///
    /// When this error code is observed, it usually means the Zig Standard
    /// Library needs a small patch to add the error code to the error set for
    /// the respective function.
    Unexpected,
};

pub const Dir = @import("Io/Dir.zig");
pub const File = @import("Io/File.zig");

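/// Example of measuring elapsed time (an illustrative sketch; the `io`
/// instance is assumed, not defined here):
///
///     const start = try Io.Clock.Timestamp.now(io, .awake);
///     // ... perform the work being measured ...
///     const elapsed = try start.untilNow(io);
///     std.log.debug("took {d} ms", .{elapsed.raw.toMilliseconds()});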
pub const Clock = enum {
    /// A settable system-wide clock that measures real (i.e. wall-clock)
    /// time. This clock is affected by discontinuous jumps in the system
    /// time (e.g., if the system administrator manually changes the
    /// clock), and by frequency adjustments performed by NTP and similar
    /// applications.
    ///
    /// This clock normally counts the number of seconds since 1970-01-01
    /// 00:00:00 Coordinated Universal Time (UTC) except that it ignores
    /// leap seconds; near a leap second it is typically adjusted by NTP to
    /// stay roughly in sync with UTC.
    ///
    /// Timestamps returned by implementations of this clock represent time
    /// elapsed since 1970-01-01T00:00:00Z, the POSIX/Unix epoch, ignoring
    /// leap seconds. This is colloquially known as "Unix time". If the
    /// underlying OS uses a different epoch for native timestamps (e.g.,
    /// Windows, which uses 1601-01-01) they are translated accordingly.
    real,
    /// A nonsettable system-wide clock that represents time since some
    /// unspecified point in the past.
    ///
    /// Monotonic: Guarantees that the time returned by consecutive calls
    /// will not go backwards, but successive calls may return identical
    /// (not-increased) time values.
    ///
    /// Not affected by discontinuous jumps in the system time (e.g., if
    /// the system administrator manually changes the clock), but may be
    /// affected by frequency adjustments.
    ///
    /// This clock expresses intent to **exclude time that the system is
    /// suspended**. However, implementations may be unable to satisfy
    /// this, and may include that time.
    ///
    /// * On Linux, corresponds to `CLOCK_MONOTONIC`.
    /// * On macOS, corresponds to `CLOCK_UPTIME_RAW`.
    awake,
    /// Identical to `awake` except it expresses intent to **include time
    /// that the system is suspended**; however, due to limitations it may
    /// behave identically to `awake`.
    ///
    /// * On Linux, corresponds to `CLOCK_BOOTTIME`.
    /// * On macOS, corresponds to `CLOCK_MONOTONIC_RAW`.
    boot,
    /// Tracks the amount of CPU time spent in user or kernel mode by the
    /// calling process.
    cpu_process,
    /// Tracks the amount of CPU time spent in user or kernel mode by the
    /// calling thread.
    cpu_thread,

    pub const Error = error{UnsupportedClock} || UnexpectedError;

    /// This function is not cancelable because first of all it does not block,
    /// but more importantly, the cancelation logic itself may want to check
    /// the time.
    pub fn now(clock: Clock, io: Io) Error!Io.Timestamp {
        return io.vtable.now(io.userdata, clock);
    }

    pub const Timestamp = struct {
        raw: Io.Timestamp,
        clock: Clock,

        /// This function is not cancelable because first of all it does not block,
        /// but more importantly, the cancelation logic itself may want to check
        /// the time.
        pub fn now(io: Io, clock: Clock) Error!Clock.Timestamp {
            return .{
                .raw = try io.vtable.now(io.userdata, clock),
                .clock = clock,
            };
        }

        pub fn wait(t: Clock.Timestamp, io: Io) SleepError!void {
            return io.vtable.sleep(io.userdata, .{ .deadline = t });
        }

        pub fn durationTo(from: Clock.Timestamp, to: Clock.Timestamp) Clock.Duration {
            assert(from.clock == to.clock);
            return .{
                .raw = from.raw.durationTo(to.raw),
                .clock = from.clock,
            };
        }

        pub fn addDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
            assert(from.clock == duration.clock);
            return .{
                .raw = from.raw.addDuration(duration.raw),
                .clock = from.clock,
            };
        }

        pub fn subDuration(from: Clock.Timestamp, duration: Clock.Duration) Clock.Timestamp {
            assert(from.clock == duration.clock);
            return .{
                .raw = from.raw.subDuration(duration.raw),
                .clock = from.clock,
            };
        }

        pub fn fromNow(io: Io, duration: Clock.Duration) Error!Clock.Timestamp {
            return .{
                .clock = duration.clock,
                .raw = (try duration.clock.now(io)).addDuration(duration.raw),
            };
        }

        pub fn untilNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
            const now_ts = try Clock.Timestamp.now(io, timestamp.clock);
            return timestamp.durationTo(now_ts);
        }

        pub fn durationFromNow(timestamp: Clock.Timestamp, io: Io) Error!Clock.Duration {
            const now_ts = try timestamp.clock.now(io);
            return .{
                .clock = timestamp.clock,
                .raw = now_ts.durationTo(timestamp.raw),
            };
        }

        pub fn toClock(t: Clock.Timestamp, io: Io, clock: Clock) Error!Clock.Timestamp {
            if (t.clock == clock) return t;
            const now_old = try t.clock.now(io);
            const now_new = try clock.now(io);
            const duration = now_old.durationTo(t.raw);
            return .{
                .clock = clock,
                .raw = now_new.addDuration(duration),
            };
        }

        pub fn compare(lhs: Clock.Timestamp, op: std.math.CompareOperator, rhs: Clock.Timestamp) bool {
            assert(lhs.clock == rhs.clock);
            return std.math.compare(lhs.raw.nanoseconds, op, rhs.raw.nanoseconds);
        }
    };

    pub const Duration = struct {
        raw: Io.Duration,
        clock: Clock,

        pub fn sleep(duration: Clock.Duration, io: Io) SleepError!void {
            return io.vtable.sleep(io.userdata, .{ .duration = duration });
        }
    };
};
 879
 880pub const Timestamp = struct {
 881    nanoseconds: i96,
 882
 883    pub const zero: Timestamp = .{ .nanoseconds = 0 };
 884
 885    pub fn durationTo(from: Timestamp, to: Timestamp) Duration {
 886        return .{ .nanoseconds = to.nanoseconds - from.nanoseconds };
 887    }
 888
 889    pub fn addDuration(from: Timestamp, duration: Duration) Timestamp {
 890        return .{ .nanoseconds = from.nanoseconds + duration.nanoseconds };
 891    }
 892
 893    pub fn subDuration(from: Timestamp, duration: Duration) Timestamp {
 894        return .{ .nanoseconds = from.nanoseconds - duration.nanoseconds };
 895    }
 896
 897    pub fn withClock(t: Timestamp, clock: Clock) Clock.Timestamp {
 898        return .{ .nanoseconds = t.nanoseconds, .clock = clock };
 899    }
 900
 901    pub fn fromNanoseconds(x: i96) Timestamp {
 902        return .{ .nanoseconds = x };
 903    }
 904
 905    pub fn toMilliseconds(t: Timestamp) i64 {
 906        return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_ms));
 907    }
 908
 909    pub fn toSeconds(t: Timestamp) i64 {
 910        return @intCast(@divTrunc(t.nanoseconds, std.time.ns_per_s));
 911    }
 912
 913    pub fn toNanoseconds(t: Timestamp) i96 {
 914        return t.nanoseconds;
 915    }
 916
 917    pub fn formatNumber(t: Timestamp, w: *std.Io.Writer, n: std.fmt.Number) std.Io.Writer.Error!void {
 918        return w.printInt(t.nanoseconds, n.mode.base() orelse 10, n.case, .{
 919            .precision = n.precision,
 920            .width = n.width,
 921            .alignment = n.alignment,
 922            .fill = n.fill,
 923        });
 924    }
 925};
 926
 927pub const Duration = struct {
 928    nanoseconds: i96,
 929
 930    pub const zero: Duration = .{ .nanoseconds = 0 };
 931    pub const max: Duration = .{ .nanoseconds = std.math.maxInt(i96) };
 932
 933    pub fn fromNanoseconds(x: i96) Duration {
 934        return .{ .nanoseconds = x };
 935    }
 936
 937    pub fn fromMilliseconds(x: i64) Duration {
 938        return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_ms };
 939    }
 940
 941    pub fn fromSeconds(x: i64) Duration {
 942        return .{ .nanoseconds = @as(i96, x) * std.time.ns_per_s };
 943    }
 944
 945    pub fn toMilliseconds(d: Duration) i64 {
 946        return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_ms));
 947    }
 948
 949    pub fn toSeconds(d: Duration) i64 {
 950        return @intCast(@divTrunc(d.nanoseconds, std.time.ns_per_s));
 951    }
 952
 953    pub fn toNanoseconds(d: Duration) i96 {
 954        return d.nanoseconds;
 955    }
 956};
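
// Minimal illustrative test (an added sketch): exercises the pure timestamp
// and duration arithmetic defined above.
test Duration {
    const a: Timestamp = .{ .nanoseconds = 1_000_000_000 };
    const b: Timestamp = .{ .nanoseconds = 3_500_000_000 };
    const d = a.durationTo(b);
    try std.testing.expectEqual(@as(i96, 2_500_000_000), d.toNanoseconds());
    try std.testing.expectEqual(@as(i64, 2500), d.toMilliseconds());
    try std.testing.expectEqual(@as(i64, 2), d.toSeconds());
    try std.testing.expectEqual(b.nanoseconds, a.addDuration(d).nanoseconds);
    try std.testing.expectEqual(a.nanoseconds, b.subDuration(d).nanoseconds);
    try std.testing.expectEqual(@as(i96, 2_000_000_000), Duration.fromSeconds(2).toNanoseconds());
}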

/// Declares under what conditions an operation should return `error.Timeout`.
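///
/// Example (an illustrative sketch; the `io` instance is assumed, not defined
/// here) of sleeping for roughly 50 ms on the monotonic clock:
///
///     const timeout: Io.Timeout = .{ .duration = .{
///         .clock = .awake,
///         .raw = .fromMilliseconds(50),
///     } };
///     try timeout.sleep(io);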
pub const Timeout = union(enum) {
    none,
    duration: Clock.Duration,
    deadline: Clock.Timestamp,

    pub const Error = error{ Timeout, UnsupportedClock };

    pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
        return switch (t) {
            .none => null,
            .duration => |d| try .fromNow(io, d),
            .deadline => |d| d,
        };
    }

    pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
        return switch (t) {
            .none => null,
            .duration => |d| d,
            .deadline => |d| try d.durationFromNow(io),
        };
    }

    pub fn sleep(timeout: Timeout, io: Io) SleepError!void {
        return io.vtable.sleep(io.userdata, timeout);
    }
};

pub const AnyFuture = opaque {};

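/// Example (an illustrative sketch; assumes a spawning helper such as
/// `io.async`, which is not part of this excerpt, and a `compute` function):
///
///     var future = io.async(compute, .{input});
///     // ... other useful work ...
///     const result = future.await(io);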
pub fn Future(Result: type) type {
    return struct {
        any_future: ?*AnyFuture,
        result: Result,

        /// Equivalent to `await` but places a cancellation request.
        ///
        /// Idempotent. Not threadsafe.
        pub fn cancel(f: *@This(), io: Io) Result {
            const any_future = f.any_future orelse return f.result;
            io.vtable.cancel(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
            f.any_future = null;
            return f.result;
        }

        /// Idempotent. Not threadsafe.
        pub fn await(f: *@This(), io: Io) Result {
            const any_future = f.any_future orelse return f.result;
            io.vtable.await(io.userdata, any_future, @ptrCast(&f.result), .of(Result));
            f.any_future = null;
            return f.result;
        }
    };
}

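/// Example (an illustrative sketch; the `io` instance and `worker` function
/// are assumed, not defined here):
///
///     var group: Io.Group = .init;
///     group.async(io, worker, .{first_arg});
///     group.async(io, worker, .{second_arg});
///     group.wait(io); // blocks until both calls to `worker` have completed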
pub const Group = struct {
    state: usize,
    context: ?*anyopaque,
    token: ?*anyopaque,

    pub const init: Group = .{ .state = 0, .context = null, .token = null };

    /// Calls `function` with `args` asynchronously. The resource spawned is
    /// owned by the group.
    ///
    /// `function` *may* be called immediately, before `async` returns.
    ///
    /// When this function returns, it is guaranteed that `function` has
    /// already been called and completed, or it has successfully been assigned
    /// a unit of concurrency.
    ///
    /// After this is called, `wait` or `cancel` must be called before the
    /// group is deinitialized.
    ///
    /// Threadsafe.
    ///
    /// See also:
    /// * `concurrent`
    /// * `Io.async`
    pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
            fn start(group: *Group, context: *const anyopaque) void {
                _ = group;
                const args_casted: *const Args = @ptrCast(@alignCast(context));
                @call(.auto, function, args_casted.*);
            }
        };
        io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Calls `function` with `args`, such that the function is not guaranteed
    /// to have returned until `wait` is called, allowing the caller to
    /// progress while waiting for any `Io` operations.
    ///
    /// The resource spawned is owned by the group; after this is called,
    /// `wait` or `cancel` must be called before the group is deinitialized.
    ///
    /// This has a stronger guarantee than `async`, placing restrictions on what
    /// kind of `Io` implementations are supported. By calling `async` instead,
    /// one allows, for example, stackful single-threaded blocking I/O.
    ///
    /// Threadsafe.
    ///
    /// See also:
    /// * `async`
    /// * `Io.concurrent`
    pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
        const Args = @TypeOf(args);
        const TypeErased = struct {
            fn start(group: *Group, context: *const anyopaque) void {
                _ = group;
                const args_casted: *const Args = @ptrCast(@alignCast(context));
                @call(.auto, function, args_casted.*);
            }
        };
        return io.vtable.groupConcurrent(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
    }

    /// Blocks until all tasks of the group finish. During this time,
    /// cancellation requests propagate to all members of the group.
    ///
    /// Idempotent. Not threadsafe.
    pub fn wait(g: *Group, io: Io) void {
        const token = g.token orelse return;
        g.token = null;
        io.vtable.groupWait(io.userdata, g, token);
    }

    /// Equivalent to `wait` but immediately requests cancellation on all
    /// members of the group.
    ///
    /// Idempotent. Not threadsafe.
    pub fn cancel(g: *Group, io: Io) void {
        const token = g.token orelse return;
        g.token = null;
        io.vtable.groupCancel(io.userdata, g, token);
    }
};

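/// Example (an illustrative sketch; the `io` instance and the `fetchA` and
/// `fetchB` functions, returning `u32` and `[]const u8` respectively, are
/// assumed, not defined here):
///
///     const Result = union(enum) { a: u32, b: []const u8 };
///     var buffer: [2]Result = undefined;
///     var select: Io.Select(Result) = .init(io, &buffer);
///     select.async(.a, fetchA, .{});
///     select.async(.b, fetchB, .{});
///     const first = try select.wait(); // whichever task finishes first
///     select.cancel(); // request cancellation of the remaining task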
pub fn Select(comptime U: type) type {
    return struct {
        io: Io,
        group: Group,
        queue: Queue(U),
        outstanding: usize,

        const S = @This();

        pub const Union = U;

        pub const Field = std.meta.FieldEnum(U);

        pub fn init(io: Io, buffer: []U) S {
            return .{
                .io = io,
                .queue = .init(buffer),
                .group = .init,
                .outstanding = 0,
            };
        }

        /// Calls `function` with `args` asynchronously. The resource spawned is
        /// owned by the select.
        ///
        /// `function` must have return type matching the `field` field of `Union`.
        ///
        /// `function` *may* be called immediately, before `async` returns.
        ///
        /// When this function returns, it is guaranteed that `function` has
        /// already been called and completed, or it has successfully been
        /// assigned a unit of concurrency.
        ///
        /// After this is called, `wait` or `cancel` must be called before the
        /// select is deinitialized.
        ///
        /// Threadsafe.
        ///
        /// Related:
        /// * `Io.async`
        /// * `Group.async`
        pub fn async(
            s: *S,
            comptime field: Field,
            function: anytype,
            args: std.meta.ArgsTuple(@TypeOf(function)),
        ) void {
            const Args = @TypeOf(args);
            const TypeErased = struct {
                fn start(group: *Group, context: *const anyopaque) void {
                    const args_casted: *const Args = @ptrCast(@alignCast(context));
                    const unerased_select: *S = @fieldParentPtr("group", group);
                    const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
                    unerased_select.queue.putOneUncancelable(unerased_select.io, elem);
                }
            };
            _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
            s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start);
        }

        /// Blocks until another task of the select finishes.
        ///
        /// Asserts there is at least one more `outstanding` task.
        ///
        /// Not threadsafe.
        pub fn wait(s: *S) Cancelable!U {
            s.outstanding -= 1;
            return s.queue.getOne(s.io);
        }

        /// Equivalent to `wait` but requests cancellation on all remaining
        /// tasks owned by the select.
        ///
        /// It is illegal to call `wait` after this.
        ///
        /// Idempotent. Not threadsafe.
        pub fn cancel(s: *S) void {
            s.outstanding = 0;
            s.group.cancel(s.io);
        }
    };
}

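/// Example (an illustrative sketch; the `io` instance is assumed, not defined
/// here):
///
///     var mutex: Io.Mutex = .init;
///     try mutex.lock(io);
///     defer mutex.unlock(io);
///     // ... access the state protected by the mutex ...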
pub const Mutex = struct {
    state: State,

    pub const State = enum(usize) {
        locked_once = 0b00,
        unlocked = 0b01,
        contended = 0b10,
        /// contended
        _,

        pub fn isUnlocked(state: State) bool {
            return @intFromEnum(state) & @intFromEnum(State.unlocked) == @intFromEnum(State.unlocked);
        }
    };

    pub const init: Mutex = .{ .state = .unlocked };

    pub fn tryLock(mutex: *Mutex) bool {
        const prev_state: State = @enumFromInt(@atomicRmw(
            usize,
            @as(*usize, @ptrCast(&mutex.state)),
            .And,
            ~@intFromEnum(State.unlocked),
            .acquire,
        ));
        return prev_state.isUnlocked();
    }

    pub fn lock(mutex: *Mutex, io: std.Io) Cancelable!void {
        const prev_state: State = @enumFromInt(@atomicRmw(
            usize,
            @as(*usize, @ptrCast(&mutex.state)),
            .And,
            ~@intFromEnum(State.unlocked),
            .acquire,
        ));
        if (prev_state.isUnlocked()) {
            @branchHint(.likely);
            return;
        }
        return io.vtable.mutexLock(io.userdata, prev_state, mutex);
    }

    /// Same as `lock` but cannot be canceled.
    pub fn lockUncancelable(mutex: *Mutex, io: std.Io) void {
        const prev_state: State = @enumFromInt(@atomicRmw(
            usize,
            @as(*usize, @ptrCast(&mutex.state)),
            .And,
            ~@intFromEnum(State.unlocked),
            .acquire,
        ));
        if (prev_state.isUnlocked()) {
            @branchHint(.likely);
            return;
        }
        return io.vtable.mutexLockUncancelable(io.userdata, prev_state, mutex);
    }

    pub fn unlock(mutex: *Mutex, io: std.Io) void {
        const prev_state = @cmpxchgWeak(State, &mutex.state, .locked_once, .unlocked, .release, .acquire) orelse {
            @branchHint(.likely);
            return;
        };
        assert(prev_state != .unlocked); // mutex not locked
        return io.vtable.mutexUnlock(io.userdata, prev_state, mutex);
    }
};

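/// Example (an illustrative sketch; the `io` instance, the `mutex`-protected
/// `ready` flag, and the condition `cond` are assumed, not defined here):
///
///     try mutex.lock(io);
///     defer mutex.unlock(io);
///     while (!ready) try cond.wait(io, &mutex);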
1251pub const Condition = struct {
1252    state: u64 = 0,
1253
1254    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) Cancelable!void {
1255        return io.vtable.conditionWait(io.userdata, cond, mutex);
1256    }
1257
1258    pub fn waitUncancelable(cond: *Condition, io: Io, mutex: *Mutex) void {
1259        return io.vtable.conditionWaitUncancelable(io.userdata, cond, mutex);
1260    }
1261
1262    pub fn signal(cond: *Condition, io: Io) void {
1263        io.vtable.conditionWake(io.userdata, cond, .one);
1264    }
1265
1266    pub fn broadcast(cond: *Condition, io: Io) void {
1267        io.vtable.conditionWake(io.userdata, cond, .all);
1268    }
1269
1270    pub const Wake = enum {
1271        /// Wake up only one thread.
1272        one,
1273        /// Wake up all threads.
1274        all,
1275    };
1276};

pub const TypeErasedQueue = struct {
    mutex: Mutex,

    /// Ring buffer. This data is logically *after* queued getters.
    buffer: []u8,
    start: usize,
    len: usize,

    putters: std.DoublyLinkedList,
    getters: std.DoublyLinkedList,

    const Put = struct {
        remaining: []const u8,
        condition: Condition,
        node: std.DoublyLinkedList.Node,
    };

    const Get = struct {
        remaining: []u8,
        condition: Condition,
        node: std.DoublyLinkedList.Node,
    };

    pub fn init(buffer: []u8) TypeErasedQueue {
        return .{
            .mutex = .init,
            .buffer = buffer,
            .start = 0,
            .len = 0,
            .putters = .{},
            .getters = .{},
        };
    }

    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) Cancelable!usize {
        assert(elements.len >= min);
        if (elements.len == 0) return 0;
        try q.mutex.lock(io);
        defer q.mutex.unlock(io);
        return q.putLocked(io, elements, min, false);
    }

    /// Same as `put` but cannot be canceled.
    pub fn putUncancelable(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
        assert(elements.len >= min);
        if (elements.len == 0) return 0;
        q.mutex.lockUncancelable(io);
        defer q.mutex.unlock(io);
        return q.putLocked(io, elements, min, true) catch |err| switch (err) {
            error.Canceled => unreachable,
        };
    }

    /// Returns the next contiguous range of unused capacity in the ring
    /// buffer, or `null` if the buffer is full.
    fn puttableSlice(q: *const TypeErasedQueue) ?[]u8 {
        const unwrapped_index = q.start + q.len;
        const wrapped_index, const overflow = @subWithOverflow(unwrapped_index, q.buffer.len);
        const slice = switch (overflow) {
            1 => q.buffer[unwrapped_index..],
            0 => q.buffer[wrapped_index..q.start],
        };
        return if (slice.len > 0) slice else null;
    }

    fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) Cancelable!usize {
        // Getters have first priority on the data, and only when the getters
        // queue is empty do we start populating the buffer.

        var remaining = elements;
        while (q.getters.popFirst()) |getter_node| {
            const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
            const copy_len = @min(getter.remaining.len, remaining.len);
            assert(copy_len > 0);
            @memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]);
            remaining = remaining[copy_len..];
            getter.remaining = getter.remaining[copy_len..];
            if (getter.remaining.len == 0) {
                getter.condition.signal(io);
                if (remaining.len > 0) continue;
            } else q.getters.prepend(getter_node);
            assert(remaining.len == 0);
            return elements.len;
        }

        while (q.puttableSlice()) |slice| {
            const copy_len = @min(slice.len, remaining.len);
            assert(copy_len > 0);
            @memcpy(slice[0..copy_len], remaining[0..copy_len]);
            q.len += copy_len;
            remaining = remaining[copy_len..];
            if (remaining.len == 0) return elements.len;
        }

        const total_filled = elements.len - remaining.len;
        if (total_filled >= min) return total_filled;

        var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
        q.putters.append(&pending.node);
        defer if (pending.remaining.len > 0) q.putters.remove(&pending.node);
        while (pending.remaining.len > 0) if (uncancelable)
            pending.condition.waitUncancelable(io, &q.mutex)
        else
            try pending.condition.wait(io, &q.mutex);
        return elements.len;
    }

    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
        assert(buffer.len >= min);
        if (buffer.len == 0) return 0;
        try q.mutex.lock(io);
        defer q.mutex.unlock(io);
        return q.getLocked(io, buffer, min, false);
    }

    pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
        assert(buffer.len >= min);
        if (buffer.len == 0) return 0;
        q.mutex.lockUncancelable(io);
        defer q.mutex.unlock(io);
        return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
            error.Canceled => unreachable,
        };
    }

    /// Returns the next contiguous range of buffered data, or `null` if the
    /// ring buffer is empty.
    fn gettableSlice(q: *const TypeErasedQueue) ?[]const u8 {
        const overlong_slice = q.buffer[q.start..];
        const slice = overlong_slice[0..@min(overlong_slice.len, q.len)];
        return if (slice.len > 0) slice else null;
    }

    fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
        // The ring buffer gets first priority, then data should come from any
        // queued putters, then finally the ring buffer should be filled with
        // data from putters so they can be resumed.

        var remaining = buffer;
        while (q.gettableSlice()) |slice| {
            const copy_len = @min(slice.len, remaining.len);
            assert(copy_len > 0);
            @memcpy(remaining[0..copy_len], slice[0..copy_len]);
            q.start += copy_len;
            if (q.buffer.len - q.start == 0) q.start = 0;
            q.len -= copy_len;
            remaining = remaining[copy_len..];
            if (remaining.len == 0) {
                q.fillRingBufferFromPutters(io);
                return buffer.len;
            }
        }

        // Copy directly from putters into buffer.
        while (q.putters.popFirst()) |putter_node| {
            const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
            const copy_len = @min(putter.remaining.len, remaining.len);
            assert(copy_len > 0);
            @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
            putter.remaining = putter.remaining[copy_len..];
            remaining = remaining[copy_len..];
            if (putter.remaining.len == 0) {
                putter.condition.signal(io);
                if (remaining.len > 0) continue;
            } else q.putters.prepend(putter_node);
            assert(remaining.len == 0);
            q.fillRingBufferFromPutters(io);
            return buffer.len;
        }

        // Both the ring buffer and the putters queue are empty.
        const total_filled = buffer.len - remaining.len;
        if (total_filled >= min) return total_filled;

        var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
        q.getters.append(&pending.node);
        defer if (pending.remaining.len > 0) q.getters.remove(&pending.node);
        while (pending.remaining.len > 0) if (uncancelable)
            pending.condition.waitUncancelable(io, &q.mutex)
        else
            try pending.condition.wait(io, &q.mutex);
        q.fillRingBufferFromPutters(io);
        return buffer.len;
    }

    /// Called when there is nonzero space available in the ring buffer and
    /// there are potentially putters waiting. The mutex is already held; the
    /// task is to copy putter data into the ring buffer and signal any putters
    /// whose buffers have been fully copied.
    fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
        while (q.putters.popFirst()) |putter_node| {
            const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
            while (q.puttableSlice()) |slice| {
                const copy_len = @min(slice.len, putter.remaining.len);
                assert(copy_len > 0);
                @memcpy(slice[0..copy_len], putter.remaining[0..copy_len]);
                q.len += copy_len;
                putter.remaining = putter.remaining[copy_len..];
                if (putter.remaining.len == 0) {
                    putter.condition.signal(io);
                    break;
                }
            } else {
                q.putters.prepend(putter_node);
                break;
            }
        }
    }
};

/// Many-producer, many-consumer, thread-safe queue with a runtime-configurable
/// buffer size.
/// When the buffer is empty, consumers suspend and are resumed by producers.
/// When the buffer is full, producers suspend and are resumed by consumers.
pub fn Queue(Elem: type) type {
    return struct {
        type_erased: TypeErasedQueue,

        pub fn init(buffer: []Elem) @This() {
            return .{ .type_erased = .init(@ptrCast(buffer)) };
        }

        /// Appends elements to the end of the queue. The function returns when
        /// at least `min` elements have been added to the buffer or sent
        /// directly to a consumer.
        ///
        /// Returns how many elements have been added to the queue.
        ///
        /// Asserts that `elements.len >= min`.
        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) Cancelable!usize {
            return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        /// Same as `put` but blocks until all elements have been added to the queue.
        pub fn putAll(q: *@This(), io: Io, elements: []const Elem) Cancelable!void {
            assert(try q.put(io, elements, elements.len) == elements.len);
        }

        /// Same as `put` but cannot be canceled.
        pub fn putUncancelable(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
            return @divExact(q.type_erased.putUncancelable(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        pub fn putOne(q: *@This(), io: Io, item: Elem) Cancelable!void {
            assert(try q.put(io, &.{item}, 1) == 1);
        }

        pub fn putOneUncancelable(q: *@This(), io: Io, item: Elem) void {
            assert(q.putUncancelable(io, &.{item}, 1) == 1);
        }

        /// Receives elements from the beginning of the queue. The function
        /// returns when at least `min` elements have been populated inside
        /// `buffer`.
        ///
        /// Returns how many elements of `buffer` have been populated.
        ///
        /// Asserts that `buffer.len >= min`.
        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) Cancelable!usize {
            return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        pub fn getUncancelable(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
            return @divExact(q.type_erased.getUncancelable(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
        }

        pub fn getOne(q: *@This(), io: Io) Cancelable!Elem {
            var buf: [1]Elem = undefined;
            assert(try q.get(io, &buf, 1) == 1);
            return buf[0];
        }

        pub fn getOneUncancelable(q: *@This(), io: Io) Elem {
            var buf: [1]Elem = undefined;
            assert(q.getUncancelable(io, &buf, 1) == 1);
            return buf[0];
        }

        /// Returns buffer length in `Elem` units.
        pub fn capacity(q: *const @This()) usize {
            return @divExact(q.type_erased.buffer.len, @sizeOf(Elem));
        }
    };
}
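
// A minimal usage sketch (illustrative): a queue borrows a caller-provided
// buffer, and `capacity` reports its length in elements. Exercising `put`/`get`
// additionally requires an `Io` instance from an implementation, which is
// assumed to exist at the call site, e.g.:
//
//     try queue.putOne(io, 42);
//     const item = try queue.getOne(io);
test Queue {
    var buffer: [8]u32 = undefined;
    const queue: Queue(u32) = .init(&buffer);
    try std.testing.expect(queue.capacity() == 8);
}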

/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called.
///
/// `function` *may* be called immediately, before `async` returns. This has
/// weaker guarantees than `concurrent`, making it more portable and reusable.
///
/// When this function returns, it is guaranteed that `function` has already
/// been called and completed, or it has successfully been assigned a unit of
/// concurrency.
///
/// See also:
/// * `Group`
pub fn async(
    io: Io,
    function: anytype,
    args: std.meta.ArgsTuple(@TypeOf(function)),
) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
    const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
    const Args = @TypeOf(args);
    const TypeErased = struct {
        fn start(context: *const anyopaque, result: *anyopaque) void {
            const args_casted: *const Args = @ptrCast(@alignCast(context));
            const result_casted: *Result = @ptrCast(@alignCast(result));
            result_casted.* = @call(.auto, function, args_casted.*);
        }
    };
    var future: Future(Result) = undefined;
    future.any_future = io.vtable.async(
        io.userdata,
        @ptrCast(&future.result),
        .of(Result),
        @ptrCast(&args),
        .of(Args),
        TypeErased.start,
    );
    return future;
}
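
// Usage sketch (illustrative; `computeChecksum` and `data` are hypothetical,
// and `io` is assumed to be provided by the caller):
//
//     var future = io.async(computeChecksum, .{data});
//     // ... other work that may overlap with the computation ...
//     const checksum = future.await(io);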

pub const ConcurrentError = error{
    /// May occur due to a temporary condition such as resource exhaustion, or
    /// to the Io implementation not supporting concurrency.
    ConcurrencyUnavailable,
};

/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called, allowing the caller
/// to progress while waiting for any `Io` operations.
///
/// This has a stronger guarantee than `async`, placing restrictions on what
/// kinds of `Io` implementations are supported. By calling `async` instead, one
/// allows, for example, stackful single-threaded blocking I/O.
pub fn concurrent(
    io: Io,
    function: anytype,
    args: std.meta.ArgsTuple(@TypeOf(function)),
) ConcurrentError!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
    const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
    const Args = @TypeOf(args);
    const TypeErased = struct {
        fn start(context: *const anyopaque, result: *anyopaque) void {
            const args_casted: *const Args = @ptrCast(@alignCast(context));
            const result_casted: *Result = @ptrCast(@alignCast(result));
            result_casted.* = @call(.auto, function, args_casted.*);
        }
    };
    var future: Future(Result) = undefined;
    future.any_future = try io.vtable.concurrent(
        io.userdata,
        @sizeOf(Result),
        .of(Result),
        @ptrCast(&args),
        .of(Args),
        TypeErased.start,
    );
    return future;
}
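
// Usage sketch (illustrative; `serveConnection` and `connection` are
// hypothetical, and `io` is assumed to be provided by the caller). Unlike
// `async`, the call can fail when no unit of concurrency is available:
//
//     var future = try io.concurrent(serveConnection, .{connection});
//     const result = future.await(io);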

pub fn cancelRequested(io: Io) bool {
    return io.vtable.cancelRequested(io.userdata);
}

pub const SleepError = error{UnsupportedClock} || UnexpectedError || Cancelable;

pub fn sleep(io: Io, duration: Duration, clock: Clock) SleepError!void {
    return io.vtable.sleep(io.userdata, .{ .duration = .{
        .raw = duration,
        .clock = clock,
    } });
}

/// Given a struct with each field a `*Future`, returns a union with the same
/// field names, where each field's type is the corresponding future's result
/// type.
pub fn SelectUnion(S: type) type {
    const struct_fields = @typeInfo(S).@"struct".fields;
    var names: [struct_fields.len][]const u8 = undefined;
    var types: [struct_fields.len]type = undefined;
    for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| {
        const FieldFuture = @typeInfo(struct_field.type).pointer.child;
        union_field_name.* = struct_field.name;
        UnionFieldType.* = @FieldType(FieldFuture, "result");
    }
    return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{}));
}
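
// A compile-time sketch (illustrative): for a struct of future pointers, the
// resulting union has one field per input field, typed as that future's
// `result` field.
test SelectUnion {
    const S = struct { a: *Future(u32), b: *Future(void) };
    const U = SelectUnion(S);
    try std.testing.expect(@FieldType(U, "a") == @FieldType(Future(u32), "result"));
    try std.testing.expect(@FieldType(U, "b") == @FieldType(Future(void), "result"));
}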

/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
/// and can be different for each field.
pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {
    const U = SelectUnion(@TypeOf(s));
    const S = @TypeOf(s);
    const fields = @typeInfo(S).@"struct".fields;
    var futures: [fields.len]*AnyFuture = undefined;
    inline for (fields, &futures) |field, *any_future| {
        const future = @field(s, field.name);
        any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result);
    }
    switch (try io.vtable.select(io.userdata, &futures)) {
        inline 0...(fields.len - 1) => |selected_index| {
            const field_name = fields[selected_index].name;
            return @unionInit(U, field_name, @field(s, field_name).await(io));
        },
        else => unreachable,
    }
}
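
// Usage sketch (illustrative; `a`, `b`, `handleFirst`, and `handleSecond` are
// hypothetical, and `io` is assumed to be provided by the caller). `select`
// waits until one of the futures completes and returns its result as the
// matching union field:
//
//     switch (try io.select(.{ .first = &a, .second = &b })) {
//         .first => |a_result| handleFirst(a_result),
//         .second => |b_result| handleSecond(b_result),
//     }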