//! Mutex is a synchronization primitive which enforces atomic access to a
//! shared region of code known as the "critical section".
//!
//! It does this by ensuring only one thread is in the critical section
//! at any given point in time, blocking the others.
//!
//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
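//!
//! A minimal usage sketch (the `counter` global and `increment` function are
//! illustrative, not part of this API):
//!
//! ```
//! var mutex: Mutex = .{};
//! var counter: usize = 0;
//!
//! fn increment() void {
//!     mutex.lock();
//!     defer mutex.unlock();
//!     counter += 1;
//! }
//! ```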

const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();

const assert = std.debug.assert;
const testing = std.testing;
const Thread = std.Thread;
const Futex = Thread.Futex;

impl: Impl = .{},

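/// A reentrant variant of Mutex: the same thread may acquire it multiple
/// times, provided it releases it an equal number of times.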
pub const Recursive = @import("Mutex/Recursive.zig");

/// Tries to acquire the mutex without blocking the caller's thread.
/// Returns `false` if the calling thread would have to block to acquire it.
/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it.
pub fn tryLock(self: *Mutex) bool {
    return self.impl.tryLock();
}

/// Acquires the mutex, blocking the caller's thread until it can.
/// It is undefined behavior if the mutex is already held by the caller's thread.
/// Once acquired, call `unlock()` on the Mutex to release it.
pub fn lock(self: *Mutex) void {
    self.impl.lock();
}

/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`.
/// It is undefined behavior if the mutex is unlocked from a different thread than the one it was locked from.
pub fn unlock(self: *Mutex) void {
    self.impl.unlock();
}

const Impl = if (builtin.mode == .Debug and !builtin.single_threaded)
    DebugImpl
else
    ReleaseImpl;

const ReleaseImpl = if (builtin.single_threaded)
    SingleThreadedImpl
else if (builtin.os.tag == .windows)
    WindowsImpl
else if (builtin.os.tag.isDarwin())
    DarwinImpl
else
    FutexImpl;

const DebugImpl = struct {
    locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked.
    impl: ReleaseImpl = .{},

    inline fn tryLock(self: *@This()) bool {
        const locking = self.impl.tryLock();
        if (locking) {
            self.locking_thread.store(Thread.getCurrentId(), .unordered);
        }
        return locking;
    }

    inline fn lock(self: *@This()) void {
        const current_id = Thread.getCurrentId();
        if (self.locking_thread.load(.unordered) == current_id and current_id != 0) {
            @panic("Deadlock detected");
        }
        self.impl.lock();
        self.locking_thread.store(current_id, .unordered);
    }

    inline fn unlock(self: *@This()) void {
        assert(self.locking_thread.load(.unordered) == Thread.getCurrentId());
        self.locking_thread.store(0, .unordered);
        self.impl.unlock();
    }
};

const SingleThreadedImpl = struct {
    is_locked: bool = false,

    fn tryLock(self: *@This()) bool {
        if (self.is_locked) return false;
        self.is_locked = true;
        return true;
    }

    fn lock(self: *@This()) void {
        if (!self.tryLock()) {
            unreachable; // deadlock detected
        }
    }

    fn unlock(self: *@This()) void {
        assert(self.is_locked);
        self.is_locked = false;
    }
};

/// SRWLOCK on Windows is almost always faster than a futex-based solution.
/// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
    srwlock: windows.SRWLOCK = .{},

    fn tryLock(self: *@This()) bool {
        return windows.ntdll.RtlTryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
    }

    fn lock(self: *@This()) void {
        windows.ntdll.RtlAcquireSRWLockExclusive(&self.srwlock);
    }

    fn unlock(self: *@This()) void {
        windows.ntdll.RtlReleaseSRWLockExclusive(&self.srwlock);
    }

    const windows = std.os.windows;
};

/// os_unfair_lock on Darwin supports priority inheritance and is generally faster than futex-based solutions.
const DarwinImpl = struct {
    oul: c.os_unfair_lock = .{},

    fn tryLock(self: *@This()) bool {
        return c.os_unfair_lock_trylock(&self.oul);
    }

    fn lock(self: *@This()) void {
        c.os_unfair_lock_lock(&self.oul);
    }

    fn unlock(self: *@This()) void {
        c.os_unfair_lock_unlock(&self.oul);
    }

    const c = std.c;
};

const FutexImpl = struct {
    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),

    const unlocked: u32 = 0b00;
    const locked: u32 = 0b01;
    const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below

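    // Lock state transitions, summarizing `lock`/`lockSlow`/`unlock` below:
    //   unlocked -> locked:    fast path, `tryLock` sets the `locked` bit.
    //   unlocked -> contended: slow path, `lockSlow` wins the atomic swap directly.
    //   locked -> contended:   a waiter announces itself before sleeping on the Futex.
    //   any -> unlocked:       `unlock` releases, waking one waiter if contended.
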
    fn lock(self: *@This()) void {
        if (!self.tryLock())
            self.lockSlow();
    }

    fn tryLock(self: *@This()) bool {
        // On x86, use `lock bts` instead of `lock cmpxchg` as:
        // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
        // - `lock bts` is smaller instruction-wise which makes it better for inlining
        if (builtin.target.cpu.arch.isX86()) {
            const locked_bit = @ctz(locked);
            return self.state.bitSet(locked_bit, .acquire) == 0;
        }

        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        return self.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
    }

    fn lockSlow(self: *@This()) void {
        @branchHint(.cold);

        // Avoid doing an atomic swap below if we already know the state is contended.
        // An atomic swap unconditionally stores which marks the cache-line as modified unnecessarily.
        if (self.state.load(.monotonic) == contended) {
            Futex.wait(&self.state, contended);
        }

        // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
        //
        // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
        // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
        // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
        // but this is better than having to wake all waiting threads on mutex unlock.
        //
        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        while (self.state.swap(contended, .acquire) != unlocked) {
            Futex.wait(&self.state, contended);
        }
    }

    fn unlock(self: *@This()) void {
        // Unlock the mutex and wake up a waiting thread if any.
        //
        // A waiting thread will acquire with `contended` instead of `locked`
        // which ensures that it wakes up another thread on the next unlock().
        //
        // Release barrier ensures the critical section happens before we let go of the lock
        // and that our critical section happens before the next lock holder grabs the lock.
        const state = self.state.swap(unlocked, .release);
        assert(state != unlocked);

        if (state == contended) {
            Futex.wake(&self.state, 1);
        }
    }
};

test "smoke test" {
    var mutex = Mutex{};

    try testing.expect(mutex.tryLock());
    try testing.expect(!mutex.tryLock());
    mutex.unlock();

    mutex.lock();
    try testing.expect(!mutex.tryLock());
    mutex.unlock();
}

// A counter which is incremented without atomic instructions
const NonAtomicCounter = struct {
    // A direct u128 might be lowered to XMM ops on x86, which can be atomic.
    value: [2]u64 = [_]u64{ 0, 0 },

    fn get(self: NonAtomicCounter) u128 {
        return @as(u128, @bitCast(self.value));
    }

    fn inc(self: *NonAtomicCounter) void {
        for (@as([2]u64, @bitCast(self.get() + 1)), 0..) |v, i| {
            @as(*volatile u64, @ptrCast(&self.value[i])).* = v;
        }
    }
};

test "many uncontended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        thread: Thread = undefined,
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runners = [_]Runner{.{}} ** num_threads;
    for (&runners) |*r| r.thread = try Thread.spawn(.{}, Runner.run, .{r});
    for (runners) |r| r.thread.join();
    for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments);
}

test "many contended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                // Occasionally hint to let another thread run.
                defer if (i % 100 == 0) Thread.yield() catch {};

                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runner = Runner{};

    var threads: [num_threads]Thread = undefined;
    for (&threads) |*t| t.* = try Thread.spawn(.{}, Runner.run, .{&runner});
    for (threads) |t| t.join();

    try testing.expectEqual(runner.counter.get(), num_increments * num_threads);
}

// https://github.com/ziglang/zig/issues/19295
//test @This() {
//    var m: Mutex = .{};
//
//    {
//        m.lock();
//        defer m.unlock();
//        // ... critical section code
//    }
//
//    if (m.tryLock()) {
//        defer m.unlock();
//        // ... critical section code
//    }
//}