//! A mechanism used to block (`wait`) and unblock (`wake`) threads using a
//! 32-bit memory address as a hint.
//!
//! Blocking a thread is acknowledged only if the 32-bit memory address is equal
//! to a given value. This check helps avoid block/unblock deadlocks which
//! occur if a `wake()` happens before a `wait()`.
//!
//! Using Futex, other thread synchronization primitives can be built which
//! efficiently wait for cross-thread events or signals.
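//!
//! As an illustrative sketch (the `Event` type below is hypothetical, not part of
//! this module), a minimal one-shot event can be built on top of `wait()` and
//! `wake()`; the loop re-checks the futex word to handle spurious wakeups:
//!
//! ```
//! const Event = struct {
//!     state: std.atomic.Value(u32) = std.atomic.Value(u32).init(0),
//!
//!     fn waitUntilSet(self: *Event) void {
//!         // wait() blocks only while the word still equals 0; the loop
//!         // re-checks after spurious wakeups.
//!         while (self.state.load(.acquire) == 0) {
//!             std.Thread.Futex.wait(&self.state, 0);
//!         }
//!     }
//!
//!     fn set(self: *Event) void {
//!         self.state.store(1, .release);
//!         std.Thread.Futex.wake(&self.state, std.math.maxInt(u32));
//!     }
//! };
//! ```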

const std = @import("../std.zig");
const builtin = @import("builtin");
const Futex = @This();
const windows = std.os.windows;
const linux = std.os.linux;
const c = std.c;

const assert = std.debug.assert;
const testing = std.testing;
const atomic = std.atomic;

/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
/// - The caller is unblocked spuriously ("at random").
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
pub fn wait(ptr: *const atomic.Value(u32), expect: u32) void {
    @branchHint(.cold);

    Impl.wait(ptr, expect, null) catch |err| switch (err) {
        error.Timeout => unreachable, // null timeout meant to wait forever
    };
}

/// Checks if `ptr` still contains the value `expect` and, if so, blocks the caller until either:
/// - The value at `ptr` is no longer equal to `expect`.
/// - The caller is unblocked by a matching `wake()`.
/// - The caller is unblocked spuriously ("at random").
/// - The caller blocks for longer than the given timeout, in which case `error.Timeout` is returned.
///
/// The checking of `ptr` and `expect`, along with blocking the caller, is done atomically
/// and totally ordered (sequentially consistent) with respect to other wait()/wake() calls on the same `ptr`.
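///
/// Since wakeups can be spurious, `timedWait()` is typically called in a re-check loop.
/// A minimal sketch (assuming `futex_word` is an `atomic.Value(u32)` and `expected` is its
/// currently observed value; see `Deadline` below to avoid over-waiting across iterations):
///
/// ```
/// while (futex_word.load(.acquire) == expected) {
///     try Futex.timedWait(&futex_word, expected, std.time.ns_per_ms);
/// }
/// ```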
pub fn timedWait(ptr: *const atomic.Value(u32), expect: u32, timeout_ns: u64) error{Timeout}!void {
    @branchHint(.cold);

    // Avoid calling into the OS for no-op timeouts.
    if (timeout_ns == 0) {
        if (ptr.load(.seq_cst) != expect) return;
        return error.Timeout;
    }

    return Impl.wait(ptr, expect, timeout_ns);
}

/// Unblocks at most `max_waiters` callers blocked in a `wait()` call on `ptr`.
pub fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
    @branchHint(.cold);

    // Avoid calling into the OS if there's nothing to wake up.
    if (max_waiters == 0) {
        return;
    }

    Impl.wake(ptr, max_waiters);
}

const Impl = if (builtin.single_threaded)
    SingleThreadedImpl
else if (builtin.os.tag == .windows)
    WindowsImpl
else if (builtin.os.tag.isDarwin())
    DarwinImpl
else if (builtin.os.tag == .linux)
    LinuxImpl
else if (builtin.os.tag == .freebsd)
    FreebsdImpl
else if (builtin.os.tag == .openbsd)
    OpenbsdImpl
else if (builtin.os.tag == .dragonfly)
    DragonflyImpl
else if (builtin.target.cpu.arch.isWasm())
    WasmImpl
else if (std.Thread.use_pthreads)
    PosixImpl
else
    UnsupportedImpl;

/// We can't do @compileError() in the `Impl` selection above as it's eagerly evaluated.
/// So instead, we @compileError() on the methods themselves for platforms which don't support futex.
const UnsupportedImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        return unsupported(.{ ptr, expect, timeout });
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        return unsupported(.{ ptr, max_waiters });
    }

    fn unsupported(unused: anytype) noreturn {
        _ = unused;
        @compileError("Unsupported operating system " ++ @tagName(builtin.target.os.tag));
    }
};

const SingleThreadedImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        if (ptr.raw != expect) {
            return;
        }

        // There are no threads to wake us up.
        // So if we wait without a timeout we would never wake up.
        const delay = timeout orelse {
            unreachable; // deadlock detected
        };

        _ = delay;
        return error.Timeout;
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        // There are no other threads to possibly wake up
        _ = ptr;
        _ = max_waiters;
    }
};

// We use WaitOnAddress through NtDll instead of API-MS-Win-Core-Synch-l1-2-0.dll
// as it's generally already a linked target and is autoloaded into all processes anyway.
const WindowsImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        var timeout_value: windows.LARGE_INTEGER = undefined;
        var timeout_ptr: ?*const windows.LARGE_INTEGER = null;

        // NTDLL functions work with time in units of 100 nanoseconds.
        // Positive values are absolute deadlines while negative values are relative durations.
        if (timeout) |delay| {
            timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
            timeout_value = -timeout_value;
            timeout_ptr = &timeout_value;
        }

        const rc = windows.ntdll.RtlWaitOnAddress(
            ptr,
            &expect,
            @sizeOf(@TypeOf(expect)),
            timeout_ptr,
        );

        switch (rc) {
            .SUCCESS => {},
            .TIMEOUT => {
                assert(timeout != null);
                return error.Timeout;
            },
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const address: ?*const anyopaque = ptr;
        assert(max_waiters != 0);

        switch (max_waiters) {
            1 => windows.ntdll.RtlWakeAddressSingle(address),
            else => windows.ntdll.RtlWakeAddressAll(address),
        }
    }
};

const DarwinImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        // Darwin XNU 7195.50.7.100.1 introduced __ulock_wait2 and migrated code paths (notably pthread_cond_t) towards it:
        // https://github.com/apple/darwin-xnu/commit/d4061fb0260b3ed486147341b72468f836ed6c8f#diff-08f993cc40af475663274687b7c326cc6c3031e0db3ac8de7b24624610616be6
        //
        // This XNU version appears to correspond to 11.0.1:
        // https://kernelshaman.blogspot.com/2021/01/building-xnu-for-macos-big-sur-1101.html
        //
        // ulock_wait() uses 32-bit micro-second timeouts where 0 = INFINITE or no-timeout
        // ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
        const supports_ulock_wait2 = builtin.target.os.version_range.semver.min.major >= 11;

        var timeout_ns: u64 = 0;
        if (timeout) |delay| {
            assert(delay != 0); // handled by timedWait()
            timeout_ns = delay;
        }

        // If we're using `__ulock_wait` and `timeout` is too big to fit inside a `u32` count of
        // micro-seconds (around 70min), we'll request a shorter timeout. This is fine (users
        // should handle spurious wakeups), but we need to remember that we did so, so that
        // we don't return `Timeout` incorrectly. If that happens, we set this variable to
        // true so that we know to ignore the ETIMEDOUT result.
        var timeout_overflowed = false;

        const addr: *const anyopaque = ptr;
        const flags: c.UL = .{
            .op = .COMPARE_AND_WAIT,
            .NO_ERRNO = true,
        };
        const status = blk: {
            if (supports_ulock_wait2) {
                break :blk c.__ulock_wait2(flags, addr, expect, timeout_ns, 0);
            }

            const timeout_us = std.math.cast(u32, timeout_ns / std.time.ns_per_us) orelse overflow: {
                timeout_overflowed = true;
                break :overflow std.math.maxInt(u32);
            };

            break :blk c.__ulock_wait(flags, addr, expect, timeout_us);
        };

        if (status >= 0) return;
        switch (@as(c.E, @enumFromInt(-status))) {
            // Wait was interrupted by the OS or other spurious signalling.
            .INTR => {},
            // Address of the futex was paged out. This is unlikely, but possible in theory, and
            // pthread/libdispatch on darwin bother to handle it. In this case we'll return
            // without waiting, but the caller should retry anyway.
            .FAULT => {},
            // Only report Timeout if we didn't have to cap the timeout
            .TIMEDOUT => {
                assert(timeout != null);
                if (!timeout_overflowed) return error.Timeout;
            },
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const flags: c.UL = .{
            .op = .COMPARE_AND_WAIT,
            .NO_ERRNO = true,
            .WAKE_ALL = max_waiters > 1,
        };

        while (true) {
            const addr: *const anyopaque = ptr;
            const status = c.__ulock_wake(flags, addr, 0);

            if (status >= 0) return;
            switch (@as(c.E, @enumFromInt(-status))) {
                .INTR => continue, // spurious wake()
                .FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
                .NOENT => return, // nothing was woken up
                .ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
                else => unreachable,
            }
        }
    }
};

// https://man7.org/linux/man-pages/man2/futex.2.html
const LinuxImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        var ts: linux.timespec = undefined;
        if (timeout) |timeout_ns| {
            ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
            ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
        }

        const rc = linux.futex_4arg(
            &ptr.raw,
            .{ .cmd = .WAIT, .private = true },
            expect,
            if (timeout != null) &ts else null,
        );

        switch (linux.errno(rc)) {
            .SUCCESS => {}, // notified by `wake()`
            .INTR => {}, // spurious wakeup
            .AGAIN => {}, // ptr.* != expect
            .TIMEDOUT => {
                assert(timeout != null);
                return error.Timeout;
            },
            .INVAL => {}, // possibly timeout overflow
            .FAULT => unreachable, // ptr was invalid
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const rc = linux.futex_3arg(
            &ptr.raw,
            .{ .cmd = .WAKE, .private = true },
            @min(max_waiters, std.math.maxInt(i32)),
        );

        switch (linux.errno(rc)) {
            .SUCCESS => {}, // successful wake up
            .INVAL => {}, // invalid futex_wait() on ptr done elsewhere
            .FAULT => {}, // pointer became invalid while doing the wake
            else => unreachable,
        }
    }
};

// https://www.freebsd.org/cgi/man.cgi?query=_umtx_op&sektion=2&n=1
const FreebsdImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        var tm_size: usize = 0;
        var tm: c._umtx_time = undefined;
        var tm_ptr: ?*const c._umtx_time = null;

        if (timeout) |timeout_ns| {
            tm_ptr = &tm;
            tm_size = @sizeOf(@TypeOf(tm));

            tm.flags = 0; // use relative time not UMTX_ABSTIME
            tm.clockid = .MONOTONIC;
            tm.timeout.sec = @as(@TypeOf(tm.timeout.sec), @intCast(timeout_ns / std.time.ns_per_s));
            tm.timeout.nsec = @as(@TypeOf(tm.timeout.nsec), @intCast(timeout_ns % std.time.ns_per_s));
        }

        const rc = c._umtx_op(
            @intFromPtr(&ptr.raw),
            @intFromEnum(c.UMTX_OP.WAIT_UINT_PRIVATE),
            @as(c_ulong, expect),
            tm_size,
            @intFromPtr(tm_ptr),
        );

        switch (std.posix.errno(rc)) {
            .SUCCESS => {},
            .FAULT => unreachable, // one of the args points to invalid memory
            .INVAL => unreachable, // arguments should be correct
            .TIMEDOUT => {
                assert(timeout != null);
                return error.Timeout;
            },
            .INTR => {}, // spurious wake
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const rc = c._umtx_op(
            @intFromPtr(&ptr.raw),
            @intFromEnum(c.UMTX_OP.WAKE_PRIVATE),
            @as(c_ulong, max_waiters),
            0, // there is no timeout struct
            0, // there is no timeout struct pointer
        );

        switch (std.posix.errno(rc)) {
            .SUCCESS => {},
            .FAULT => {}, // it's ok if the ptr doesn't point to valid memory
            .INVAL => unreachable, // arguments should be correct
            else => unreachable,
        }
    }
};

// https://man.openbsd.org/futex.2
const OpenbsdImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        var ts: c.timespec = undefined;
        if (timeout) |timeout_ns| {
            ts.sec = @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
            ts.nsec = @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));
        }

        const rc = c.futex(
            @as(*const volatile u32, @ptrCast(&ptr.raw)),
            c.FUTEX.WAIT | c.FUTEX.PRIVATE_FLAG,
            @as(c_int, @bitCast(expect)),
            if (timeout != null) &ts else null,
            null, // FUTEX.WAIT takes no requeue address
        );

        switch (std.posix.errno(rc)) {
            .SUCCESS => {}, // woken up by wake
            .NOSYS => unreachable, // the futex operation shouldn't be invalid
            .FAULT => unreachable, // ptr was invalid
            .AGAIN => {}, // ptr != expect
            .INVAL => unreachable, // invalid timeout
            .TIMEDOUT => {
                assert(timeout != null);
                return error.Timeout;
            },
            .INTR => {}, // spurious wake from signal
            .CANCELED => {}, // spurious wake from signal with SA_RESTART
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const rc = c.futex(
            @as(*const volatile u32, @ptrCast(&ptr.raw)),
            c.FUTEX.WAKE | c.FUTEX.PRIVATE_FLAG,
            std.math.cast(c_int, max_waiters) orelse std.math.maxInt(c_int),
            null, // FUTEX.WAKE takes no timeout ptr
            null, // FUTEX.WAKE takes no requeue address
        );

        // returns number of threads woken up.
        assert(rc >= 0);
    }
};

// https://man.dragonflybsd.org/?command=umtx&section=2
const DragonflyImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        // Dragonfly uses a scheme where a 0 timeout means wait until signaled or spuriously woken.
        // Its reporting of timeouts is also unreliable, so we use an external timing source (Timer) instead.
        var timeout_us: c_int = 0;
        var timeout_overflowed = false;
        var sleep_timer: std.time.Timer = undefined;

        if (timeout) |delay| {
            assert(delay != 0); // handled by timedWait().
            timeout_us = std.math.cast(c_int, delay / std.time.ns_per_us) orelse blk: {
                timeout_overflowed = true;
                break :blk std.math.maxInt(c_int);
            };

            // Only need to record the start time if we can provide a somewhat accurate error.Timeout
            if (!timeout_overflowed) {
                sleep_timer = std.time.Timer.start() catch unreachable;
            }
        }

        const value = @as(c_int, @bitCast(expect));
        const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
        const rc = c.umtx_sleep(addr, value, timeout_us);

        switch (std.posix.errno(rc)) {
            .SUCCESS => {},
            .BUSY => {}, // ptr != expect
            .AGAIN => { // maybe timed out, or paged out, or hit 2s kernel refresh
                if (timeout) |timeout_ns| {
                    // Report error.Timeout only if we know the timeout duration has passed.
                    // If not, there's not much choice other than treating it as a spurious wake.
                    if (!timeout_overflowed and sleep_timer.read() >= timeout_ns) {
                        return error.Timeout;
                    }
                }
            },
            .INTR => {}, // spurious wake
            .INVAL => unreachable, // invalid timeout
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        // A count of zero means wake all waiters.
        assert(max_waiters != 0);
        const to_wake = std.math.cast(c_int, max_waiters) orelse 0;

        // https://man.dragonflybsd.org/?command=umtx&section=2
        // > umtx_wakeup() will generally return 0 unless the address is bad.
        // We are fine with the address being bad (e.g. for Semaphore.post() where Semaphore.wait() frees the Semaphore)
        const addr = @as(*const volatile c_int, @ptrCast(&ptr.raw));
        _ = c.umtx_wakeup(addr, to_wake);
    }
};

const WasmImpl = struct {
    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");

        const to: i64 = if (timeout) |to| @intCast(to) else -1;
        const result = asm volatile (
            \\local.get %[ptr]
            \\local.get %[expected]
            \\local.get %[timeout]
            \\memory.atomic.wait32 0
            \\local.set %[ret]
            : [ret] "=r" (-> u32),
            : [ptr] "r" (&ptr.raw),
              [expected] "r" (@as(i32, @bitCast(expect))),
              [timeout] "r" (to),
        );
        switch (result) {
            0 => {}, // ok
            1 => {}, // expected != loaded
            2 => return error.Timeout,
            else => unreachable,
        }
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        if (!comptime builtin.cpu.has(.wasm, .atomics)) @compileError("WASI target missing cpu feature 'atomics'");

        assert(max_waiters != 0);
        const woken_count = asm volatile (
            \\local.get %[ptr]
            \\local.get %[waiters]
            \\memory.atomic.notify 0
            \\local.set %[ret]
            : [ret] "=r" (-> u32),
            : [ptr] "r" (&ptr.raw),
              [waiters] "r" (max_waiters),
        );
        _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
    }
};

/// Modified version of linux's futex and Go's sema to implement userspace wait queues with pthread:
/// https://code.woboq.org/linux/linux/kernel/futex.c.html
/// https://go.dev/src/runtime/sema.go
const PosixImpl = struct {
    const Event = struct {
        cond: c.pthread_cond_t,
        mutex: c.pthread_mutex_t,
        state: enum { empty, waiting, notified },

        fn init(self: *Event) void {
            // Use static init instead of pthread_cond/mutex_init() since this is generally faster.
            self.cond = .{};
            self.mutex = .{};
            self.state = .empty;
        }

        fn deinit(self: *Event) void {
            // Some platforms reportedly give EINVAL for statically initialized pthread types.
            const rc = c.pthread_cond_destroy(&self.cond);
            assert(rc == .SUCCESS or rc == .INVAL);

            const rm = c.pthread_mutex_destroy(&self.mutex);
            assert(rm == .SUCCESS or rm == .INVAL);

            self.* = undefined;
        }

        fn wait(self: *Event, timeout: ?u64) error{Timeout}!void {
            assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            // Early return if the event was already set.
            if (self.state == .notified) {
                return;
            }

            // Compute the absolute timeout if one was specified.
            // POSIX requires that REALTIME is used by default for the pthread timedwait functions.
            // This can be changed with pthread_condattr_setclock, but it's an extension and may not be available everywhere.
            var ts: c.timespec = undefined;
            if (timeout) |timeout_ns| {
                ts = std.posix.clock_gettime(c.CLOCK.REALTIME) catch unreachable;
                ts.sec +|= @as(@TypeOf(ts.sec), @intCast(timeout_ns / std.time.ns_per_s));
                ts.nsec += @as(@TypeOf(ts.nsec), @intCast(timeout_ns % std.time.ns_per_s));

                if (ts.nsec >= std.time.ns_per_s) {
                    ts.sec +|= 1;
                    ts.nsec -= std.time.ns_per_s;
                }
            }

            // Start waiting on the event - there can be only one thread waiting.
            assert(self.state == .empty);
            self.state = .waiting;

            while (true) {
                // Block using either pthread_cond_wait or pthread_cond_timedwait if there's an absolute timeout.
                const rc = blk: {
                    if (timeout == null) break :blk c.pthread_cond_wait(&self.cond, &self.mutex);
                    break :blk c.pthread_cond_timedwait(&self.cond, &self.mutex, &ts);
                };

                // After waking up, check if the event was set.
                if (self.state == .notified) {
                    return;
                }

                assert(self.state == .waiting);
                switch (rc) {
                    .SUCCESS => {},
                    .TIMEDOUT => {
                        // If timed out, reset the event to avoid the set() thread doing an unnecessary signal().
                        self.state = .empty;
                        return error.Timeout;
                    },
                    .INVAL => unreachable, // cond, mutex, and potentially ts should all be valid
                    .PERM => unreachable, // mutex is locked when cond_*wait() functions are called
                    else => unreachable,
                }
            }
        }

        fn set(self: *Event) void {
            assert(c.pthread_mutex_lock(&self.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&self.mutex) == .SUCCESS);

            // Make sure that multiple calls to set() were not done on the same Event.
            const old_state = self.state;
            assert(old_state != .notified);

            // Mark the event as set and wake up the waiting thread if there was one.
            // This must be done while holding the mutex, as the wait() thread could deallocate
            // the condition variable once it observes the new state, potentially causing a UAF if done unlocked.
            self.state = .notified;
            if (old_state == .waiting) {
                assert(c.pthread_cond_signal(&self.cond) == .SUCCESS);
            }
        }
    };

    const Treap = std.Treap(usize, std.math.order);
    const Waiter = struct {
        node: Treap.Node,
        prev: ?*Waiter,
        next: ?*Waiter,
        tail: ?*Waiter,
        is_queued: bool,
        event: Event,
    };

    // An unordered set of Waiters
    const WaitList = struct {
        top: ?*Waiter = null,
        len: usize = 0,

        fn push(self: *WaitList, waiter: *Waiter) void {
            waiter.next = self.top;
            self.top = waiter;
            self.len += 1;
        }

        fn pop(self: *WaitList) ?*Waiter {
            const waiter = self.top orelse return null;
            self.top = waiter.next;
            self.len -= 1;
            return waiter;
        }
    };

    const WaitQueue = struct {
        fn insert(treap: *Treap, address: usize, waiter: *Waiter) void {
            // Prepare the waiter to be inserted.
            waiter.next = null;
            waiter.is_queued = true;

            // Find the wait queue entry associated with the address.
            // If there isn't a wait queue on the address, this waiter creates the queue.
            var entry = treap.getEntryFor(address);
            const entry_node = entry.node orelse {
                waiter.prev = null;
                waiter.tail = waiter;
                entry.set(&waiter.node);
                return;
            };

            // There's a wait queue on the address; get the queue head and tail.
            const head: *Waiter = @fieldParentPtr("node", entry_node);
            const tail = head.tail orelse unreachable;

            // Push the waiter to the tail by replacing it and linking to the previous tail.
            head.tail = waiter;
            tail.next = waiter;
            waiter.prev = tail;
        }

        fn remove(treap: *Treap, address: usize, max_waiters: usize) WaitList {
            // Find the wait queue associated with this address and get the head/tail if any.
            var entry = treap.getEntryFor(address);
            var queue_head: ?*Waiter = if (entry.node) |node| @fieldParentPtr("node", node) else null;
            const queue_tail = if (queue_head) |head| head.tail else null;

            // Once we're done updating the head, fix its tail pointer and update the treap's queue head as well.
            defer entry.set(blk: {
                const new_head = queue_head orelse break :blk null;
                new_head.tail = queue_tail;
                break :blk &new_head.node;
            });

            var removed = WaitList{};
            while (removed.len < max_waiters) {
                // Dequeue and collect waiters from their wait queue.
                const waiter = queue_head orelse break;
                queue_head = waiter.next;
                removed.push(waiter);

                // When dequeueing, we must mark is_queued as false.
                // This ensures that a waiter which calls tryRemove() returns false.
                assert(waiter.is_queued);
                waiter.is_queued = false;
            }

            return removed;
        }

        fn tryRemove(treap: *Treap, address: usize, waiter: *Waiter) bool {
            if (!waiter.is_queued) {
                return false;
            }

            queue_remove: {
                // Find the wait queue associated with the address.
                var entry = blk: {
                    // A waiter without a previous link means it's the queue head that's in the treap, so we can avoid a lookup.
                    if (waiter.prev == null) {
                        assert(waiter.node.key == address);
                        break :blk treap.getEntryForExisting(&waiter.node);
                    }
                    break :blk treap.getEntryFor(address);
                };

                // The queue head and tail must exist if we're removing a queued waiter.
                const head: *Waiter = @fieldParentPtr("node", entry.node orelse unreachable);
                const tail = head.tail orelse unreachable;

                // A waiter with a previous link is never the head of the queue.
                if (waiter.prev) |prev| {
                    assert(waiter != head);
                    prev.next = waiter.next;

                    // A waiter with both a previous and next link is in the middle.
                    // We only need to update the surrounding waiters' links to remove it.
                    if (waiter.next) |next| {
                        assert(waiter != tail);
                        next.prev = waiter.prev;
                        break :queue_remove;
                    }

                    // A waiter with a previous but no next link means it's the tail of the queue.
                    // In that case, we need to update the head's tail reference.
                    assert(waiter == tail);
                    head.tail = waiter.prev;
                    break :queue_remove;
                }

                // A waiter with no previous link means it's the head of the queue.
                // We must replace (or remove) the head waiter reference in the treap.
                assert(waiter == head);
                entry.set(blk: {
                    const new_head = waiter.next orelse break :blk null;
                    new_head.tail = head.tail;
                    break :blk &new_head.node;
                });
            }

            // Mark the waiter as successfully removed.
            waiter.is_queued = false;
            return true;
        }
    };

    const Bucket = struct {
        mutex: c.pthread_mutex_t align(atomic.cache_line) = .{},
        pending: atomic.Value(usize) = atomic.Value(usize).init(0),
        treap: Treap = .{},

        // Global array of buckets that addresses map to.
        // Bucket array size is pretty much arbitrary here, but it must be a power of two for Fibonacci hashing.
        var buckets = [_]Bucket{.{}} ** @bitSizeOf(usize);

        // https://github.com/Amanieu/parking_lot/blob/1cf12744d097233316afa6c8b7d37389e4211756/core/src/parking_lot.rs#L343-L353
        fn from(address: usize) *Bucket {
            // The upper `@bitSizeOf(usize)` bits of the Fibonacci golden ratio.
            // Hashing this via (h * k) >> (64 - b), where k = golden ratio and b = bit size of the bucket array,
            // evenly lays out h = hash values over the bit range even when the hash has poor entropy (identity-hash for pointers).
            const max_multiplier_bits = @bitSizeOf(usize);
            const fibonacci_multiplier = 0x9E3779B97F4A7C15 >> (64 - max_multiplier_bits);

            const max_bucket_bits = @ctz(buckets.len);
            comptime assert(std.math.isPowerOfTwo(buckets.len));

            const index = (address *% fibonacci_multiplier) >> (max_multiplier_bits - max_bucket_bits);
            return &buckets[index];
        }
    };

    const Address = struct {
        fn from(ptr: *const atomic.Value(u32)) usize {
            // Get the alignment of the pointer.
            const alignment = @alignOf(atomic.Value(u32));
            comptime assert(std.math.isPowerOfTwo(alignment));

            // Make sure the pointer is aligned,
            // then cut off the zero bits from the alignment to get the unique address.
            const addr = @intFromPtr(ptr);
            assert(addr & (alignment - 1) == 0);
            return addr >> @ctz(@as(usize, alignment));
        }
    };

    fn wait(ptr: *const atomic.Value(u32), expect: u32, timeout: ?u64) error{Timeout}!void {
        const address = Address.from(ptr);
        const bucket = Bucket.from(address);

        // Announce that there's a waiter in the bucket before checking the ptr/expect condition.
        // If the announcement is reordered after the ptr check, the waiter could deadlock:
        //
        // - T1: checks ptr == expect which is true
        // - T2: updates ptr to != expect
        // - T2: does Futex.wake(), sees no pending waiters, exits
        // - T1: bumps pending waiters (was reordered after the ptr == expect check)
        // - T1: goes to sleep and misses both the ptr change and T2's wake up
        //
        // acquire barrier to ensure the announcement happens before the ptr check below.
        var pending = bucket.pending.fetchAdd(1, .acquire);
        assert(pending < std.math.maxInt(usize));

        // If the wait gets canceled, remove the pending count we previously added.
        // This is done outside the mutex lock to keep the critical section short in case of contention.
        var canceled = false;
        defer if (canceled) {
            pending = bucket.pending.fetchSub(1, .monotonic);
            assert(pending > 0);
        };

        var waiter: Waiter = undefined;
        {
            assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            canceled = ptr.load(.monotonic) != expect;
            if (canceled) {
                return;
            }

            waiter.event.init();
            WaitQueue.insert(&bucket.treap, address, &waiter);
        }

        defer {
            assert(!waiter.is_queued);
            waiter.event.deinit();
        }

        waiter.event.wait(timeout) catch {
            // If we fail to cancel after a timeout, it means a wake() thread dequeued us and will wake us up.
            // We must wait until the event is set as that's a signal that the wake() thread won't access the waiter memory anymore.
            // If we return early without waiting, the waiter on the stack would be invalidated and the wake() thread risks a UAF.
            defer if (!canceled) waiter.event.wait(null) catch unreachable;

            assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
            defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

            canceled = WaitQueue.tryRemove(&bucket.treap, address, &waiter);
            if (canceled) {
                return error.Timeout;
            }
        };
    }

    fn wake(ptr: *const atomic.Value(u32), max_waiters: u32) void {
        const address = Address.from(ptr);
        const bucket = Bucket.from(address);

        // Quick check if there's even anything to wake up.
        // The change to the ptr's value must happen before we check for pending waiters.
        // If not, the wake() thread could miss a sleeping waiter and have it deadlock:
        //
        // - T2: p = has pending waiters (reordered before the ptr update)
        // - T1: bump pending waiters
        // - T1: if ptr == expected: sleep()
        // - T2: update ptr != expected
        // - T2: p is false from earlier so doesn't wake (T1 missed ptr update and T2 missed T1 sleeping)
        //
        // What we really want here is a Release load, but that doesn't exist under the C11 memory model.
        // We instead do `bucket.pending.fetchAdd(0, .release) == 0`, which achieves effectively the same thing;
        // LLVM lowers the fetchAdd(0, .release) into an mfence+load, which avoids gaining ownership of the cache line.
        if (bucket.pending.fetchAdd(0, .release) == 0) {
            return;
        }

        // Keep a list of all the waiters notified and wake them up outside the mutex critical section.
        var notified = WaitList{};
        defer if (notified.len > 0) {
            const pending = bucket.pending.fetchSub(notified.len, .monotonic);
            assert(pending >= notified.len);

            while (notified.pop()) |waiter| {
                assert(!waiter.is_queued);
                waiter.event.set();
            }
        };

        assert(c.pthread_mutex_lock(&bucket.mutex) == .SUCCESS);
        defer assert(c.pthread_mutex_unlock(&bucket.mutex) == .SUCCESS);

        // Check the pending count again to avoid the WaitQueue lookup if it's not necessary.
        if (bucket.pending.load(.monotonic) > 0) {
            notified = WaitQueue.remove(&bucket.treap, address, max_waiters);
        }
    }
};

test "smoke test" {
    var value = atomic.Value(u32).init(0);

    // Try waits with invalid values.
    Futex.wait(&value, 0xdeadbeef);
    Futex.timedWait(&value, 0xdeadbeef, 0) catch {};

    // Try timeout waits.
    try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, 0));
    try testing.expectError(error.Timeout, Futex.timedWait(&value, 0, std.time.ns_per_ms));

    // Try wakes
    Futex.wake(&value, 0);
    Futex.wake(&value, 1);
    Futex.wake(&value, std.math.maxInt(u32));
}

test "signaling" {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_iterations = 4;

    const Paddle = struct {
        value: atomic.Value(u32) = atomic.Value(u32).init(0),
        current: u32 = 0,

        fn hit(self: *@This()) void {
            _ = self.value.fetchAdd(1, .release);
            Futex.wake(&self.value, 1);
        }

        fn run(self: *@This(), hit_to: *@This()) !void {
            while (self.current < num_iterations) {
                // Wait for the value to change from hit()
                var new_value: u32 = undefined;
                while (true) {
                    new_value = self.value.load(.acquire);
                    if (new_value != self.current) break;
                    Futex.wait(&self.value, self.current);
                }

                // change the internal "current" value
                try testing.expectEqual(new_value, self.current + 1);
                self.current = new_value;

                // hit the next paddle
                hit_to.hit();
            }
        }
    };

    var paddles = [_]Paddle{.{}} ** num_threads;
    var threads = [_]std.Thread{undefined} ** num_threads;

    // Create a circle of paddles which hit each other
    for (&threads, 0..) |*t, i| {
        const paddle = &paddles[i];
        const hit_to = &paddles[(i + 1) % paddles.len];
        t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to });
    }

    // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations.
    paddles[0].hit();
    for (threads) |t| t.join();
    for (paddles) |p| try testing.expectEqual(p.current, num_iterations);
}

test "broadcasting" {
    // This test requires spawning threads
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_iterations = 4;

    const Barrier = struct {
        count: atomic.Value(u32) = atomic.Value(u32).init(num_threads),
        futex: atomic.Value(u32) = atomic.Value(u32).init(0),

        fn wait(self: *@This()) !void {
            // Decrement the counter.
            // Release ensures stuff before this barrier.wait() happens before the last one.
            // Acquire for the last counter ensures stuff before previous barrier.wait()s happened before it.
            const count = self.count.fetchSub(1, .acq_rel);
            try testing.expect(count <= num_threads);
            try testing.expect(count > 0);

            // First counter to reach zero wakes all other threads.
            // Release on futex update ensures stuff before all barrier.wait()'s happens before they all return.
            if (count - 1 == 0) {
                self.futex.store(1, .release);
                Futex.wake(&self.futex, num_threads - 1);
                return;
            }

            // Other threads wait until last counter wakes them up.
            // Acquire on futex synchronizes with last barrier count to ensure stuff before all barrier.wait()'s happen before us.
            while (self.futex.load(.acquire) == 0) {
                Futex.wait(&self.futex, 0);
            }
        }
    };

    const Broadcast = struct {
        barriers: [num_iterations]Barrier = [_]Barrier{.{}} ** num_iterations,
        threads: [num_threads]std.Thread = undefined,

        fn run(self: *@This()) !void {
            for (&self.barriers) |*barrier| {
                try barrier.wait();
            }
        }
    };

    var broadcast = Broadcast{};
    for (&broadcast.threads) |*t| t.* = try std.Thread.spawn(.{}, Broadcast.run, .{&broadcast});
    for (broadcast.threads) |t| t.join();
}

/// Deadline is used to wait efficiently for a pointer's value to change using Futex and a fixed timeout.
///
/// Futex's timedWait() API uses a relative duration, which leads to over-waiting when it's called
/// in a loop, as is often required due to the possibility of spurious wakeups.
///
/// Deadline instead converts the relative timeout to an absolute one so that multiple calls
/// to Futex timedWait() can block for and report more accurate error.Timeouts.
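///
/// A minimal usage sketch (assuming `futex_word` is an `atomic.Value(u32)` that another
/// thread sets to a nonzero value before calling `Futex.wake()`):
///
/// ```
/// var deadline = Futex.Deadline.init(200 * std.time.ns_per_ms);
/// while (futex_word.load(.acquire) == 0) {
///     try deadline.wait(&futex_word, 0);
/// }
/// ```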
pub const Deadline = struct {
    timeout: ?u64,
    started: std.time.Timer,

    /// Create the deadline to expire after the given amount of time in nanoseconds passes.
    /// Pass in `null` to have the deadline call `Futex.wait()` and never expire.
    pub fn init(expires_in_ns: ?u64) Deadline {
        var deadline: Deadline = undefined;
        deadline.timeout = expires_in_ns;

        // std.time.Timer is required to be supported for somewhat accurate reporting of error.Timeout.
        if (deadline.timeout != null) {
            deadline.started = std.time.Timer.start() catch unreachable;
        }

        return deadline;
    }

    /// Wait until either:
    /// - the `ptr`'s value changes from `expect`.
    /// - `Futex.wake()` is called on the `ptr`.
    /// - A spurious wake occurs.
    /// - The deadline expires, in which case `error.Timeout` is returned.
    pub fn wait(self: *Deadline, ptr: *const atomic.Value(u32), expect: u32) error{Timeout}!void {
        @branchHint(.cold);

        // Check if we actually have a timeout to wait until.
        // If not, just wait "forever".
        const timeout_ns = self.timeout orelse {
            return Futex.wait(ptr, expect);
        };

        // Get how much time has passed since we started waiting,
        // then subtract that from the init() timeout to get how much longer to wait.
        // Use overflow to detect when we've been waiting longer than the init() timeout.
        const elapsed_ns = self.started.read();
        const until_timeout_ns = std.math.sub(u64, timeout_ns, elapsed_ns) catch 0;
        return Futex.timedWait(ptr, expect, until_timeout_ns);
    }
};

test "Deadline" {
    var deadline = Deadline.init(100 * std.time.ns_per_ms);
    var futex_word = atomic.Value(u32).init(0);

    while (true) {
        deadline.wait(&futex_word, 0) catch break;
    }
}
1064}