Commit a31d9f92f2

kprotty <45520026+kprotty@users.noreply.github.com>
2020-09-27 21:05:38
new std.event.Lock implementation
1 parent 8794ce6
Changed files (1)
lib/std/event/lock.zig
@@ -16,107 +16,90 @@ const Loop = std.event.Loop;
 /// Allows only one actor to hold the lock.
 /// TODO: make this API also work in blocking I/O mode.
 pub const Lock = struct {
-    shared: bool,
-    queue: Queue,
-    queue_empty: bool,
+    mutex: std.Mutex = std.Mutex{},
+    head: usize = UNLOCKED,
 
-    const Queue = std.atomic.Queue(anyframe);
+    const UNLOCKED = 0;
+    const LOCKED = 1;
 
     const global_event_loop = Loop.instance orelse
         @compileError("std.event.Lock currently only works with event-based I/O");
 
-    pub const Held = struct {
-        lock: *Lock,
-
-        pub fn release(self: Held) void {
-            // Resume the next item from the queue.
-            if (self.lock.queue.get()) |node| {
-                global_event_loop.onNextTick(node);
-                return;
-            }
-
-            // We need to release the lock.
-            @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-            @atomicStore(bool, &self.lock.shared, false, .SeqCst);
-
-            // There might be a queue item. If we know the queue is empty, we can be done,
-            // because the other actor will try to obtain the lock.
-            // But if there's a queue item, we are the actor which must loop and attempt
-            // to grab the lock again.
-            if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                return;
-            }
-
-            while (true) {
-                if (@atomicRmw(bool, &self.lock.shared, .Xchg, true, .SeqCst)) {
-                    // We did not obtain the lock. Great, the queue is someone else's problem.
-                    return;
-                }
-
-                // Resume the next item from the queue.
-                if (self.lock.queue.get()) |node| {
-                    global_event_loop.onNextTick(node);
-                    return;
-                }
+    const Waiter = struct {
+        next: ?*Waiter,
+        tail: *Waiter,
+        node: Loop.NextTickNode,
+    };
 
-                // Release the lock again.
-                @atomicStore(bool, &self.lock.queue_empty, true, .SeqCst);
-                @atomicStore(bool, &self.lock.shared, false, .SeqCst);
+    pub fn acquire(self: *Lock) Held {
+        const held = self.mutex.acquire();
 
-                // Find out if we can be done.
-                if (@atomicLoad(bool, &self.lock.queue_empty, .SeqCst)) {
-                    return;
-                }
-            }
+        if (self.head == UNLOCKED) {
+            self.head = LOCKED;
+            held.release();
+            return Held{ .lock = self };
         }
-    };
 
-    pub fn init() Lock {
-        return Lock{
-            .shared = false,
-            .queue = Queue.init(),
-            .queue_empty = true,
-        };
-    }
+        var waiter: Waiter = undefined;
+        waiter.next = null;
+        waiter.tail = &waiter;
 
-    pub fn initLocked() Lock {
-        return Lock{
-            .shared = true,
-            .queue = Queue.init(),
-            .queue_empty = true,
+        const head = switch (self.head) {
+            UNLOCKED => unreachable,
+            LOCKED => null,
+            else => @intToPtr(?*Waiter, self.head),
         };
-    }
-
-    /// Must be called when not locked. Not thread safe.
-    /// All calls to acquire() and release() must complete before calling deinit().
-    pub fn deinit(self: *Lock) void {
-        assert(!self.shared);
-        while (self.queue.get()) |node| resume node.data;
-    }
 
-    pub fn acquire(self: *Lock) callconv(.Async) Held {
-        var my_tick_node = Loop.NextTickNode.init(@frame());
+        if (head) |h| {
+            h.tail.next = &waiter;
+            h.tail = &waiter;
+        } else {
+            self.head = @ptrToInt(&waiter);
+        }
 
-        errdefer _ = self.queue.remove(&my_tick_node); // TODO test canceling an acquire
         suspend {
-            self.queue.put(&my_tick_node);
-
-            // At this point, we are in the queue, so we might have already been resumed.
+            waiter.node = Loop.NextTickNode{
+                .prev = undefined,
+                .next = undefined,
+                .data = @frame(),
+            };
+            held.release();
+        }
 
-            // We set this bit so that later we can rely on the fact, that if queue_empty == true, some actor
-            // will attempt to grab the lock.
-            @atomicStore(bool, &self.queue_empty, false, .SeqCst);
+        return Held{ .lock = self };
+    }
 
-            if (!@atomicRmw(bool, &self.shared, .Xchg, true, .SeqCst)) {
-                if (self.queue.get()) |node| {
-                    // Whether this node is us or someone else, we tail resume it.
-                    resume node.data;
+    pub const Held = struct {
+        lock: *Lock,
+
+        pub fn release(self: Held) void {
+            const waiter = blk: {
+                const held = self.lock.mutex.acquire();
+                defer held.release();
+
+                switch (self.lock.head) {
+                    UNLOCKED => {
+                        std.debug.panic("Lock unlocked when already unlocked", .{});
+                    },
+                    LOCKED => {
+                        self.lock.head = UNLOCKED;
+                        break :blk null;
+                    },
+                    else => {
+                        const waiter = @intToPtr(*Waiter, self.lock.head);
+                        self.lock.head = if (waiter.next == null) LOCKED else @ptrToInt(waiter.next);
+                        if (waiter.next) |next|
+                            next.tail = waiter.tail;
+                        break :blk waiter;
+                    },
                 }
+            };
+
+            if (waiter) |w| {
+                global_event_loop.onNextTick(&w.node);
             }
         }
-
-        return Held{ .lock = self };
-    }
+    };
 };
 
 test "std.event.Lock" {
@@ -128,41 +111,16 @@ test "std.event.Lock" {
     // TODO https://github.com/ziglang/zig/issues/3251
     if (builtin.os.tag == .freebsd) return error.SkipZigTest;
 
-    // TODO this file has bit-rotted. repair it
-    if (true) return error.SkipZigTest;
-
-    var lock = Lock.init();
-    defer lock.deinit();
-
-    _ = async testLock(&lock);
+    var lock = Lock{};
+    testLock(&lock);
 
     const expected_result = [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len;
     testing.expectEqualSlices(i32, &expected_result, &shared_test_data);
 }
-fn testLock(lock: *Lock) callconv(.Async) void {
+fn testLock(lock: *Lock) void {
     var handle1 = async lockRunner(lock);
-    var tick_node1 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle1,
-    };
-    Loop.instance.?.onNextTick(&tick_node1);
-
     var handle2 = async lockRunner(lock);
-    var tick_node2 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle2,
-    };
-    Loop.instance.?.onNextTick(&tick_node2);
-
     var handle3 = async lockRunner(lock);
-    var tick_node3 = Loop.NextTickNode{
-        .prev = undefined,
-        .next = undefined,
-        .data = &handle3,
-    };
-    Loop.instance.?.onNextTick(&tick_node3);
 
     await handle1;
     await handle2;
@@ -171,13 +129,13 @@ fn testLock(lock: *Lock) callconv(.Async) void {
 
 var shared_test_data = [1]i32{0} ** 10;
 var shared_test_index: usize = 0;
-fn lockRunner(lock: *Lock) callconv(.Async) void {
-    suspend; // resumed by onNextTick
+
+fn lockRunner(lock: *Lock) void {
+    Lock.global_event_loop.yield();
 
     var i: usize = 0;
     while (i < shared_test_data.len) : (i += 1) {
-        var lock_frame = async lock.acquire();
-        const handle = await lock_frame;
+        const handle = lock.acquire();
         defer handle.release();
 
         shared_test_index = 0;
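
Since init(), initLocked(), and deinit() are gone, a Lock is now default-initialized and needs no teardown. A minimal usage sketch under evented I/O (`counter` and `protectedIncrement` are illustrative names, not from the commit):

    const std = @import("std");

    pub const io_mode = .evented; // std.event.Lock requires event-based I/O

    var counter_lock = std.event.Lock{}; // starts UNLOCKED; no init() call
    var counter: usize = 0;

    // acquire() suspends the calling frame while the lock is busy; release()
    // either unlocks or hands ownership to the next waiter via onNextTick().
    fn protectedIncrement() void {
        const held = counter_lock.acquire();
        defer held.release();
        counter += 1;
    }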