Commit c9db420a09
Changed files (3)
lib/std/mutex.zig
@@ -1,19 +1,13 @@
const std = @import("std.zig");
const builtin = @import("builtin");
-const AtomicOrder = builtin.AtomicOrder;
-const AtomicRmwOp = builtin.AtomicRmwOp;
const testing = std.testing;
const SpinLock = std.SpinLock;
-const linux = std.os.linux;
-const windows = std.os.windows;
+const ThreadParker = std.ThreadParker;
/// Lock may be held only once. If the same thread
/// tries to acquire the same mutex twice, it deadlocks.
-/// This type must be initialized at runtime, and then deinitialized when no
-/// longer needed, to free resources.
-/// If you need static initialization, use std.StaticallyInitializedMutex.
-/// The Linux implementation is based on mutex3 from
-/// https://www.akkadia.org/drepper/futex.pdf
+/// This type supports static initialization and is based on Golang 1.13 runtime.lock_futex:
+/// https://github.com/golang/go/blob/master/src/runtime/lock_futex.go
/// When an application is built in single threaded release mode, all the functions are
/// no-ops. In single threaded debug mode, there is deadlock detection.
pub const Mutex = if (builtin.single_threaded)
@@ -43,83 +37,78 @@ pub const Mutex = if (builtin.single_threaded)
return Held{ .mutex = self };
}
}
-else switch (builtin.os) {
- builtin.Os.linux => struct {
- /// 0: unlocked
- /// 1: locked, no waiters
- /// 2: locked, one or more waiters
- lock: i32,
-
- pub const Held = struct {
- mutex: *Mutex,
-
- pub fn release(self: Held) void {
- const c = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Sub, 1, AtomicOrder.Release);
- if (c != 1) {
- _ = @atomicRmw(i32, &self.mutex.lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
- const rc = linux.futex_wake(&self.mutex.lock, linux.FUTEX_WAKE | linux.FUTEX_PRIVATE_FLAG, 1);
- switch (linux.getErrno(rc)) {
- 0 => {},
- linux.EINVAL => unreachable,
- else => unreachable,
- }
- }
- }
+else struct {
+ state: u32, // TODO: make this an enum
+ parker: ThreadParker,
+
+ const Unlocked = 0;
+ const Sleeping = 1;
+ const Locked = 2;
+
+ /// number of iterations to spin yielding the cpu
+ const SpinCpu = 4;
+ /// number of iterations to perform in the cpu yield loop
+ const SpinCpuCount = 30;
+ /// number of iterations to spin yielding the thread
+ const SpinThread = 1;
+
+ pub fn init() Mutex {
+ return Mutex{
+ .state = Unlocked,
+ .parker = ThreadParker.init(),
};
+ }
- pub fn init() Mutex {
- return Mutex{ .lock = 0 };
- }
+ pub fn deinit(self: *Mutex) void {
+ self.parker.deinit();
+ }
- pub fn deinit(self: *Mutex) void {}
+ pub const Held = struct {
+ mutex: *Mutex,
- pub fn acquire(self: *Mutex) Held {
- var c = @cmpxchgWeak(i32, &self.lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic) orelse
- return Held{ .mutex = self };
- if (c != 2)
- c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
- while (c != 0) {
- const rc = linux.futex_wait(&self.lock, linux.FUTEX_WAIT | linux.FUTEX_PRIVATE_FLAG, 2, null);
- switch (linux.getErrno(rc)) {
- 0, linux.EINTR, linux.EAGAIN => {},
- linux.EINVAL => unreachable,
- else => unreachable,
- }
- c = @atomicRmw(i32, &self.lock, AtomicRmwOp.Xchg, 2, AtomicOrder.Acquire);
+ pub fn release(self: Held) void {
+ switch (@atomicRmw(u32, &self.mutex.state, .Xchg, Unlocked, .Release)) {
+ Locked => {},
+ Sleeping => self.mutex.parker.unpark(&self.mutex.state),
+ Unlocked => unreachable, // unlocking an unlocked mutex
+ else => unreachable, // should never be anything else
}
- return Held{ .mutex = self };
}
- },
- // TODO once https://github.com/ziglang/zig/issues/287 (copy elision) is solved, we can make a
- // better implementation of this. The problem is we need the init() function to have access to
- // the address of the CRITICAL_SECTION, and then have it not move.
- builtin.Os.windows => std.StaticallyInitializedMutex,
- else => struct {
- /// TODO better implementation than spin lock.
- /// When changing this, one must also change the corresponding
- /// std.StaticallyInitializedMutex code, since it aliases this type,
- /// under the assumption that it works both statically and at runtime.
- lock: SpinLock,
+ };
- pub const Held = struct {
- mutex: *Mutex,
+ pub fn acquire(self: *Mutex) Held {
+ // Try and speculatively grab the lock.
+ // If it fails, the state is either Locked or Sleeping
+        // depending on if there's a thread stuck sleeping below.
+ var state = @atomicRmw(u32, &self.state, .Xchg, Locked, .Acquire);
+ if (state == Unlocked)
+ return Held{ .mutex = self };
- pub fn release(self: Held) void {
- SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.lock });
+ while (true) {
+ // try and acquire the lock using cpu spinning on failure
+ for (([SpinCpu]void)(undefined)) |_| {
+ var value = @atomicLoad(u32, &self.state, .Monotonic);
+ while (value == Unlocked)
+ value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+ for (([SpinCpuCount]void)(undefined)) |_|
+ SpinLock.yieldCpu();
}
- };
- pub fn init() Mutex {
- return Mutex{ .lock = SpinLock.init() };
- }
-
- pub fn deinit(self: *Mutex) void {}
+ // try and acquire the lock using thread rescheduling on failure
+ for (([SpinThread]void)(undefined)) |_| {
+ var value = @atomicLoad(u32, &self.state, .Monotonic);
+ while (value == Unlocked)
+ value = @cmpxchgWeak(u32, &self.state, Unlocked, state, .Acquire, .Monotonic) orelse return Held{ .mutex = self };
+ SpinLock.yieldThread();
+ }
- pub fn acquire(self: *Mutex) Held {
- _ = self.lock.acquire();
- return Held{ .mutex = self };
+ // failed to acquire the lock, go to sleep until woken up by `Held.release()`
+ if (@atomicRmw(u32, &self.state, .Xchg, Sleeping, .Acquire) == Unlocked)
+ return Held{ .mutex = self };
+ state = Sleeping;
+ self.parker.park(&self.state, Sleeping);
}
- },
+ }
};
const TestContext = struct {
lib/std/parker.zig
@@ -159,7 +159,7 @@ const WindowsParker = struct {
const key = @ptrCast(*const c_void, ptr);
var waiting = @atomicLoad(u32, waiters, .Acquire);
while (waiting != 0) {
- waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - 1, .AcqRel, .Monotonic) orelse {
+ waiting = @cmpxchgWeak(u32, waiters, waiting, waiting - 1, .Acquire, .Monotonic) orelse {
const rc = windows.ntdll.NtReleaseKeyedEvent(self.handle, key, windows.FALSE, null);
assert(rc == 0);
return;
@@ -338,3 +338,39 @@ const PosixParker = struct {
else => unreachable,
};
};
+
+test "std.ThreadParker" {
+ const Context = struct {
+ parker: ThreadParker,
+ data: u32,
+
+ fn receiver(self: *@This()) void {
+ self.parker.park(&self.data, 0); // receives 1
+ assert(@atomicRmw(u32, &self.data, .Xchg, 2, .SeqCst) == 1); // sends 2
+ self.parker.unpark(&self.data); // wakes up waiters on 2
+ self.parker.park(&self.data, 2); // receives 3
+ assert(@atomicRmw(u32, &self.data, .Xchg, 4, .SeqCst) == 3); // sends 4
+ self.parker.unpark(&self.data); // wakes up waiters on 4
+ }
+
+ fn sender(self: *@This()) void {
+ assert(@atomicRmw(u32, &self.data, .Xchg, 1, .SeqCst) == 0); // sends 1
+ self.parker.unpark(&self.data); // wakes up waiters on 1
+ self.parker.park(&self.data, 1); // receives 2
+ assert(@atomicRmw(u32, &self.data, .Xchg, 3, .SeqCst) == 2); // sends 3
+ self.parker.unpark(&self.data); // wakes up waiters on 3
+ self.parker.park(&self.data, 3); // receives 4
+ }
+ };
+
+ var context = Context{
+ .parker = ThreadParker.init(),
+ .data = 0,
+ };
+ defer context.parker.deinit();
+
+ var receiver = try std.Thread.spawn(&context, Context.receiver);
+ defer receiver.wait();
+
+ context.sender();
+}
\ No newline at end of file
lib/std/spinlock.zig
@@ -28,7 +28,7 @@ pub const SpinLock = struct {
return Held{ .spinlock = self };
}
- fn yieldCpu() void {
+ pub fn yieldCpu() void {
switch (builtin.arch) {
.i386, .x86_64 => asm volatile("pause" ::: "memory"),
.arm, .aarch64 => asm volatile("yield"),
@@ -36,7 +36,7 @@ pub const SpinLock = struct {
}
}
- fn yieldThread() void {
+ pub fn yieldThread() void {
switch (builtin.os) {
.linux => assert(linux.syscall0(linux.SYS_sched_yield) == 0),
.windows => _ = windows.kernel32.SwitchToThread(),