//! Mutex is a synchronization primitive which enforces atomic access to a
//! shared region of code known as the "critical section".
//!
//! It does this by ensuring only one thread is in the critical section at any
//! given point in time, blocking the others.
//!
//! Mutex can be statically initialized and is at most `@sizeOf(u64)` large.
//! Use `lock()` or `tryLock()` to enter the critical section and `unlock()` to leave it.
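//!
//! A minimal usage sketch (the guarded counter below is illustrative, not part of this API):
//!
//!     var mutex: Mutex = .{};
//!     var counter: u32 = 0;
//!
//!     mutex.lock();
//!     defer mutex.unlock();
//!     counter += 1; // protected by the critical section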

const std = @import("../std.zig");
const builtin = @import("builtin");
const Mutex = @This();

const assert = std.debug.assert;
const testing = std.testing;
const Thread = std.Thread;
const Futex = Thread.Futex;

impl: Impl = .{},

pub const Recursive = @import("Mutex/Recursive.zig");

/// Tries to acquire the mutex without blocking the caller's thread.
/// Returns `false` if the calling thread would have to block to acquire it.
/// Otherwise, returns `true` and the caller should `unlock()` the Mutex to release it.
pub fn tryLock(self: *Mutex) bool {
    return self.impl.tryLock();
}

/// Acquires the mutex, blocking the caller's thread until it can.
/// It is undefined behavior if the mutex is already held by the caller's thread.
/// Once acquired, call `unlock()` on the Mutex to release it.
pub fn lock(self: *Mutex) void {
    self.impl.lock();
}

/// Releases the mutex which was previously acquired with `lock()` or `tryLock()`.
/// It is undefined behavior if the mutex is unlocked from a different thread than the one it was locked from.
pub fn unlock(self: *Mutex) void {
    self.impl.unlock();
}

const Impl = if (builtin.mode == .Debug and !builtin.single_threaded)
    DebugImpl
else
    ReleaseImpl;

const ReleaseImpl = if (builtin.single_threaded)
    SingleThreadedImpl
else if (builtin.os.tag == .windows)
    WindowsImpl
else if (builtin.os.tag.isDarwin())
    DarwinImpl
else
    FutexImpl;

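/// In Debug builds (when threads are enabled), wraps `ReleaseImpl` and records the id
/// of the locking thread so that re-locking from that same thread panics with
/// "Deadlock detected" instead of hanging.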
const DebugImpl = struct {
    locking_thread: std.atomic.Value(Thread.Id) = std.atomic.Value(Thread.Id).init(0), // 0 means it's not locked.
    impl: ReleaseImpl = .{},

    inline fn tryLock(self: *@This()) bool {
        const locking = self.impl.tryLock();
        if (locking) {
            self.locking_thread.store(Thread.getCurrentId(), .unordered);
        }
        return locking;
    }

    inline fn lock(self: *@This()) void {
        const current_id = Thread.getCurrentId();
        if (self.locking_thread.load(.unordered) == current_id and current_id != 0) {
            @panic("Deadlock detected");
        }
        self.impl.lock();
        self.locking_thread.store(current_id, .unordered);
    }

    inline fn unlock(self: *@This()) void {
        assert(self.locking_thread.load(.unordered) == Thread.getCurrentId());
        self.locking_thread.store(0, .unordered);
        self.impl.unlock();
    }
};

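/// In single-threaded builds there is no other thread to contend with, so the lock
/// degenerates to a plain boolean; re-locking is unreachable (a guaranteed deadlock).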
const SingleThreadedImpl = struct {
    is_locked: bool = false,

    fn tryLock(self: *@This()) bool {
        if (self.is_locked) return false;
        self.is_locked = true;
        return true;
    }

    fn lock(self: *@This()) void {
        if (!self.tryLock()) {
            unreachable; // deadlock detected
        }
    }

    fn unlock(self: *@This()) void {
        assert(self.is_locked);
        self.is_locked = false;
    }
};

/// SRWLOCK on windows is almost always faster than the Futex solution.
/// It also implements an efficient Condition with requeue support for us.
const WindowsImpl = struct {
    srwlock: windows.SRWLOCK = .{},

    fn tryLock(self: *@This()) bool {
        return windows.ntdll.RtlTryAcquireSRWLockExclusive(&self.srwlock) != windows.FALSE;
    }

    fn lock(self: *@This()) void {
        windows.ntdll.RtlAcquireSRWLockExclusive(&self.srwlock);
    }

    fn unlock(self: *@This()) void {
        windows.ntdll.RtlReleaseSRWLockExclusive(&self.srwlock);
    }

    const windows = std.os.windows;
};

/// os_unfair_lock on darwin supports priority inheritance and is generally faster than Futex solutions.
const DarwinImpl = struct {
    oul: c.os_unfair_lock = .{},

    fn tryLock(self: *@This()) bool {
        return c.os_unfair_lock_trylock(&self.oul);
    }

    fn lock(self: *@This()) void {
        c.os_unfair_lock_lock(&self.oul);
    }

    fn unlock(self: *@This()) void {
        c.os_unfair_lock_unlock(&self.oul);
    }

    const c = std.c;
};

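/// Generic fallback built on `Futex`: the 32-bit state moves between `unlocked`,
/// `locked`, and `contended`, and an unlock only issues a Futex wake when the state
/// was `contended`, i.e. when another thread may be sleeping on the word.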
const FutexImpl = struct {
    state: std.atomic.Value(u32) = std.atomic.Value(u32).init(unlocked),

    const unlocked: u32 = 0b00;
    const locked: u32 = 0b01;
    const contended: u32 = 0b11; // must contain the `locked` bit for x86 optimization below

    fn lock(self: *@This()) void {
        if (!self.tryLock())
            self.lockSlow();
    }

    fn tryLock(self: *@This()) bool {
        // On x86, use `lock bts` instead of `lock cmpxchg` as:
        // - they both seem to mark the cache-line as modified regardless: https://stackoverflow.com/a/63350048
        // - `lock bts` is smaller instruction-wise which makes it better for inlining
        if (builtin.target.cpu.arch.isX86()) {
            const locked_bit = @ctz(locked);
            return self.state.bitSet(locked_bit, .acquire) == 0;
        }

        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        return self.state.cmpxchgWeak(unlocked, locked, .acquire, .monotonic) == null;
    }

    fn lockSlow(self: *@This()) void {
        @branchHint(.cold);

        // Avoid doing an atomic swap below if we already know the state is contended.
        // An atomic swap unconditionally stores, which marks the cache-line as modified unnecessarily.
        if (self.state.load(.monotonic) == contended) {
            Futex.wait(&self.state, contended);
        }

        // Try to acquire the lock while also telling the existing lock holder that there are threads waiting.
        //
        // Once we sleep on the Futex, we must acquire the mutex using `contended` rather than `locked`.
        // If not, threads sleeping on the Futex wouldn't see the state change in unlock and potentially deadlock.
        // The downside is that the last mutex unlocker will see `contended` and do an unnecessary Futex wake
        // but this is better than having to wake all waiting threads on mutex unlock.
        //
        // Acquire barrier ensures grabbing the lock happens before the critical section
        // and that the previous lock holder's critical section happens before we grab the lock.
        while (self.state.swap(contended, .acquire) != unlocked) {
            Futex.wait(&self.state, contended);
        }
    }

    fn unlock(self: *@This()) void {
        // Unlock the mutex and wake up a waiting thread if any.
        //
        // A waiting thread will acquire with `contended` instead of `locked`
        // which ensures that it wakes up another thread on the next unlock().
        //
        // Release barrier ensures the critical section happens before we let go of the lock
        // and that our critical section happens before the next lock holder grabs the lock.
        const state = self.state.swap(unlocked, .release);
        assert(state != unlocked);

        if (state == contended) {
            Futex.wake(&self.state, 1);
        }
    }
};

test "smoke test" {
    var mutex = Mutex{};

    try testing.expect(mutex.tryLock());
    try testing.expect(!mutex.tryLock());
    mutex.unlock();

    mutex.lock();
    try testing.expect(!mutex.tryLock());
    mutex.unlock();
}

// A counter which is incremented without atomic instructions
const NonAtomicCounter = struct {
    // direct u128 could maybe use xmm ops on x86 which are atomic
    value: [2]u64 = [_]u64{ 0, 0 },

    fn get(self: NonAtomicCounter) u128 {
        return @as(u128, @bitCast(self.value));
    }

    fn inc(self: *NonAtomicCounter) void {
        for (@as([2]u64, @bitCast(self.get() + 1)), 0..) |v, i| {
            @as(*volatile u64, @ptrCast(&self.value[i])).* = v;
        }
    }
};

test "many uncontended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        thread: Thread = undefined,
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runners = [_]Runner{.{}} ** num_threads;
    for (&runners) |*r| r.thread = try Thread.spawn(.{}, Runner.run, .{r});
    for (runners) |r| r.thread.join();
    for (runners) |r| try testing.expectEqual(r.counter.get(), num_increments);
}

test "many contended" {
    // This test requires spawning threads.
    if (builtin.single_threaded) {
        return error.SkipZigTest;
    }

    const num_threads = 4;
    const num_increments = 1000;

    const Runner = struct {
        mutex: Mutex = .{},
        counter: NonAtomicCounter = .{},

        fn run(self: *@This()) void {
            var i: usize = num_increments;
            while (i > 0) : (i -= 1) {
                // Occasionally hint to let another thread run.
                defer if (i % 100 == 0) Thread.yield() catch {};

                self.mutex.lock();
                defer self.mutex.unlock();

                self.counter.inc();
            }
        }
    };

    var runner = Runner{};

    var threads: [num_threads]Thread = undefined;
    for (&threads) |*t| t.* = try Thread.spawn(.{}, Runner.run, .{&runner});
    for (threads) |t| t.join();

    try testing.expectEqual(runner.counter.get(), num_increments * num_threads);
}

// https://github.com/ziglang/zig/issues/19295
//test @This() {
//    var m: Mutex = .{};
//
//    {
//        m.lock();
//        defer m.unlock();
//        // ... critical section code
//    }
//
//    if (m.tryLock()) {
//        defer m.unlock();
//        // ... critical section code
//    }
//}