Commit 66cb75d114

Andrew Kelley <superjoe30@gmail.com>
2018-10-03 19:19:10
std.Mutex: implement blocking mutexes on linux
closes #1463 Thanks to Shawn Landden for the original pull request. This commit is based on that code.
1 parent acefcdb
std/event/loop.zig
@@ -736,8 +736,8 @@ pub const Loop = struct {
                 _ = os.bsdKEvent(self.os_data.fs_kqfd, fs_kevs, empty_kevs, null) catch unreachable;
             },
             builtin.Os.linux => {
-                _ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
-                const rc = os.linux.futex_wake(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAKE, 1);
+                _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+                const rc = os.linux.futex_wake(&self.os_data.fs_queue_item, os.linux.FUTEX_WAKE, 1);
                 switch (os.linux.getErrno(rc)) {
                     0 => {},
                     posix.EINVAL => unreachable,
@@ -757,7 +757,7 @@ pub const Loop = struct {
     fn posixFsRun(self: *Loop) void {
         while (true) {
             if (builtin.os == builtin.Os.linux) {
-                _ = @atomicRmw(u8, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                _ = @atomicRmw(i32, &self.os_data.fs_queue_item, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
             }
             while (self.os_data.fs_queue.get()) |node| {
                 switch (node.data.msg) {
@@ -794,11 +794,9 @@ pub const Loop = struct {
             }
             switch (builtin.os) {
                 builtin.Os.linux => {
-                    const rc = os.linux.futex_wait(@ptrToInt(&self.os_data.fs_queue_item), os.linux.FUTEX_WAIT, 0, null);
+                    const rc = os.linux.futex_wait(&self.os_data.fs_queue_item, os.linux.FUTEX_WAIT, 0, null);
                     switch (os.linux.getErrno(rc)) {
-                        0 => continue,
-                        posix.EINTR => continue,
-                        posix.EAGAIN => continue,
+                        0, posix.EINTR, posix.EAGAIN => continue,
                         else => unreachable,
                     }
                 },
@@ -838,7 +836,7 @@ pub const Loop = struct {
         final_eventfd: i32,
         final_eventfd_event: os.linux.epoll_event,
         fs_thread: *os.Thread,
-        fs_queue_item: u8,
+        fs_queue_item: i32,
         fs_queue: std.atomic.Queue(fs.Request),
         fs_end_request: fs.RequestNode,
     };
std/os/linux/index.zig
@@ -728,12 +728,12 @@ pub inline fn vfork() usize {
     return @inlineCall(syscall0, SYS_vfork);
 }
 
-pub fn futex_wait(uaddr: usize, futex_op: u32, val: i32, timeout: ?*timespec) usize {
-    return syscall4(SYS_futex, uaddr, futex_op, @bitCast(u32, val), @ptrToInt(timeout));
+pub fn futex_wait(uaddr: *const i32, futex_op: u32, val: i32, timeout: ?*timespec) usize {
+    return syscall4(SYS_futex, @ptrToInt(uaddr), futex_op, @bitCast(u32, val), @ptrToInt(timeout));
 }
 
-pub fn futex_wake(uaddr: usize, futex_op: u32, val: i32) usize {
-    return syscall3(SYS_futex, uaddr, futex_op, @bitCast(u32, val));
+pub fn futex_wake(uaddr: *const i32, futex_op: u32, val: i32) usize {
+    return syscall3(SYS_futex, @ptrToInt(uaddr), futex_op, @bitCast(u32, val));
 }
 
 pub fn getcwd(buf: [*]u8, size: usize) usize {
std/os/index.zig
@@ -2839,7 +2839,7 @@ pub const Thread = struct {
                 while (true) {
                     const pid_value = @atomicLoad(i32, &self.data.handle, builtin.AtomicOrder.SeqCst);
                     if (pid_value == 0) break;
-                    const rc = linux.futex_wait(@ptrToInt(&self.data.handle), linux.FUTEX_WAIT, pid_value, null);
+                    const rc = linux.futex_wait(&self.data.handle, linux.FUTEX_WAIT, pid_value, null);
                     switch (linux.getErrno(rc)) {
                         0 => continue,
                         posix.EINTR => continue,
std/index.zig
@@ -1,15 +1,16 @@
-pub const ArrayList = @import("array_list.zig").ArrayList;
 pub const AlignedArrayList = @import("array_list.zig").AlignedArrayList;
+pub const ArrayList = @import("array_list.zig").ArrayList;
+pub const AutoHashMap = @import("hash_map.zig").AutoHashMap;
 pub const BufMap = @import("buf_map.zig").BufMap;
 pub const BufSet = @import("buf_set.zig").BufSet;
 pub const Buffer = @import("buffer.zig").Buffer;
 pub const BufferOutStream = @import("io.zig").BufferOutStream;
+pub const DynLib = @import("dynamic_library.zig").DynLib;
 pub const HashMap = @import("hash_map.zig").HashMap;
-pub const AutoHashMap = @import("hash_map.zig").AutoHashMap;
 pub const LinkedList = @import("linked_list.zig").LinkedList;
-pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
-pub const DynLib = @import("dynamic_library.zig").DynLib;
 pub const Mutex = @import("mutex.zig").Mutex;
+pub const SegmentedList = @import("segmented_list.zig").SegmentedList;
+pub const SpinLock = @import("spinlock.zig").SpinLock;
 
 pub const atomic = @import("atomic/index.zig");
 pub const base64 = @import("base64.zig");
@@ -45,15 +46,16 @@ pub const lazyInit = @import("lazy_init.zig").lazyInit;
 
 test "std" {
     // run tests from these
-    _ = @import("atomic/index.zig");
     _ = @import("array_list.zig");
+    _ = @import("atomic/index.zig");
     _ = @import("buf_map.zig");
     _ = @import("buf_set.zig");
     _ = @import("buffer.zig");
     _ = @import("hash_map.zig");
     _ = @import("linked_list.zig");
-    _ = @import("segmented_list.zig");
     _ = @import("mutex.zig");
+    _ = @import("segmented_list.zig");
+    _ = @import("spinlock.zig");
 
     _ = @import("base64.zig");
     _ = @import("build.zig");
std/mutex.zig
@@ -3,25 +3,74 @@ const builtin = @import("builtin");
 const AtomicOrder = builtin.AtomicOrder;
 const AtomicRmwOp = builtin.AtomicRmwOp;
 const assert = std.debug.assert;
+const SpinLock = std.SpinLock;
+const linux = std.os.linux;
 
-/// TODO use syscalls instead of a spinlock
+/// Lock may be held only once. If the same thread
+/// tries to acquire the same mutex twice, it deadlocks.
 pub const Mutex = struct {
-    lock: u8, // TODO use a bool
+    /// 0: unlocked
+    /// 1: locked, no waiters
+    /// 2: locked, one or more waiters
+    linux_lock: @typeOf(linux_lock_init),
+
+    /// TODO better implementation than spin lock
+    spin_lock: @typeOf(spin_lock_init),
+
+    const linux_lock_init = if (builtin.os == builtin.Os.linux) i32(0) else {};
+    const spin_lock_init = if (builtin.os != builtin.Os.linux) SpinLock.init() else {};
 
     pub const Held = struct {
         mutex: *Mutex,
 
         pub fn release(self: Held) void {
-            assert(@atomicRmw(u8, &self.mutex.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+            if (builtin.os == builtin.Os.linux) {
+                // Always unlock. If the previous state was Locked-No-Waiters, then we're done.
+                // Otherwise, wake a waiter up.
+                const prev = @atomicRmw(i32, &self.mutex.linux_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.Release);
+                if (prev != 1) {
+                    assert(prev == 2);
+                    const rc = linux.futex_wake(&self.mutex.linux_lock, linux.FUTEX_WAKE, 1);
+                    switch (linux.getErrno(rc)) {
+                        0 => {},
+                        linux.EINVAL => unreachable,
+                        else => unreachable,
+                    }
+                }
+            } else {
+                SpinLock.Held.release(SpinLock.Held{ .spinlock = &self.mutex.spin_lock });
+            }
         }
     };
 
     pub fn init() Mutex {
-        return Mutex{ .lock = 0 };
+        return Mutex{
+            .linux_lock = linux_lock_init,
+            .spin_lock = spin_lock_init,
+        };
     }
 
     pub fn acquire(self: *Mutex) Held {
-        while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
+        if (builtin.os == builtin.Os.linux) {
+            // First try to go from Unlocked to Locked-No-Waiters. If this succeeds, no syscalls are needed.
+            // Otherwise, we need to be in the Locked-With-Waiters state. If we are already in that state,
+            // proceed to futex_wait. Otherwise, try to go from Locked-No-Waiters to Locked-With-Waiters.
+            // If that succeeds, proceed to futex_wait. Otherwise start the whole loop over again.
+            while (@cmpxchgWeak(i32, &self.linux_lock, 0, 1, AtomicOrder.Acquire, AtomicOrder.Monotonic)) |l| {
+                if (l == 2 or
+                    @cmpxchgWeak(i32, &self.linux_lock, 1, 2, AtomicOrder.Acquire, AtomicOrder.Monotonic) == null)
+                {
+                    const rc = linux.futex_wait(&self.linux_lock, linux.FUTEX_WAIT, 2, null);
+                    switch (linux.getErrno(rc)) {
+                        0, linux.EINTR, linux.EAGAIN => continue,
+                        linux.EINVAL => unreachable,
+                        else => unreachable,
+                    }
+                }
+            }
+        } else {
+            _ = self.spin_lock.acquire();
+        }
         return Held{ .mutex = self };
     }
 };
std/spinlock.zig
@@ -0,0 +1,32 @@
+const std = @import("index.zig");
+const builtin = @import("builtin");
+const AtomicOrder = builtin.AtomicOrder;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const assert = std.debug.assert;
+
+pub const SpinLock = struct {
+    lock: u8, // TODO use a bool or enum
+
+    pub const Held = struct {
+        spinlock: *SpinLock,
+
+        pub fn release(self: Held) void {
+            assert(@atomicRmw(u8, &self.spinlock.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+        }
+    };
+
+    pub fn init() SpinLock {
+        return SpinLock{ .lock = 0 };
+    }
+
+    pub fn acquire(self: *SpinLock) Held {
+        while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
+        return Held{ .spinlock = self };
+    }
+};
+
+test "spinlock" {
+    var lock = SpinLock.init();
+    const held = lock.acquire();
+    defer held.release();
+}
CMakeLists.txt
@@ -635,6 +635,7 @@ set(ZIG_STD_FILES
     "special/init-lib/src/main.zig"
     "special/panic.zig"
     "special/test_runner.zig"
+    "spinlock.zig"
     "unicode.zig"
     "zig/ast.zig"
     "zig/index.zig"