master
   1//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a
   2//! 32bit memory address as hints.
   3//!
   4//! Blocking a thread is acknowledged only if the 32bit memory address is equal
   5//! to a given value. This check helps avoid block/unblock deadlocks which
   6//! occur if a `wake()` happens before a `wait()`.
   7//!
   8//! Using Futex, other Thread synchronization primitives can be built which
   9//! efficiently wait for cross-thread events or signals.
  10
  11const std = @import("../std.zig");
  12const builtin = @import("builtin");
  13const Futex = @This();
  14const windows = std.os.windows;
  15const linux = std.os.linux;
  16const c = std.c;
  17
  18const assert = std.debug.assert;
  19const testing = std.testing;
  20const atomic = std.atomic;
  21
  22/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
  23/// - The value at `ptr` is no longer equal to `expect`.
  24/// - The caller is unblocked by a matching `wake()`.
  25/// - The caller is unblocked spuriously ("at random").
  26///
  27/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
  28/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
  29pub fn wait(ptr: *const atomic.Value(u32), expect: u32) void {
  30    @branchHint(.cold);
  31
  32    Impl.wait(ptr, expect, null) catch |err| switch (err) {
  33        error.Timeout => unreachable, // null timeout meant to wait forever
  34    };
  35}
  36
  37/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
  38/// - The value at `ptr` is no longer equal to `expect`.
  39/// - The caller is unblocked by a matching `wake()`.
  40/// - The caller is unblocked spuriously ("at random").
  41/// - The caller blocks for longer than the given timeout. In which case, `error.Timeout` is returned.
  42///
  43/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
  44/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
  45pub fn timedWait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
  46    @branchHint(.cold);
  47
  48    // Avoid calling into the OS for no-op timeouts.
  49    if (timeout_ns == 0) {
  50        if (ptr.load(.seq_cst) != expect) return;
  51        return error.Timeout;
  52    }
  53
  54    return Impl.wait(ptr, expect, timeout_ns);
  55}
  56
  57/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`.
  58pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
  59    @branchHint(.cold);
  60
  61    // Avoid calling into the OS if there's nothing to wake up.
  62    if (max_waiters == 0) {
  63        return;
  64    }
  65
  66    Impl.wake(ptr, max_waiters);
  67}
  68
  69const Impl = if (builtin.single_threaded)
  70    SingleThreadedImpl
  71else if (builtin.os.tag == .windows)
  72    WindowsImpl
  73else if (builtin.os.tag.isDarwin())
  74    DarwinImpl
  75else if (builtin.os.tag == .linux)
  76    LinuxImpl
  77else if (builtin.os.tag == .freebsd)
  78    FreebsdImpl
  79else if (builtin.os.tag == .openbsd)
  80    OpenbsdImpl
  81else if (builtin.os.tag == .dragonfly)
  82    DragonflyImpl
  83else if (builtin.target.cpu.arch.isWasm())
  84    WasmImpl
  85else if (std.Thread.use_pthreads)
  86    PosixImpl
  87else
  88    UnsupportedImpl;
  89
  90/// We can't do @compileError() in the `Impl` switch statement above as its eagerly evaluated.
  91/// So instead, we @compileError() on the methods themselves for platforms which don't support futex.
  92const UnsupportedImpl = struct {
  93    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
  94        return unsupported(.{ ptr, expect, timeout });
  95    }
  96
  97    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
  98        return unsupported(.{ ptr, max_waiters });
  99    }
 100
 101    fn unsupported(unused: anytype) noreturn {
 102        _ = unused;
 103        @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag));
 104    }
 105};
 106
 107const SingleThreadedImpl = struct {
 108    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 109        if (ptr.raw != expect) {
 110            return;
 111        }
 112
 113        // There are no threads to wake us up.
 114        // So if we wait without a timeout we would never wake up.
 115        const delay = timeout orelse {
 116            unreachable; // deadlock detected
 117        };
 118
 119        _ = delay;
 120        return error.Timeout;
 121    }
 122
 123    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 124        // There are no other threads to possibly wake up
 125        _ = ptr;
 126        _ = max_waiters;
 127    }
 128};
 129
 130// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll
 131// as it's generally already a linked target and is autoloaded into all processes anyway.
 132const WindowsImpl = struct {
 133    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 134        var timeout_value: windows.LARGE_INTEGER = undefined;
 135        var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
 136
 137        // NTDLL functions work with time in units of 100 nanoseconds.
 138        // Positive values are absolute deadlines while negative values are relative durations.
 139        if (timeout) |delay| {
 140            timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
 141            timeout_value = -timeout_value;
 142            timeout_ptr = &timeout_value;
 143        }
 144
 145        const rc = windows.ntdll.RtlWaitOnAddress(
 146            ptr,
 147            &expect,
 148            @sizeOf(@TypeOf(expect)),
 149            timeout_ptr,
 150        );
 151
 152        switch (rc) {
 153            .SUCCESS => {},
 154            .TIMEOUT => {
 155                assert(timeout != null);
 156                return error.Timeout;
 157            },
 158            else => unreachable,
 159        }
 160    }
 161
 162    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 163        const address: ?*const anyopaque = ptr;
 164        assert(max_waiters != 0);
 165
 166        switch (max_waiters) {
 167            1 => windows.ntdll.RtlWakeAddressSingle(address),
 168            else => windows.ntdll.RtlWakeAddressAll(address),
 169        }
 170    }
 171};
 172
 173const DarwinImpl = struct {
 174    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 175        // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
 176        // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
 177        //
 178        // This XNU version appears to correspond to 11.0.1:
 179        // https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
 180        //
 181        // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
 182        // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
 183        const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11;
 184
 185        var timeout_ns: u64 = 0;
 186        if (timeout) |delay| {
 187            assert(delay != 0); // handled by timedWait()
 188            timeout_ns = delay;
 189        }
 190
 191        // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
 192        // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
 193        // should handle spurious wakeups), but we need to remember that we did so, so that
 194        // we don't return `Timeout` incorrectly. If that happens, we set this variable to
 195        // true so that we we know to ignore the ETIMEDOUT result.
 196        var timeout_overflowed = false;
 197
 198        const addr: *const anyopaque = ptr;
 199        const flags: c.UL = .{
 200            .op = .COMPARE_AND_WAIT,
 201            .NO_ERRNO = true,
 202        };
 203        const status = blk: {
 204            if (supports_ulock_wait2) {
 205                break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
 206            }
 207
 208            const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
 209                timeout_overflowed = true;
 210                break :overflow std.math.maxInt(u32);
 211            };
 212
 213            break :blk c.__ulock_wait(flags, addr, expect, timeout_us);
 214        };
 215
 216        if (status >= 0) return;
 217        switch (@as(c.E, @enumFromInt(-status))) {
 218            // Wait was interrupted by the OS or other spurious signalling.
 219            .INTR => {},
 220            // Address of the futex was paged out. This is unlikely, but possible in theory, and
 221            // pthread/libdispatch on darwin bother to handle it. In this case we'll return
 222            // without waiting, but the caller should retry anyway.
 223            .FAULT => {},
 224            // Only report Timeout if we didn't have to cap the timeout
 225            .TIMEDOUT => {
 226                assert(timeout != null);
 227                if (!timeout_overflowed) return error.Timeout;
 228            },
 229            else => unreachable,
 230        }
 231    }
 232
 233    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 234        const flags: c.UL = .{
 235            .op = .COMPARE_AND_WAIT,
 236            .NO_ERRNO = true,
 237            .WAKE_ALL = max_waiters > 1,
 238        };
 239
 240        while (true) {
 241            const addr: *const anyopaque = ptr;
 242            const status = c.__ulock_wake(flags, addr, 0);
 243
 244            if (status >= 0) return;
 245            switch (@as(c.E, @enumFromInt(-status))) {
 246                .INTR => continue, // spurious wake()
 247                .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
 248                .NOENT => return, // nothing was woken up
 249                .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
 250                else => unreachable,
 251            }
 252        }
 253    }
 254};
 255
 256// https://man7.org/linux/man-pages/man2/futex.2.html
 257const LinuxImpl = struct {
 258    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 259        var ts: linux.timespec = undefined;
 260        if (timeout) |timeout_ns| {
 261            ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
 262            ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
 263        }
 264
 265        const rc = linux.futex_4arg(
 266            &ptr.raw,
 267            .{ .cmd = .WAIT, .private = true },
 268            expect,
 269            if (timeout != null) &ts else null,
 270        );
 271
 272        switch (linux.errno(rc)) {
 273            .SUCCESS => {}, // notified by `wake()`
 274            .INTR => {}, // spurious wakeup
 275            .AGAIN => {}, // ptr.* != expect
 276            .TIMEDOUT => {
 277                assert(timeout != null);
 278                return error.Timeout;
 279            },
 280            .INVAL => {}, // possibly timeout overflow
 281            .FAULT => unreachable, // ptr was invalid
 282            else => unreachable,
 283        }
 284    }
 285
 286    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 287        const rc = linux.futex_3arg(
 288            &ptr.raw,
 289            .{ .cmd = .WAKE, .private = true },
 290            @min(max_waiters, std.math.maxInt(i32)),
 291        );
 292
 293        switch (linux.errno(rc)) {
 294            .SUCCESS => {}, // successful wake up
 295            .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
 296            .FAULT => {}, // pointer became invalid while doing the wake
 297            else => unreachable,
 298        }
 299    }
 300};
 301
 302// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1
 303const FreebsdImpl = struct {
 304    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 305        var tm_size: usize = 0;
 306        var tm: c._umtx_time = undefined;
 307        var tm_ptr: ?*const c._umtx_time = null;
 308
 309        if (timeout) |timeout_ns| {
 310            tm_ptr = &tm;
 311            tm_size = @sizeOf(@TypeOf(tm));
 312
 313            tm.flags = 0; // use relative time not UMTX_ABSTIME
 314            tm.clockid = .MONOTONIC;
 315            tm.timeout.sec = @as(@TypeOf(tm.timeout.sec), @intCast(timeout_ns / std.time.ns_per_s));
 316            tm.timeout.nsec = @as(@TypeOf(tm.timeout.nsec), @intCast(timeout_ns % std.time.ns_per_s));
 317        }
 318
 319        const rc = c._umtx_op(
 320            @intFromPtr(&ptr.raw),
 321            @intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE),
 322            @as(c_ulong, expect),
 323            tm_size,
 324            @intFromPtr(tm_ptr),
 325        );
 326
 327        switch (std.posix.errno(rc)) {
 328            .SUCCESS => {},
 329            .FAULT => unreachable, // one of the args points to invalid memory
 330            .INVAL => unreachable, // arguments should be correct
 331            .TIMEDOUT => {
 332                assert(timeout != null);
 333                return error.Timeout;
 334            },
 335            .INTR => {}, // spurious wake
 336            else => unreachable,
 337        }
 338    }
 339
 340    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 341        const rc = c._umtx_op(
 342            @intFromPtr(&ptr.raw),
 343            @intFromEnum(c.UMTX_OP.WAKE_PRIVATE),
 344            @as(c_ulong, max_waiters),
 345            0, // there is no timeout struct
 346            0, // there is no timeout struct pointer
 347        );
 348
 349        switch (std.posix.errno(rc)) {
 350            .SUCCESS => {},
 351            .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
 352            .INVAL => unreachable, // arguments should be correct
 353            else => unreachable,
 354        }
 355    }
 356};
 357
 358// https://man.openbsd.org/futex.2
 359const OpenbsdImpl = struct {
 360    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 361        var ts: c.timespec = undefined;
 362        if (timeout) |timeout_ns| {
 363            ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
 364            ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
 365        }
 366
 367        const rc = c.futex(
 368            @as(*const volatile u32, @ptrCast(&ptr.raw)),
 369            c.FUTEX.WAIT | c.FUTEX.PRIVATE_FLAG,
 370            @as(c_int, @bitCast(expect)),
 371            if (timeout != null) &ts else null,
 372            null, // FUTEX.WAIT takes no requeue address
 373        );
 374
 375        switch (std.posix.errno(rc)) {
 376            .SUCCESS => {}, // woken up by wake
 377            .NOSYS => unreachable, // the futex operation shouldn't be invalid
 378            .FAULT => unreachable, // ptr was invalid
 379            .AGAIN => {}, // ptr != expect
 380            .INVAL => unreachable, // invalid timeout
 381            .TIMEDOUT => {
 382                assert(timeout != null);
 383                return error.Timeout;
 384            },
 385            .INTR => {}, // spurious wake from signal
 386            .CANCELED => {}, // spurious wake from signal with SA_RESTART
 387            else => unreachable,
 388        }
 389    }
 390
 391    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 392        const rc = c.futex(
 393            @as(*const volatile u32, @ptrCast(&ptr.raw)),
 394            c.FUTEX.WAKE | c.FUTEX.PRIVATE_FLAG,
 395            std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
 396            null, // FUTEX.WAKE takes no timeout ptr
 397            null, // FUTEX.WAKE takes no requeue address
 398        );
 399
 400        // returns number of threads woken up.
 401        assert(rc >= 0);
 402    }
 403};
 404
 405// https://man.dragonflybsd.org/?command=umtx&section=2
 406const DragonflyImpl = struct {
 407    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 408        // Dragonfly uses a scheme where 0 timeout means wait until signaled or spurious wake.
 409        // It's reporting of timeout's is also unrealiable so we use an external timing source (Timer) instead.
 410        var timeout_us: c_int = 0;
 411        var timeout_overflowed = false;
 412        var sleep_timer: std.time.Timer = undefined;
 413
 414        if (timeout) |delay| {
 415            assert(delay != 0); // handled by timedWait().
 416            timeout_us = std.math.cast(c_int, delay / std.time.ns_per_us) orelse blk: {
 417                timeout_overflowed = true;
 418                break :blk std.math.maxInt(c_int);
 419            };
 420
 421            // Only need to record the start time if we can provide somewhat accurate error.Timeout's
 422            if (!timeout_overflowed) {
 423                sleep_timer = std.time.Timer.start() catch unreachable;
 424            }
 425        }
 426
 427        const value = @as(c_int, @bitCast(expect));
 428        const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
 429        const rc = c.umtx_sleep(addr, value, timeout_us);
 430
 431        switch (std.posix.errno(rc)) {
 432            .SUCCESS => {},
 433            .BUSY => {}, // ptr != expect
 434            .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
 435                if (timeout) |timeout_ns| {
 436                    // Report error.Timeout only if we know the timeout duration has passed.
 437                    // If not, there's not much choice other than treating it as a spurious wake.
 438                    if (!timeout_overflowed and sleep_timer.read() >= timeout_ns) {
 439                        return error.Timeout;
 440                    }
 441                }
 442            },
 443            .INTR => {}, // spurious wake
 444            .INVAL => unreachable, // invalid timeout
 445            else => unreachable,
 446        }
 447    }
 448
 449    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 450        // A count of zero means wake all waiters.
 451        assert(max_waiters != 0);
 452        const to_wake = std.math.cast(c_int, max_waiters) orelse 0;
 453
 454        // https://man.dragonflybsd.org/?command=umtx&section=2
 455        // > umtx_wakeup() will generally return 0 unless the address is bad.
 456        // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
 457        const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
 458        _ = c.umtx_wakeup(addr, to_wake);
 459    }
 460};
 461
 462const WasmImpl = struct {
 463    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
 464        if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");
 465
 466        const to: i64 = if (timeout) |to| @intCast(to) else -1;
 467        const result = asm volatile (
 468            \\local.get %[ptr]
 469            \\local.get %[expected]
 470            \\local.get %[timeout]
 471            \\memory.atomic.wait32 0
 472            \\local.set %[ret]
 473            : [ret] "=r" (-> u32),
 474            : [ptr] "r" (&ptr.raw),
 475              [expected] "r" (@as(i32, @bitCast(expect))),
 476              [timeout] "r" (to),
 477        );
 478        switch (result) {
 479            0 => {}, // ok
 480            1 => {}, // expected =! loaded
 481            2 => return error.Timeout,
 482            else => unreachable,
 483        }
 484    }
 485
 486    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
 487        if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");
 488
 489        assert(max_waiters != 0);
 490        const woken_count = asm volatile (
 491            \\local.get %[ptr]
 492            \\local.get %[waiters]
 493            \\memory.atomic.notify 0
 494            \\local.set %[ret]
 495            : [ret] "=r" (-> u32),
 496            : [ptr] "r" (&ptr.raw),
 497              [waiters] "r" (max_waiters),
 498        );
 499        _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
 500    }
 501};
 502
 503/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread:
 504/// https://code.woboq.org/linux/linux/kernel/futex.c.html
 505/// https://go.dev/src/runtime/sema.go
 506const PosixImpl = struct {
 507    const Event = struct {
 508        cond: c.pthread_cond_t,
 509        mutex: c.pthread_mutex_t,
 510        state: enum { empty, waiting, notified },
 511
 512        fn init(self: *Event) void {
 513            // Use static init instead of pthread_cond/mutex_init() since this is generally faster.
 514            self.cond = .{};
 515            self.mutex = .{};
 516            self.state = .empty;
 517        }
 518
 519        fn deinit(self: *Event) void {
 520            // Some platforms reportedly give EINVAL for statically initialized pthread types.
 521            const rc = c.pthread_cond_destroy(&self.cond);
 522            assert(rc == .SUCCESS or rc == .INVAL);
 523
 524            const rm = c.pthread_mutex_destroy(&self.mutex);
 525            assert(rm == .SUCCESS or rm == .INVAL);
 526
 527            self.* = undefined;
 528        }
 529
 530        fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
 531            assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
 532            defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
 533
 534            // Early return if the event was already set.
 535            if (self.state == .notified) {
 536                return;
 537            }
 538
 539            // Compute the absolute timeout if one was specified.
 540            // POSIX requires that REALTIME is used by default for the pthread timedwait functions.
 541            // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
 542            var ts: c.timespec = undefined;
 543            if (timeout) |timeout_ns| {
 544                ts = std.posix.clock_gettime(c.CLOCK.REALTIME) catch unreachable;
 545                ts.sec +|= @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
 546                ts.nsec += @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
 547
 548                if (ts.nsec >= std.time.ns_per_s) {
 549                    ts.sec +|= 1;
 550                    ts.nsec -= std.time.ns_per_s;
 551                }
 552            }
 553
 554            // Start waiting on the event - there can be only one thread waiting.
 555            assert(self.state == .empty);
 556            self.state = .waiting;
 557
 558            while (true) {
 559                // Block using either pthread_cond_wait or pthread_cond_timewait if there's an absolute timeout.
 560                const rc = blk: {
 561                    if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex);
 562                    break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
 563                };
 564
 565                // After waking up, check if the event was set.
 566                if (self.state == .notified) {
 567                    return;
 568                }
 569
 570                assert(self.state == .waiting);
 571                switch (rc) {
 572                    .SUCCESS => {},
 573                    .TIMEDOUT => {
 574                        // If timed out, reset the event to avoid the set() thread doing an unnecessary signal().
 575                        self.state = .empty;
 576                        return error.Timeout;
 577                    },
 578                    .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid
 579                    .PERM => unreachable, // mutex is locked when cond_*wait() functions are called
 580                    else => unreachable,
 581                }
 582            }
 583        }
 584
 585        fn set(self: *Event) void {
 586            assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
 587            defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);
 588
 589            // Make sure that multiple calls to set() were not done on the same Event.
 590            const old_state = self.state;
 591            assert(old_state != .notified);
 592
 593            // Mark the event as set and wake up the waiting thread if there was one.
 594            // This must be done while the mutex as the wait() thread could deallocate
 595            // the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
 596            self.state = .notified;
 597            if (old_state == .waiting) {
 598                assert(c.pthread_cond_signal(&self.cond) == .SUCCESS);
 599            }
 600        }
 601    };
 602
    // Treap keyed by `usize` values; WaitQueue looks entries up by address.
    const Treap = std.Treap(usize, std.math.order);

    // One entry in a per-address wait queue (see WaitQueue).
    const Waiter = struct {
        // Treap node; WaitQueue stores the queue head's node in the treap.
        node: Treap.Node,
        // Previous waiter in the queue; WaitQueue treats null as "this is the head".
        prev: ?*Waiter,
        // Next waiter in the queue (also reused as the link inside a WaitList).
        next: ?*Waiter,
        // Maintained on the queue head by WaitQueue: the last waiter in the queue.
        tail: ?*Waiter,
        // Set by WaitQueue.insert() and cleared once the waiter has been dequeued.
        is_queued: bool,
        // NOTE(review): presumably what the blocked thread sleeps on; its use is not visible in this chunk.
        event: Event,
    };
 612
 613    // An unordered set of Waiters
 614    const WaitList = struct {
 615        top: ?*Waiter = null,
 616        len: usize = 0,
 617
 618        fn push(self: *WaitList, waiter: *Waiter) void {
 619            waiter.next = self.top;
 620            self.top = waiter;
 621            self.len += 1;
 622        }
 623
 624        fn pop(self: *WaitList) ?*Waiter {
 625            const waiter = self.top orelse return null;
 626            self.top = waiter.next;
 627            self.len -= 1;
 628            return waiter;
 629        }
 630    };
 631
 632    const WaitQueue = struct {
 633        fn insert(treap: *Treap, address: usize, waiter: *Waiter) void {
 634            // prepare the waiter to be inserted.
 635            waiter.next = null;
 636            waiter.is_queued = true;
 637
 638            // Find the wait queue entry associated with the address.
 639            // If there isn't a wait queue on the address, this waiter creates the queue.
 640            var entry = treap.getEntryFor(address);
 641            const entry_node = entry.node orelse {
 642                waiter.prev = null;
 643                waiter.tail = waiter;
 644                entry.set(&waiter.node);
 645                return;
 646            };
 647
 648            // There's a wait queue on the address; get the queue head and tail.
 649            const head: *Waiter = @fieldParentPtr("node", entry_node);
 650            const tail = head.tail orelse unreachable;
 651
 652            // Push the waiter to the tail by replacing it and linking to the previous tail.
 653            head.tail = waiter;
 654            tail.next = waiter;
 655            waiter.prev = tail;
 656        }
 657
 658        fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList {
 659            // Find the wait queue associated with this address and get the head/tail if any.
 660            var entry = treap.getEntryFor(address);
 661            var queue_head: ?*Waiter = if (entry.node) |node| @fieldParentPtr("node", node) else null;
 662            const queue_tail = if (queue_head) |head| head.tail else null;
 663
 664            // Once we're done updating the head, fix it's tail pointer and update the treap's queue head as well.
 665            defer entry.set(blk: {
 666                const new_head = queue_head orelse break :blk null;
 667                new_head.tail = queue_tail;
 668                break :blk &new_head.node;
 669            });
 670
 671            var removed = WaitList{};
 672            while (removed.len < max_waiters) {
 673                // dequeue and collect waiters from their wait queue.
 674                const waiter = queue_head orelse break;
 675                queue_head = waiter.next;
 676                removed.push(waiter);
 677
 678                // When dequeueing, we must mark is_queued as false.
 679                // This ensures that a waiter which calls tryRemove() returns false.
 680                assert(waiter.is_queued);
 681                waiter.is_queued = false;
 682            }
 683
 684            return removed;
 685        }
 686
 687        fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool {
 688            if (!waiter.is_queued) {
 689                return false;
 690            }
 691
 692            queue_remove: {
 693                // Find the wait queue associated with the address.
 694                var entry = blk: {
 695                    // A waiter without a previous link means it's the queue head that's in the treap so we can avoid lookup.
 696                    if (waiter.prev == null) {
 697                        assert(waiter.node.key == address);
 698                        break :blk treap.getEntryForExisting(&waiter.node);
 699                    }
 700                    break :blk treap.getEntryFor(address);
 701                };
 702
 703                // The queue head and tail must exist if we're removing a queued waiter.
 704                const head: *Waiter = @fieldParentPtr("node", entry.node orelse unreachable);
 705                const tail = head.tail orelse unreachable;
 706
 707                // A waiter with a previous link is never the head of the queue.
 708                if (waiter.prev) |prev| {
 709                    assert(waiter != head);
 710                    prev.next = waiter.next;
 711
 712                    // A waiter with both a previous and next link is in the middle.
 713                    // We only need to update the surrounding waiter's links to remove it.
 714                    if (waiter.next) |next| {
 715                        assert(waiter != tail);
 716                        next.prev = waiter.prev;
 717                        break :queue_remove;
 718                    }
 719
 720                    // A waiter with a previous but no next link means it's the tail of the queue.
 721                    // In that case, we need to update the head's tail reference.
 722                    assert(waiter == tail);
 723                    head.tail = waiter.prev;
 724                    break :queue_remove;
 725                }
 726
 727                // A waiter with no previous link means it's the queue head of queue.
 728                // We must replace (or remove) the head waiter reference in the treap.
 729                assert(waiter == head);
 730                entry.set(blk: {
 731                    const new_head = waiter.next orelse break :blk null;
 732                    new_head.tail = head.tail;
 733                    break :blk &new_head.node;
 734                });
 735            }
 736
 737            // Mark the waiter as successfully removed.
 738            waiter.is_queued = false;
 739            return true;
 740        }
 741    };
 742
 743    const Bucket = struct {
 744        mutex: c.pthread_mutex_t align(atomic.cache_line) = .{},
 745        pending: atomic.Value(usize) = atomic.Value(usize).init(0),
 746        treap: Treap = .{},
 747
 748        // Global array of buckets that addresses map to.
 749        // Bucket array size is pretty much arbitrary here, but it must be a power of two for fibonacci hashing.
 750        var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize);
 751
 752        // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353
 753        fn from(address: usize) *Bucket {
 754            // The upper `@bitSizeOf(usize)` bits of the fibonacci golden ratio.
 755            // Hashing this via (h * k) >> (64 - b) where k=golden-ration and b=bitsize-of-array
 756            // evenly lays out h=hash values over the bit range even when the hash has poor entropy (identity-hash for pointers).
 757            const max_multiplier_bits = @bitSizeOf(usize);
 758            const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits);
 759
 760            const max_bucket_bits = @ctz(buckets.len);
 761            comptime assert(std.math.isPowerOfTwo(buckets.len));
 762
 763            const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits);
 764            return &buckets[index];
 765        }
 766    };
 767
 768    const Address = struct {
 769        fn from(ptr: *const atomic.Value(u32)) usize {
 770            // Get the alignment of the pointer.
 771            const alignment = @alignOf(atomic.Value(u32));
 772            comptime assert(std.math.isPowerOfTwo(alignment));
 773
 774            // Make sure the pointer is aligned,
 775            // then cut off the zero bits from the alignment to get the unique address.
 776            const addr = @intFromPtr(ptr);
 777            assert(addr & (alignment - 1) == 0);
 778            return addr >> @ctz(@as(usize, alignment));
 779        }
 780    };
 781
    /// Blocks the calling thread on `ptr` as long as it still holds `expect`.
    ///
    /// - Returns immediately (without error) if, under the bucket mutex, `ptr` no longer equals `expect`.
    /// - Otherwise enqueues a stack-allocated `Waiter` for this address and waits on its event
    ///   with the given `timeout` (`null` is passed through unchanged to the event wait).
    /// - Returns `error.Timeout` only if the event wait failed AND this thread managed to
    ///   remove its own waiter from the queue before a wake() thread dequeued it.
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        const address = Address.from(ptr);
        const bucket = Bucket.from(address);

        // Announce that there's a waiter in the bucket before checking the ptr/expect condition.
        // If the announcement is reordered after the ptr check, the waiter could deadlock:
        //
        // - T1: checks ptr == expect which is true
        // - T2: updates ptr to != expect
        // - T2: does Futex.wake(), sees no pending waiters, exits
        // - T1: bumps pending waiters (was reordered after the ptr == expect check)
        // - T1: goes to sleep and misses both the ptr change and T2's wake up
        //
        // acquire barrier to ensure the announcement happens before the ptr check below.
        var pending = bucket.pending.fetchAdd(1, .acquire);
        // fetchAdd returns the previous value; it must not already be at the maximum or the count wrapped.
        assert(pending < std.math.maxInt(usize));

        // If the wait gets canceled, remove the pending count we previously added.
        // This is done outside the mutex lock to keep the critical section short in case of contention.
        // Note: the defer reads `canceled` at scope exit, so it observes the final value assigned below.
        // When the waiter is instead dequeued by wake(), this function does not decrement the count itself.
        var canceled = false;
        defer if (canceled) {
            pending = bucket.pending.fetchSub(1, .monotonic);
            assert(pending > 0);
        };

        // The waiter lives on this thread's stack; its fields are only initialized
        // (event.init + WaitQueue.insert) once we know we are actually going to block.
        var waiter: Waiter = undefined;
        {
            assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // Re-check the futex word while holding the bucket mutex.
            // A mismatch cancels the wait before the waiter is ever enqueued.
            canceled = ptr.load(.monotonic) != expect;
            if (canceled) {
                return;
            }

            waiter.event.init();
            WaitQueue.insert(&bucket.treap, address, &waiter);
        }

        // Registered only after the waiter was enqueued, so it never runs on the early-return path above.
        // By the time we leave, the waiter must have been dequeued (by wake() or by tryRemove below).
        defer {
            assert(!waiter.is_queued);
            waiter.event.deinit();
        }

        waiter.event.wait(timeout) catch {
            // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up.
            // We must wait until the event is set as that's a signal that the wake() thread won't access the waiter memory anymore.
            // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
            defer if (!canceled) waiter.event.wait(null) catch unreachable;

            assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            // tryRemove returning true means we dequeued ourselves: the wait is canceled and times out.
            // Returning false falls through to the deferred untimed event wait above, and then returns success.
            canceled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
            if (canceled) {
                return error.Timeout;
            }
        };
    }
 841
    /// Unblocks waiters queued on `ptr`.
    ///
    /// Returns without taking the bucket mutex when the bucket has no announced waiters.
    /// Otherwise collects waiters via `WaitQueue.remove` (presumably at most `max_waiters`
    /// of them; confirm against `WaitQueue.remove`), subtracts them from the bucket's
    /// pending count and sets each waiter's event.
    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const address = Address.from(ptr);
        const bucket = Bucket.from(address);

        // Quick check if there's even anything to wake up.
        // The change to the ptr's value must happen before we check for pending waiters.
        // If not, the wake() thread could miss a sleeping waiter and have it deadlock:
        //
        // - T2: p = has pending waiters (reordered before the ptr update)
        // - T1: bump pending waiters
        // - T1: if ptr == expected: sleep()
        // - T2: update ptr != expected
        // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping)
        //
        // What we really want here is a Release load, but that doesn't exist under the C11 memory model.
        // We could instead do `bucket.pending.fetchAdd(0, Release) == 0` which achieves effectively the same thing,
        // LLVM lowers the fetchAdd(0, .release) into an mfence+load which avoids gaining ownership of the cache-line.
        if (bucket.pending.fetchAdd(0, .release) == 0) {
            return;
        }

        // Keep a list of all the waiters notified and wake them up outside the mutex critical section.
        // This defer is registered before the unlock defer below; defers run in reverse order,
        // so the mutex is already released by the time the events are set.
        var notified = WaitList{};
        defer if (notified.len > 0) {
            // Drop the dequeued waiters from the pending count on their behalf.
            const pending = bucket.pending.fetchSub(notified.len, .monotonic);
            assert(pending >= notified.len);

            while (notified.pop()) |waiter| {
                assert(!waiter.is_queued);
                waiter.event.set();
            }
        };

        assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
        defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

        // Another pending check again to avoid the WaitQueue lookup if not necessary.
        if (bucket.pending.load(.monotonic) > 0) {
            notified = WaitQueue.remove(&bucket.treap, address, max_waiters);
        }
    }
 883};
 884
 885test "smoke test" {
 886    var value = atomic.Value(u32).init(0);
 887
 888    // Try waits with invalid values.
 889    Futex.wait(&value, 0xdeadbeef);
 890    Futex.timedWait(&value, 0xdeadbeef, 0) catch {};
 891
 892    // Try timeout waits.
 893    try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, 0));
 894    try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, std.time.ns_per_ms));
 895
 896    // Try wakes
 897    Futex.wake(&value, 0);
 898    Futex.wake(&value, 1);
 899    Futex.wake(&value, std.math.maxInt(u32));
 900}
 901
 902test "signaling" {
 903    // This test requires spawning threads
 904    if (builtin.single_threaded) {
 905        return error.SkipZigTest;
 906    }
 907
 908    const num_threads = 4;
 909    const num_iterations = 4;
 910
 911    const Paddle = struct {
 912        value: atomic.Value(u32) = atomic.Value(u32).init(0),
 913        current: u32 = 0,
 914
 915        fn hit(self: *@This()) void {
 916            _ = self.value.fetchAdd(1, .release);
 917            Futex.wake(&self.value, 1);
 918        }
 919
 920        fn run(self: *@This(), hit_to: *@This()) !void {
 921            while (self.current < num_iterations) {
 922                // Wait for the value to change from hit()
 923                var new_value: u32 = undefined;
 924                while (true) {
 925                    new_value = self.value.load(.acquire);
 926                    if (new_value != self.current) break;
 927                    Futex.wait(&self.value, self.current);
 928                }
 929
 930                // change the internal "current" value
 931                try testing.expectEqual(new_value, self.current + 1);
 932                self.current = new_value;
 933
 934                // hit the next paddle
 935                hit_to.hit();
 936            }
 937        }
 938    };
 939
 940    var paddles = [_]Paddle{.{}} ** num_threads;
 941    var threads = [_]std.Thread{undefined} ** num_threads;
 942
 943    // Create a circle of paddles which hit each other
 944    for (&threads, 0..) |*t, i| {
 945        const paddle = &paddles[i];
 946        const hit_to = &paddles[(i + 1) % paddles.len];
 947        t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
 948    }
 949
 950    // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
 951    paddles[0].hit();
 952    for (threads) |t| t.join();
 953    for (paddles) |p| try testing.expectEqual(p.current, num_iterations);
 954}
 955
 956test "broadcasting" {
 957    // This test requires spawning threads
 958    if (builtin.single_threaded) {
 959        return error.SkipZigTest;
 960    }
 961
 962    const num_threads = 4;
 963    const num_iterations = 4;
 964
 965    const Barrier = struct {
 966        count: atomic.Value(u32) = atomic.Value(u32).init(num_threads),
 967        futex: atomic.Value(u32) = atomic.Value(u32).init(0),
 968
 969        fn wait(self: *@This()) !void {
 970            // Decrement the counter.
 971            // Release ensures stuff before this barrier.wait() happens before the last one.
 972            // Acquire for the last counter ensures stuff before previous barrier.wait()s happened before it.
 973            const count = self.count.fetchSub(1, .acq_rel);
 974            try testing.expect(count <= num_threads);
 975            try testing.expect(count > 0);
 976
 977            // First counter to reach zero wakes all other threads.
 978            // Release on futex update ensures stuff before all barrier.wait()'s happens before they all return.
 979            if (count - 1 == 0) {
 980                self.futex.store(1, .release);
 981                Futex.wake(&self.futex, num_threads - 1);
 982                return;
 983            }
 984
 985            // Other threads wait until last counter wakes them up.
 986            // Acquire on futex synchronizes with last barrier count to ensure stuff before all barrier.wait()'s happen before us.
 987            while (self.futex.load(.acquire) == 0) {
 988                Futex.wait(&self.futex, 0);
 989            }
 990        }
 991    };
 992
 993    const Broadcast = struct {
 994        barriers: [num_iterations]Barrier = [_]Barrier{.{}} ** num_iterations,
 995        threads: [num_threads]std.Thread = undefined,
 996
 997        fn run(self: *@This()) !void {
 998            for (&self.barriers) |*barrier| {
 999                try barrier.wait();
1000            }
1001        }
1002    };
1003
1004    var broadcast = Broadcast{};
1005    for (&broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
1006    for (broadcast.threads) |t| t.join();
1007}
1008
/// Deadline waits for a pointer's value to change using Futex, bounded by one fixed overall timeout.
///
/// `Futex.timedWait()` takes a relative duration. Calling it in a loop (which spurious
/// wakeups usually make necessary) restarts that duration on every call and so over-waits.
///
/// Deadline records when it was created and, on each `wait()`, hands `Futex.timedWait()` only
/// the time still remaining, so repeated calls block for and report a more accurate `error.Timeout`.
pub const Deadline = struct {
    timeout: ?u64,
    started: std.time.Timer,

    /// Create a deadline that expires once `expires_in_ns` nanoseconds have passed.
    /// With `null`, `wait()` forwards to `Futex.wait()` and the deadline never expires.
    pub fn init(expires_in_ns: ?u64) Deadline {
        return .{
            .timeout = expires_in_ns,
            // The timer is only started (and later only read) when a timeout was given.
            // std.time.Timer is required to be supported for somewhat accurate reportings of error.Timeout.
            .started = if (expires_in_ns == null)
                undefined
            else
                std.time.Timer.start() catch unreachable,
        };
    }

    /// Block until one of:
    /// - the value at `ptr` is no longer `expect`.
    /// - `Futex.wake()` is called on `ptr`.
    /// - a spurious wake occurs.
    /// - the deadline expires, in which case `error.Timeout` is returned.
    pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void {
        @branchHint(.cold);

        if (self.timeout) |timeout_ns| {
            // Time left = total timeout minus the time elapsed since init().
            // The saturating subtraction clamps to zero once the deadline has already passed.
            const elapsed_ns = self.started.read();
            const remaining_ns = timeout_ns -| elapsed_ns;
            return Futex.timedWait(ptr, expect, remaining_ns);
        }

        // No timeout was configured: wait "forever".
        return Futex.wait(ptr, expect);
    }
};
1056
test "Deadline" {
    var futex_word = atomic.Value(u32).init(0);
    var deadline = Deadline.init(100 * std.time.ns_per_ms);

    // This test never changes or wakes the word, so keep re-waiting
    // (as a caller coping with spurious wakeups would) until the deadline reports expiry.
    while (true) {
        deadline.wait(&futex_word, 0) catch |err| switch (err) {
            error.Timeout => break,
        };
    }
}