Commit 0d4b358dd8

Andrew Kelley <andrew@ziglang.org>
2025-03-31 04:56:30
implement Mutex, Condition, and Queue
1 parent 08b609a
Changed files (3)
lib/std/Io/EventLoop.zig
@@ -102,7 +102,7 @@ const Fiber = struct {
         return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
     }
 
-    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{AsyncCancel}!void {
+    fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
         if (@cmpxchgStrong(
             ?*Thread,
             &fiber.cancel_thread,
@@ -112,7 +112,7 @@ const Fiber = struct {
             .acquire,
         )) |cancel_thread| {
             assert(cancel_thread == Thread.canceling);
-            return error.AsyncCancel;
+            return error.Canceled;
         }
     }
 
@@ -746,7 +746,7 @@ pub fn createFile(
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
         .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,
 
         .FAULT => unreachable,
         .INVAL => return error.BadPathName,
@@ -854,7 +854,7 @@ pub fn openFile(
     switch (errno(completion.result)) {
         .SUCCESS => return .{ .handle = completion.result },
         .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,
 
         .FAULT => unreachable,
         .INVAL => return error.BadPathName,
@@ -950,7 +950,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
         .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,
 
         .INVAL => unreachable,
         .FAULT => unreachable,
@@ -1002,7 +1002,7 @@ pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offs
     switch (errno(completion.result)) {
         .SUCCESS => return @as(u32, @bitCast(completion.result)),
         .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,
 
         .INVAL => return error.InvalidArgument,
         .FAULT => unreachable,
@@ -1080,7 +1080,7 @@ pub fn sleep(userdata: ?*anyopaque, clockid: std.posix.clockid_t, deadline: Io.D
     switch (errno(completion.result)) {
         .SUCCESS, .TIME => return,
         .INTR => unreachable,
-        .CANCELED => return error.AsyncCancel,
+        .CANCELED => return error.Canceled,
 
         else => |err| return std.posix.unexpectedErrno(err),
     }
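
Illustrative sketch (not part of this commit): with the rename from error.AsyncCancel to error.Canceled, callers switch on the new error name. This assumes the vtable signatures added in lib/std/Io.zig below and that lib/std/Io.zig is exposed as std.Io; the helper openOrNull is hypothetical.

const std = @import("std");

fn openOrNull(io: std.Io, dir: std.fs.Dir, sub_path: []const u8) std.Io.FileOpenError!?std.fs.File {
    const file = io.vtable.openFile(io.userdata, dir, sub_path, .{}) catch |err| switch (err) {
        // Cancellation of the surrounding async task now surfaces as error.Canceled
        // (previously error.AsyncCancel).
        error.Canceled => return null,
        else => |e| return e,
    };
    return file;
}
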
lib/std/Thread/Pool.zig
@@ -332,9 +332,12 @@ pub fn io(pool: *Pool) Io {
         .vtable = &.{
             .@"async" = @"async",
             .@"await" = @"await",
-
             .cancel = cancel,
             .cancelRequested = cancelRequested,
+            .mutexLock = mutexLock,
+            .mutexUnlock = mutexUnlock,
+            .conditionWait = conditionWait,
+            .conditionWake = conditionWake,
 
             .createFile = createFile,
             .openFile = openFile,
@@ -517,11 +520,179 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
     return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
 }
 
-fn checkCancel(pool: *Pool) error{AsyncCancel}!void {
-    if (cancelRequested(pool)) return error.AsyncCancel;
+fn checkCancel(pool: *Pool) error{Canceled}!void {
+    if (cancelRequested(pool)) return error.Canceled;
+}
+
+fn mutexLock(userdata: ?*anyopaque, m: *Io.Mutex) void {
+    @branchHint(.cold);
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    _ = pool;
+
+    // Avoid doing an atomic swap below if we already know the state is contended.
+    // An atomic swap unconditionally stores, which needlessly marks the cache-line as modified.
+    if (m.state.load(.monotonic) == Io.Mutex.contended) {
+        std.Thread.Futex.wait(&m.state, Io.Mutex.contended);
+    }
+
+    // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
+    //
+    // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
+    // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
+    // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
+    // but this is better than having to wake all waiting threads on mutex unlock.
+    //
+    // Acquire barrier ensures grabbing the lock happens before the critical section
+    // and that the previous lock holder's critical section happens before we grab the lock.
+    while (m.state.swap(Io.Mutex.contended, .acquire) != Io.Mutex.unlocked) {
+        std.Thread.Futex.wait(&m.state, Io.Mutex.contended);
+    }
+}
+
+fn mutexUnlock(userdata: ?*anyopaque, m: *Io.Mutex) void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    _ = pool;
+    // Needs to also wake up a waiting thread if any.
+    //
+    // A waiting thread will acquire with `contended` instead of `locked`
+    // which ensures that it wakes up another thread on the next unlock().
+    //
+    // Release barrier ensures the critical section happens before we let go of the lock
+    // and that our critical section happens before the next lock holder grabs the lock.
+    const state = m.state.swap(Io.Mutex.unlocked, .release);
+    assert(state != Io.Mutex.unlocked);
+
+    if (state == Io.Mutex.contended) {
+        std.Thread.Futex.wake(&m.state, 1);
+    }
+}
+
+fn mutexLockInternal(pool: *std.Thread.Pool, m: *Io.Mutex) void {
+    if (!m.tryLock()) {
+        @branchHint(.unlikely);
+        mutexLock(pool, m);
+    }
+}
+
+fn conditionWait(
+    userdata: ?*anyopaque,
+    cond: *Io.Condition,
+    mutex: *Io.Mutex,
+    timeout: ?u64,
+) Io.Condition.WaitError!void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    comptime assert(@TypeOf(cond.state) == u64);
+    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
+    const cond_state = &ints[0];
+    const cond_epoch = &ints[1];
+    const one_waiter = 1;
+    const waiter_mask = 0xffff;
+    const one_signal = 1 << 16;
+    const signal_mask = 0xffff << 16;
+    // Observe the epoch, then check the state again to see if we should wake up.
+    // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock:
+    //
+    // - T1: s = LOAD(&state)
+    // - T2: UPDATE(&s, signal)
+    // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch)
+    // - T1: e = LOAD(&epoch) (was reordered after the state load)
+    // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change)
+    //
+    // Acquire barrier to ensure the epoch load happens before the state load.
+    var epoch = cond_epoch.load(.acquire);
+    var state = cond_state.fetchAdd(one_waiter, .monotonic);
+    assert(state & waiter_mask != waiter_mask);
+    state += one_waiter;
+
+    mutexUnlock(pool, mutex);
+    defer mutexLockInternal(pool, mutex);
+
+    var futex_deadline = std.Thread.Futex.Deadline.init(timeout);
+
+    while (true) {
+        futex_deadline.wait(cond_epoch, epoch) catch |err| switch (err) {
+            // On timeout, we must decrement the waiter we added above.
+            error.Timeout => {
+                while (true) {
+                    // If there's a signal when we're timing out, consume it and report being woken up instead.
+                    // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+                    while (state & signal_mask != 0) {
+                        const new_state = state - one_waiter - one_signal;
+                        state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
+                    }
+
+                    // Remove the waiter we added and officially return timed out.
+                    const new_state = state - one_waiter;
+                    state = cond_state.cmpxchgWeak(state, new_state, .monotonic, .monotonic) orelse return err;
+                }
+            },
+        };
+
+        epoch = cond_epoch.load(.acquire);
+        state = cond_state.load(.monotonic);
+
+        // Try to wake up by consuming a signal and decrementing the waiter we added previously.
+        // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return.
+        while (state & signal_mask != 0) {
+            const new_state = state - one_waiter - one_signal;
+            state = cond_state.cmpxchgWeak(state, new_state, .acquire, .monotonic) orelse return;
+        }
+    }
+}
+
+fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, notify: Io.Condition.Notify) void {
+    const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+    _ = pool;
+    comptime assert(@TypeOf(cond.state) == u64);
+    const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
+    const cond_state = &ints[0];
+    const cond_epoch = &ints[1];
+    const one_waiter = 1;
+    const waiter_mask = 0xffff;
+    const one_signal = 1 << 16;
+    const signal_mask = 0xffff << 16;
+    var state = cond_state.load(.monotonic);
+    while (true) {
+        const waiters = (state & waiter_mask) / one_waiter;
+        const signals = (state & signal_mask) / one_signal;
+
+        // Reserves which waiters to wake up by incrementing the signals count.
+        // Therefore, the signals count is always less than or equal to the waiters count.
+        // We don't need to Futex.wake() if there's nothing to wake up, or if other wake() threads have already reserved wakeups for the current waiters.
+        const wakeable = waiters - signals;
+        if (wakeable == 0) {
+            return;
+        }
+
+        const to_wake = switch (notify) {
+            .one => 1,
+            .all => wakeable,
+        };
+
+        // Reserve the amount of waiters to wake by incrementing the signals count.
+        // Release barrier ensures code before the wake() happens before the signal is posted and consumed by the wait() threads.
+        const new_state = state + (one_signal * to_wake);
+        state = cond_state.cmpxchgWeak(state, new_state, .release, .monotonic) orelse {
+            // Wake up the waiting threads we reserved above by changing the epoch value.
+            // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it.
+            // This is very unlikely given the precise number of Futex.wake() calls that would have to occur while the waiting thread is preempted.
+            //
+            // Release barrier ensures the signal being added to the state happens before the epoch is changed.
+            // If not, the waiting thread could potentially deadlock from missing both the state and epoch change:
+            //
+            // - T2: UPDATE(&epoch, 1) (reordered before the state change)
+            // - T1: e = LOAD(&epoch)
+            // - T1: s = LOAD(&state)
+            // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch)
+            // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change)
+            _ = cond_epoch.fetchAdd(1, .release);
+            std.Thread.Futex.wake(cond_epoch, to_wake);
+            return;
+        };
+    }
 }
 
-pub fn createFile(
+fn createFile(
     userdata: ?*anyopaque,
     dir: std.fs.Dir,
     sub_path: []const u8,
@@ -532,7 +703,7 @@ pub fn createFile(
     return dir.createFile(sub_path, flags);
 }
 
-pub fn openFile(
+fn openFile(
     userdata: ?*anyopaque,
     dir: std.fs.Dir,
     sub_path: []const u8,
@@ -543,13 +714,13 @@ pub fn openFile(
     return dir.openFile(sub_path, flags);
 }
 
-pub fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
+fn closeFile(userdata: ?*anyopaque, file: std.fs.File) void {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     _ = pool;
     return file.close();
 }
 
-pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
+fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std.posix.off_t) Io.FilePReadError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
     return switch (offset) {
@@ -558,7 +729,7 @@ pub fn pread(userdata: ?*anyopaque, file: std.fs.File, buffer: []u8, offset: std
     };
 }
 
-pub fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
+fn pwrite(userdata: ?*anyopaque, file: std.fs.File, buffer: []const u8, offset: std.posix.off_t) Io.FilePWriteError!usize {
     const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
     try pool.checkCancel();
     return switch (offset) {
lib/std/Io.zig
@@ -6,6 +6,7 @@ const windows = std.os.windows;
 const posix = std.posix;
 const math = std.math;
 const assert = std.debug.assert;
+const fs = std.fs;
 const Allocator = std.mem.Allocator;
 const Alignment = std.mem.Alignment;
 
@@ -614,6 +615,12 @@ pub const VTable = struct {
     /// Thread-safe.
     cancelRequested: *const fn (?*anyopaque) bool,
 
+    mutexLock: *const fn (?*anyopaque, mutex: *Mutex) void,
+    mutexUnlock: *const fn (?*anyopaque, mutex: *Mutex) void,
+
+    conditionWait: *const fn (?*anyopaque, cond: *Condition, mutex: *Mutex, timeout_ns: ?u64) Condition.WaitError!void,
+    conditionWake: *const fn (?*anyopaque, cond: *Condition, notify: Condition.Notify) void,
+
     createFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.CreateFlags) FileOpenError!fs.File,
     openFile: *const fn (?*anyopaque, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File,
     closeFile: *const fn (?*anyopaque, fs.File) void,
@@ -627,11 +634,11 @@ pub const VTable = struct {
 pub const OpenFlags = fs.File.OpenFlags;
 pub const CreateFlags = fs.File.CreateFlags;
 
-pub const FileOpenError = fs.File.OpenError || error{AsyncCancel};
-pub const FileReadError = fs.File.ReadError || error{AsyncCancel};
-pub const FilePReadError = fs.File.PReadError || error{AsyncCancel};
-pub const FileWriteError = fs.File.WriteError || error{AsyncCancel};
-pub const FilePWriteError = fs.File.PWriteError || error{AsyncCancel};
+pub const FileOpenError = fs.File.OpenError || error{Canceled};
+pub const FileReadError = fs.File.ReadError || error{Canceled};
+pub const FilePReadError = fs.File.PReadError || error{Canceled};
+pub const FileWriteError = fs.File.WriteError || error{Canceled};
+pub const FilePWriteError = fs.File.PWriteError || error{Canceled};
 
 pub const Timestamp = enum(i96) {
     _,
@@ -648,8 +655,8 @@ pub const Deadline = union(enum) {
     nanoseconds: i96,
     timestamp: Timestamp,
 };
-pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{AsyncCancel};
-pub const SleepError = error{ UnsupportedClock, Unexpected, AsyncCancel };
+pub const ClockGetTimeError = std.posix.ClockGetTimeError || error{Canceled};
+pub const SleepError = error{ UnsupportedClock, Unexpected, Canceled };
 
 pub const AnyFuture = opaque {};
 
@@ -678,6 +685,302 @@ pub fn Future(Result: type) type {
     };
 }
 
+pub const Mutex = struct {
+    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),
+
+    pub const unlocked: u32 = 0b00;
+    pub const locked: u32 = 0b01;
+    pub const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below
+
+    pub fn tryLock(m: *Mutex) bool {
+        // On x86, use `lock bts` instead of `lock cmpxchg` as:
+        // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
+        // - `lock bts` is smaller instruction-wise which makes it better for inlining
+        if (builtin.target.cpu.arch.isX86()) {
+            const locked_bit = @ctz(locked);
+            return m.state.bitSet(locked_bit, .acquire) == 0;
+        }
+
+        // Acquire barrier ensures grabbing the lock happens before the critical section
+        // and that the previous lock holder's critical section happens before we grab the lock.
+        return m.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
+    }
+
+    /// Avoids the vtable for uncontended locks.
+    pub fn lock(m: *Mutex, io: Io) void {
+        if (!m.tryLock()) {
+            @branchHint(.unlikely);
+            io.vtable.mutexLock(io.userdata, m);
+        }
+    }
+
+    pub fn unlock(m: *Mutex, io: Io) void {
+        io.vtable.mutexUnlock(io.userdata, m);
+    }
+};
+
+pub const Condition = struct {
+    state: u64 = 0,
+
+    pub const WaitError = error{
+        Timeout,
+        Canceled,
+    };
+
+    /// How many waiters to wake up.
+    pub const Notify = enum {
+        one,
+        all,
+    };
+
+    pub fn wait(cond: *Condition, io: Io, mutex: *Mutex) void {
+        io.vtable.conditionWait(io.userdata, cond, mutex, null) catch |err| switch (err) {
+            error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out
+            error.Canceled => return, // handled as spurious wakeup
+        };
+    }
+
+    pub fn timedWait(cond: *Condition, io: Io, mutex: *Mutex, timeout_ns: u64) WaitError!void {
+        return io.vtable.conditionWait(io.userdata, cond, mutex, timeout_ns);
+    }
+
+    pub fn signal(cond: *Condition, io: Io) void {
+        io.vtable.conditionWake(io.userdata, cond, .one);
+    }
+
+    pub fn broadcast(cond: *Condition, io: Io) void {
+        io.vtable.conditionWake(io.userdata, cond, .all);
+    }
+};
+
+pub const TypeErasedQueue = struct {
+    mutex: Mutex,
+
+    /// Ring buffer. This data is logically *after* queued getters.
+    buffer: []u8,
+    put_index: usize,
+    get_index: usize,
+
+    putters: std.DoublyLinkedList(PutNode),
+    getters: std.DoublyLinkedList(GetNode),
+
+    const PutNode = struct {
+        remaining: []const u8,
+        condition: Condition,
+    };
+
+    const GetNode = struct {
+        remaining: []u8,
+        condition: Condition,
+    };
+
+    pub fn init(buffer: []u8) TypeErasedQueue {
+        return .{
+            .mutex = .{},
+            .buffer = buffer,
+            .put_index = 0,
+            .get_index = 0,
+            .putters = .{},
+            .getters = .{},
+        };
+    }
+
+    pub fn put(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize) usize {
+        assert(elements.len >= min);
+
+        q.mutex.lock(io);
+        defer q.mutex.unlock(io);
+
+        // Getters have first priority on the data, and only when the getters
+        // queue is empty do we start populating the buffer.
+
+        var remaining = elements;
+        while (true) {
+            const getter = q.getters.popFirst() orelse break;
+            const copy_len = @min(getter.data.remaining.len, remaining.len);
+            @memcpy(getter.data.remaining[0..copy_len], remaining[0..copy_len]);
+            remaining = remaining[copy_len..];
+            getter.data.remaining = getter.data.remaining[copy_len..];
+            if (getter.data.remaining.len == 0) {
+                getter.data.condition.signal(io);
+                continue;
+            }
+            q.getters.prepend(getter);
+            assert(remaining.len == 0);
+            return elements.len;
+        }
+
+        while (true) {
+            {
+                const available = q.buffer[q.put_index..];
+                const copy_len = @min(available.len, remaining.len);
+                @memcpy(available[0..copy_len], remaining[0..copy_len]);
+                remaining = remaining[copy_len..];
+                q.put_index += copy_len;
+                if (remaining.len == 0) return elements.len;
+            }
+            {
+                const available = q.buffer[0..q.get_index];
+                const copy_len = @min(available.len, remaining.len);
+                @memcpy(available[0..copy_len], remaining[0..copy_len]);
+                remaining = remaining[copy_len..];
+                q.put_index = copy_len;
+                if (remaining.len == 0) return elements.len;
+            }
+
+            const total_filled = elements.len - remaining.len;
+            if (total_filled >= min) return total_filled;
+
+            var node: std.DoublyLinkedList(PutNode).Node = .{
+                .data = .{ .remaining = remaining, .condition = .{} },
+            };
+            q.putters.append(&node);
+            node.data.condition.wait(io, &q.mutex);
+            remaining = node.data.remaining;
+        }
+    }
+
+    pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
+        assert(buffer.len >= min);
+
+        q.mutex.lock(io);
+        defer q.mutex.unlock(io);
+
+        // The ring buffer gets first priority, then data should come from any
+        // queued putters, then finally the ring buffer should be filled with
+        // data from putters so they can be resumed.
+
+        var remaining = buffer;
+        while (true) {
+            if (q.get_index <= q.put_index) {
+                const available = q.buffer[q.get_index..q.put_index];
+                const copy_len = @min(available.len, remaining.len);
+                @memcpy(remaining[0..copy_len], available[0..copy_len]);
+                q.get_index += copy_len;
+                remaining = remaining[copy_len..];
+                if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
+            } else {
+                {
+                    const available = q.buffer[q.get_index..];
+                    const copy_len = @min(available.len, remaining.len);
+                    @memcpy(remaining[0..copy_len], available[0..copy_len]);
+                    q.get_index += copy_len;
+                    remaining = remaining[copy_len..];
+                    if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
+                }
+                {
+                    const available = q.buffer[0..q.put_index];
+                    const copy_len = @min(available.len, remaining.len);
+                    @memcpy(remaining[0..copy_len], available[0..copy_len]);
+                    q.get_index = copy_len;
+                    remaining = remaining[copy_len..];
+                    if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
+                }
+            }
+            // Copy directly from putters into buffer.
+            while (remaining.len > 0) {
+                const putter = q.putters.popFirst() orelse break;
+                const copy_len = @min(putter.data.remaining.len, remaining.len);
+                @memcpy(remaining[0..copy_len], putter.data.remaining[0..copy_len]);
+                putter.data.remaining = putter.data.remaining[copy_len..];
+                remaining = remaining[copy_len..];
+                if (putter.data.remaining.len == 0) {
+                    putter.data.condition.signal(io);
+                } else {
+                    assert(remaining.len == 0);
+                    q.putters.prepend(putter);
+                    return fillRingBufferFromPutters(q, io, buffer.len);
+                }
+            }
+            // Both the ring buffer and the putters queue are empty.
+            const total_filled = buffer.len - remaining.len;
+            if (total_filled >= min) return total_filled;
+
+            var node: std.DoublyLinkedList(GetNode).Node = .{
+                .data = .{ .remaining = remaining, .condition = .{} },
+            };
+            q.getters.append(&node);
+            node.data.condition.wait(io, &q.mutex);
+            remaining = node.data.remaining;
+        }
+    }
+
+    /// Called when there is nonzero space available in the ring buffer and
+    /// potentially putters waiting. The mutex is already held and the task is
+    /// to copy putter data to the ring buffer and signal any putters whose
+    /// buffers have been fully copied.
+    fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize {
+        while (true) {
+            const putter = q.putters.popFirst() orelse return len;
+            const available = q.buffer[q.put_index..];
+            const copy_len = @min(available.len, putter.data.remaining.len);
+            @memcpy(available[0..copy_len], putter.data.remaining[0..copy_len]);
+            putter.data.remaining = putter.data.remaining[copy_len..];
+            q.put_index += copy_len;
+            if (putter.data.remaining.len == 0) {
+                putter.data.condition.signal(io);
+                continue;
+            }
+            const second_available = q.buffer[0..q.get_index];
+            const second_copy_len = @min(second_available.len, putter.data.remaining.len);
+            @memcpy(second_available[0..second_copy_len], putter.data.remaining[0..second_copy_len]);
+            putter.data.remaining = putter.data.remaining[second_copy_len..];
+            q.put_index = second_copy_len;
+            if (putter.data.remaining.len == 0) {
+                putter.data.condition.signal(io);
+                continue;
+            }
+            q.putters.prepend(putter);
+            return len;
+        }
+    }
+};
+
+/// Many producer, many consumer, thread-safe, runtime configurable buffer size.
+/// When buffer is empty, consumers suspend and are resumed by producers.
+/// When buffer is full, producers suspend and are resumed by consumers.
+pub fn Queue(Elem: type) type {
+    return struct {
+        type_erased: TypeErasedQueue,
+
+        pub fn init(buffer: []Elem) @This() {
+            return .{ .type_erased = .init(@ptrCast(buffer)) };
+        }
+
+        /// Appends elements to the end of the queue. The function returns when
+        /// at least `min` elements have been added to the buffer or sent
+        /// directly to a consumer.
+        ///
+        /// Returns how many elements have been added to the queue.
+        ///
+        /// Asserts that `elements.len >= min`.
+        pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) usize {
+            return @divExact(q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
+        }
+
+        /// Receives elements from the beginning of the queue. The function
+        /// returns when at least `min` elements have been populated inside
+        /// `buffer`.
+        ///
+        /// Returns how many elements of `buffer` have been populated.
+        ///
+        /// Asserts that `buffer.len >= min`.
+        pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) usize {
+            return @divExact(q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
+        }
+
+        pub fn putOne(q: *@This(), io: Io, item: Elem) void {
+            assert(q.put(io, &.{item}, 1) == 1);
+        }
+
+        pub fn getOne(q: *@This(), io: Io) Elem {
+            var buf: [1]Elem = undefined;
+            assert(q.get(io, &buf, 1) == 1);
+            return buf[0];
+        }
+    };
+}
+
 /// Calls `function` with `args`, such that the return value of the function is
 /// not guaranteed to be available until `await` is called.
 pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
@@ -685,7 +988,7 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(
     const Args = @TypeOf(args);
     const TypeErased = struct {
         fn start(context: *const anyopaque, result: *anyopaque) void {
-            const args_casted: *const Args = @alignCast(@ptrCast(context));
+            const args_casted: *const Args = @ptrCast(@alignCast(context));
             const result_casted: *Result = @ptrCast(@alignCast(result));
             result_casted.* = @call(.auto, function, args_casted.*);
         }