Commit fd4a607bb2

kprotty <kbutcher6200@gmail.com>
2021-06-26 16:03:53
std.Thread: fix futex test + thread errors
1 parent 18bcb2e
Changed files (2)
lib
lib/std/Thread/Futex.zig
@@ -391,70 +391,74 @@ test "Futex - wait/wake" {
 }
 
 test "Futex - Signal" {
-    if (!single_threaded) {
+    if (single_threaded) {
         return;
     }
 
-    try (struct {
+    const Paddle = struct {
         value: Atomic(u32) = Atomic(u32).init(0),
+        current: u32 = 0,
 
-        const Self = @This();
-
-        fn send(self: *Self, value: u32) void {
-            self.value.store(value, .Release);
-            Futex.wake(&self.value, 1);
-        }
-
-        fn recv(self: *Self, expected: u32) void {
-            while (true) {
-                const value = self.value.load(.Acquire);
-                if (value == expected) break;
-                Futex.wait(&self.value, value, null) catch unreachable;
-            }
-        }
+        fn run(self: *@This(), hit_to: *@This()) !void {
+            var iterations: usize = 4;
+            while (iterations > 0) : (iterations -= 1) {
 
-        const start_value = 1;
+                var value: u32 = undefined;
+                while (true) {
+                    value = self.value.load(.Acquire);
+                    if (value != self.current) break;
+                    Futex.wait(&self.value, self.current, null) catch unreachable;
+                }
 
-        fn runThread(rx: *Self, tx: *Self) void {
-            var iterations: u32 = start_value;
-            while (iterations < 10) : (iterations += 1) {
-                rx.recv(iterations);
-                tx.send(iterations);
+                try testing.expectEqual(value, self.current + 1);
+                self.current = value;
+                
+                _ = hit_to.value.fetchAdd(1, .Release);
+                Futex.wake(&hit_to.value, 1);
             }
         }
+    };
 
-        fn run() !void {
-            var ping = Self{};
-            var pong = Self{};
+    var ping = Paddle{};
+    var pong = Paddle{};
 
-            const t1 = try std.Thread.spawn(.{}, runThread, .{ &ping, &pong });
-            defer t1.join();
+    const t1 = try std.Thread.spawn(.{}, Paddle.run, .{&ping, &pong});
+    defer t1.join();
 
-            const t2 = try std.Thread.spawn(.{}, runThread, .{ &pong, &ping });
-            defer t2.join();
+    const t2 = try std.Thread.spawn(.{}, Paddle.run, .{&pong, &ping});
+    defer t2.join();
 
-            ping.send(start_value);
-        }
-    }).run();
+    _ = ping.value.fetchAdd(1, .Release);
+    Futex.wake(&ping.value, 1);
 }
 
 test "Futex - Broadcast" {
-    if (!single_threaded) {
+    if (single_threaded) {
         return;
     }
 
-    try (struct {
-        threads: [10]std.Thread = undefined,
+    const Context = struct {
+        threads: [4]std.Thread = undefined,
         broadcast: Atomic(u32) = Atomic(u32).init(0),
         notified: Atomic(usize) = Atomic(usize).init(0),
 
-        const Self = @This();
-
         const BROADCAST_EMPTY = 0;
         const BROADCAST_SENT = 1;
         const BROADCAST_RECEIVED = 2;
 
-        fn runReceiver(self: *Self) void {
+        fn runSender(self: *@This()) !void {
+            self.broadcast.store(BROADCAST_SENT, .Monotonic);
+            Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+
+            while (true) {
+                const broadcast = self.broadcast.load(.Acquire);
+                if (broadcast == BROADCAST_RECEIVED) break;
+                try testing.expectEqual(broadcast, BROADCAST_SENT);
+                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
+            }
+        }
+
+        fn runReceiver(self: *@This()) void {
             while (true) {
                 const broadcast = self.broadcast.load(.Acquire);
                 if (broadcast == BROADCAST_SENT) break;
@@ -468,66 +472,55 @@ test "Futex - Broadcast" {
                 Futex.wake(&self.broadcast, 1);
             }
         }
+    };
 
-        fn run() !void {
-            var self = Self{};
-
-            for (self.threads) |*thread|
-                thread.* = try std.Thread.spawn(runReceiver, &self);
-            defer for (self.threads) |thread|
-                thread.join();
+    var ctx = Context{};
+    for (ctx.threads) |*thread|
+        thread.* = try std.Thread.spawn(.{}, Context.runReceiver, .{&ctx});
+    defer for (ctx.threads) |thread|
+        thread.join();
 
-            std.time.sleep(16 * std.time.ns_per_ms);
-            self.broadcast.store(BROADCAST_SENT, .Monotonic);
-            Futex.wake(&self.broadcast, @intCast(u32, self.threads.len));
+    // Try to wait for the threads to start before running runSender().
+    // NOTE: not actually needed for correctness.
+    std.time.sleep(16 * std.time.ns_per_ms); 
+    try ctx.runSender();
 
-            while (true) {
-                const broadcast = self.broadcast.load(.Acquire);
-                if (broadcast == BROADCAST_RECEIVED) break;
-                try testing.expectEqual(broadcast, BROADCAST_SENT);
-                Futex.wait(&self.broadcast, broadcast, null) catch unreachable;
-            }
-
-            const notified = self.notified.load(.Monotonic);
-            try testing.expectEqual(notified, self.threads.len);
-        }
-    }).run();
+    const notified = ctx.notified.load(.Monotonic);
+    try testing.expectEqual(notified, ctx.threads.len);
 }
 
 test "Futex - Chain" {
-    if (!single_threaded) {
+    if (single_threaded) {
         return;
     }
 
-    try (struct {
-        completed: Signal = .{},
-        threads: [10]struct {
-            thread: std.Thread,
-            signal: Signal,
-        } = undefined,
-
-        const Signal = struct {
-            state: Atomic(u32) = Atomic(u32).init(0),
+    const Signal = struct {
+        value: Atomic(u32) = Atomic(u32).init(0),
 
-            fn wait(self: *Signal) void {
-                while (true) {
-                    const value = self.value.load(.Acquire);
-                    if (value == 1) break;
-                    assert(value == 0);
-                    Futex.wait(&self.value, 0, null) catch unreachable;
-                }
+        fn wait(self: *@This()) void {
+            while (true) {
+                const value = self.value.load(.Acquire);
+                if (value == 1) break;
+                assert(value == 0);
+                Futex.wait(&self.value, 0, null) catch unreachable;
             }
+        }
 
-            fn notify(self: *Signal) void {
-                assert(self.value.load(.Unordered) == 0);
-                self.value.store(1, .Release);
-                Futex.wake(&self.value, 1);
-            }
-        };
+        fn notify(self: *@This()) void {
+            assert(self.value.load(.Unordered) == 0);
+            self.value.store(1, .Release);
+            Futex.wake(&self.value, 1);
+        }
+    };
 
-        const Self = @This();
+    const Context = struct {
+        completed: Signal = .{},
+        threads: [4]struct {
+            thread: std.Thread,
+            signal: Signal,
+        } = undefined,
 
-        fn runThread(self: *Self, index: usize) void {
+        fn run(self: *@This(), index: usize) void {
             const this_signal = &self.threads[index].signal;
 
             var next_signal = &self.completed;
@@ -538,21 +531,18 @@ test "Futex - Chain" {
             this_signal.wait();
             next_signal.notify();
         }
+    };
 
-        fn run() !void {
-            var self = Self{};
-
-            for (self.threads) |*entry, index| {
-                entry.signal = .{};
-                entry.thread = try std.Thread.spawn(.{}, runThread, .{&self, index});
-            }
+    var ctx = Context{};
+    for (ctx.threads) |*entry, index| {
+        entry.signal = .{};
+        entry.thread = try std.Thread.spawn(.{}, Context.run, .{&ctx, index});
+    }
 
-            self.threads[0].signal.notify();
-            self.completed.wait();
+    ctx.threads[0].signal.notify();
+    ctx.completed.wait();
 
-            for (self.threads) |entry| {
-                entry.thread.join();
-            }
-        }
-    }).run();
+    for (ctx.threads) |entry| {
+        entry.thread.join();
+    }
 }
lib/std/Thread.zig
@@ -385,6 +385,7 @@ const PosixThreadImpl = struct {
         };
 
         const args_ptr = try allocator.create(Args);
+        args_ptr.* = args;
         errdefer allocator.destroy(args_ptr);
 
         var attr: c.pthread_attr_t = undefined;
@@ -523,6 +524,7 @@ const LinuxThreadImpl = struct {
             error.PermissionDenied => unreachable,
             else => |e| return e,
         };
+        assert(mapped.len >= map_bytes);
         errdefer os.munmap(mapped);
 
         // map everything but the guard page as read/write