Commit 71f8762959

jumpnbrownweasel <49791153+jumpnbrownweasel@users.noreply.github.com>
2022-10-18 01:15:15
Fix for #13163: DefaultRwLock accumulates write-waiters, eventually fails to write lock (#13180)
* Fix for: DefaultRwLock accumulates write-waiters, eventually fails to write lock #13163
* Comment out debug.print at the end of the last test.
* Code formatting
* - use equality test after lock/unlock rather than peeking into internals. however, this is still implementation specific and only done for DefaultRwLock.
  - add num_reads maximum to ensure that reader threads stop if writer threads are starved
  - use relaxed orderings for the read atomic counter
  - don't check at the end for non-zero read ops, since the reader threads may only run once if they are starved
* More review changes
  - Monotonic is sufficient for incrementing the reads counter
1 parent ce3ffa5
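
What the fix changes, in brief: lock() first adds WRITER to rwl.state to announce a waiting writer, but the old .Or with IS_WRITING never took that increment back, and unlock() only clears IS_WRITING. Every write lock/unlock cycle therefore left one phantom writer recorded in the state word, until the accumulated count eventually overflowed into the reader bits and lock() waited on the semaphore forever. The new .Add of IS_WRITING -% WRITER sets the write flag and retracts the announcement in a single atomic add. Below is a minimal standalone sketch of that counter arithmetic only (plain variables, no atomics, mutex, or semaphore; IS_WRITING and WRITER are redefined locally with the same meaning as in DefaultRwLock), not the library code itself:

    const std = @import("std");
    const testing = std.testing;

    // Same meaning as the DefaultRwLock constants, redefined for this sketch.
    const IS_WRITING: usize = 1; // bit 0: a writer currently holds the lock
    const WRITER: usize = 1 << 1; // counter above it: writers that have announced themselves

    test "old scheme: one phantom WRITER left behind per write lock/unlock" {
        var state: usize = 0;

        var i: usize = 0;
        while (i < 3) : (i += 1) {
            // old lock(): announce a writer, then only OR in the write flag
            state +%= WRITER;
            state |= IS_WRITING;
            // unlock(): clears the write flag, never touches the WRITER count
            state &= ~IS_WRITING;
        }
        // three cycles later, three writers are still recorded as waiting
        try testing.expectEqual(@as(usize, 3 * WRITER), state);
    }

    test "fixed scheme: the state returns to zero after every cycle" {
        var state: usize = 0;

        var i: usize = 0;
        while (i < 3) : (i += 1) {
            // fixed lock(): announce a writer ...
            state +%= WRITER;
            // ... then set the write flag and retract the announcement in one add
            state +%= IS_WRITING -% WRITER;
            // unlock(): unchanged, clears the write flag
            state &= ~IS_WRITING;
        }
        try testing.expectEqual(@as(usize, 0), state);
    }

The wrapping add in the fixed path is safe because the mutex is held at that point and IS_WRITING is known to be zero, so adding 1 sets the flag bit without carrying into the waiter count.
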
Changed files (1)
lib/std/Thread/RwLock.zig
@@ -9,6 +9,7 @@ const RwLock = @This();
 const std = @import("../std.zig");
 const builtin = @import("builtin");
 const assert = std.debug.assert;
+const testing = std.testing;
 
 pub const Impl = if (builtin.single_threaded)
     SingleThreadedRwLock
@@ -190,7 +191,7 @@ pub const DefaultRwLock = struct {
         _ = @atomicRmw(usize, &rwl.state, .Add, WRITER, .SeqCst);
         rwl.mutex.lock();
 
-        const state = @atomicRmw(usize, &rwl.state, .Or, IS_WRITING, .SeqCst);
+        const state = @atomicRmw(usize, &rwl.state, .Add, IS_WRITING -% WRITER, .SeqCst);
         if (state & READER_MASK != 0)
             rwl.semaphore.wait();
     }
@@ -247,3 +248,128 @@ pub const DefaultRwLock = struct {
             rwl.semaphore.post();
     }
 };
+
+test "DefaultRwLock - internal state" {
+    var rwl = DefaultRwLock{};
+
+    // The following failed prior to the fix for Issue #13163,
+    // where the WRITER flag was subtracted by the lock method.
+
+    rwl.lock();
+    rwl.unlock();
+    try testing.expectEqual(rwl, DefaultRwLock{});
+}
+
+test "RwLock - smoke test" {
+    var rwl = RwLock{};
+
+    rwl.lock();
+    try testing.expect(!rwl.tryLock());
+    try testing.expect(!rwl.tryLockShared());
+    rwl.unlock();
+
+    try testing.expect(rwl.tryLock());
+    try testing.expect(!rwl.tryLock());
+    try testing.expect(!rwl.tryLockShared());
+    rwl.unlock();
+
+    rwl.lockShared();
+    try testing.expect(!rwl.tryLock());
+    try testing.expect(rwl.tryLockShared());
+    rwl.unlockShared();
+    rwl.unlockShared();
+
+    try testing.expect(rwl.tryLockShared());
+    try testing.expect(!rwl.tryLock());
+    try testing.expect(rwl.tryLockShared());
+    rwl.unlockShared();
+    rwl.unlockShared();
+
+    rwl.lock();
+    rwl.unlock();
+}
+
+test "RwLock - concurrent access" {
+    if (builtin.single_threaded)
+        return;
+
+    const num_writers: usize = 2;
+    const num_readers: usize = 4;
+    const num_writes: usize = 10000;
+    const num_reads: usize = num_writes * 2;
+
+    const Runner = struct {
+        const Self = @This();
+
+        rwl: RwLock = .{},
+        writes: usize = 0,
+        reads: std.atomic.Atomic(usize) = std.atomic.Atomic(usize).init(0),
+
+        term1: usize = 0,
+        term2: usize = 0,
+        term_sum: usize = 0,
+
+        fn reader(self: *Self) !void {
+            while (true) {
+                self.rwl.lockShared();
+                defer self.rwl.unlockShared();
+
+                if (self.writes >= num_writes or self.reads.load(.Unordered) >= num_reads)
+                    break;
+
+                try self.check();
+
+                _ = self.reads.fetchAdd(1, .Monotonic);
+            }
+        }
+
+        fn writer(self: *Self, thread_idx: usize) !void {
+            var prng = std.rand.DefaultPrng.init(thread_idx);
+            var rnd = prng.random();
+
+            while (true) {
+                self.rwl.lock();
+                defer self.rwl.unlock();
+
+                if (self.writes >= num_writes)
+                    break;
+
+                try self.check();
+
+                const term1 = rnd.int(usize);
+                self.term1 = term1;
+                try std.Thread.yield();
+
+                const term2 = rnd.int(usize);
+                self.term2 = term2;
+                try std.Thread.yield();
+
+                self.term_sum = term1 +% term2;
+                self.writes += 1;
+            }
+        }
+
+        fn check(self: *const Self) !void {
+            const term_sum = self.term_sum;
+            try std.Thread.yield();
+
+            const term2 = self.term2;
+            try std.Thread.yield();
+
+            const term1 = self.term1;
+            try testing.expectEqual(term_sum, term1 +% term2);
+        }
+    };
+
+    var runner = Runner{};
+    var threads: [num_writers + num_readers]std.Thread = undefined;
+
+    for (threads[0..num_writers]) |*t, i| t.* = try std.Thread.spawn(.{}, Runner.writer, .{ &runner, i });
+    for (threads[num_writers..]) |*t| t.* = try std.Thread.spawn(.{}, Runner.reader, .{&runner});
+
+    for (threads) |t| t.join();
+
+    try testing.expectEqual(num_writes, runner.writes);
+
+    //std.debug.print("reads={}\n", .{ runner.reads.load(.Unordered)});
+}
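
To try this locally (assuming a checkout of the Zig repository; the exact invocation may vary between Zig versions), running something like zig test lib/std/Thread/RwLock.zig from the repository root should exercise both the internal-state check and the concurrent stress test. Before the fix, the first test fails because rwl.state still carries a leftover WRITER increment after unlock(); with the fix, the state compares equal to a freshly initialized DefaultRwLock.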