Commit 752d38612f
Changed files (1)
lib/std/Io/Threaded.zig
@@ -265,6 +265,10 @@ const have_flock_open_flags = @hasField(posix.O, "EXLOCK");
const have_networking = builtin.os.tag != .wasi;
const have_flock = @TypeOf(posix.system.flock) != void;
const have_sendmmsg = builtin.os.tag == .linux;
+const have_futex = switch (builtin.cpu.arch) {
+ .wasm32, .wasm64 => builtin.cpu.has(.wasm, .atomics),
+ else => true,
+};
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
const fstat_sym = if (posix.lfs64_abi) posix.system.fstat64 else posix.system.fstat;
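WebAssembly only provides the memory.atomic.wait32 / memory.atomic.notify instructions when the module is compiled with the atomics CPU feature, so the new have_futex constant gates the futex-backed paths on that feature for wasm32/wasm64 and leaves all other targets unchanged. A minimal sketch of how such a comptime gate is typically consumed; the call site below is hypothetical and not part of the commit:

// Hypothetical call site, only to illustrate the gate: the wasm atomics path
// is compiled solely when the target actually has the instructions, and a
// target without them fails at compile time rather than at runtime.
fn parkSketch(ptr: *const std.atomic.Value(u32), expect: u32) void {
    if (comptime have_futex) {
        futexWaitUncancelable(ptr, expect); // blocks until a matching futexWake
    } else {
        @compileError("futex not available on this target");
    }
}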
@@ -731,6 +735,7 @@ fn checkCancel(t: *Threaded) error{Canceled}!void {
}
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
+ if (builtin.single_threaded) unreachable; // Interface should have prevented this.
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (prev_state == .contended) {
try futexWait(t, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
@@ -741,6 +746,7 @@ fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex
}
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
+ if (builtin.single_threaded) unreachable; // Interface should have prevented this.
_ = userdata;
if (prev_state == .contended) {
futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
@@ -751,6 +757,7 @@ fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mute
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
+ if (builtin.single_threaded) unreachable; // Interface should have prevented this.
_ = userdata;
_ = prev_state;
if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) {
@@ -759,6 +766,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
}
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
+ if (builtin.single_threaded) unreachable; // Deadlock.
const t: *Threaded = @ptrCast(@alignCast(userdata));
const t_io = t.io();
comptime assert(@TypeOf(cond.state) == u64);
@@ -789,6 +797,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex:
}
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
+ if (builtin.single_threaded) unreachable; // Deadlock.
const t: *Threaded = @ptrCast(@alignCast(userdata));
comptime assert(@TypeOf(cond.state) == u64);
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
@@ -833,6 +842,7 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
}
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
+ if (builtin.single_threaded) unreachable; // Nothing to wake up.
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
comptime assert(@TypeOf(cond.state) == u64);
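The new builtin.single_threaded guards spell out an invariant rather than add behavior: with exactly one thread, a contended mutex can never be released by anyone else and a condition wait can never be signaled, so reaching these callbacks would deadlock (and conditionWake would have nobody to wake). The "Interface should have prevented this" comments assume the Io.Mutex / Io.Condition front ends short-circuit before consulting the vtable in single-threaded builds; a hypothetical sketch of that kind of check, with names and structure assumed rather than taken from the commit:

// Hypothetical front-end check of the sort the unreachable guards rely on.
pub fn lockSketch(mutex: *Io.Mutex) void {
    if (builtin.single_threaded) {
        // With one thread nobody else can hold the mutex, so acquiring it
        // never blocks and never reaches Threaded.mutexLock.
        assert(mutex.state != .contended);
        return;
    }
    // Multi-threaded builds would CAS mutex.state here and only fall back to
    // the vtable's mutexLock (shown above) once real contention is observed.
}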
@@ -4007,6 +4017,32 @@ fn futexWait(t: *Threaded, ptr: *const std.atomic.Value(u32), expect: u32) Io.Ca
return;
}
+ if (builtin.cpu.arch.isWasm()) {
+ comptime assert(builtin.cpu.has(.wasm, .atomics));
+ try t.checkCancel();
+ const timeout: i64 = -1;
+ const signed_expect: i32 = @bitCast(expect);
+ const result = asm volatile (
+ \\local.get %[ptr]
+ \\local.get %[expected]
+ \\local.get %[timeout]
+ \\memory.atomic.wait32 0
+ \\local.set %[ret]
+ : [ret] "=r" (-> u32),
+ : [ptr] "r" (&ptr.raw),
+ [expected] "r" (signed_expect),
+ [timeout] "r" (timeout),
+ );
+ const is_debug = builtin.mode == .Debug;
+ switch (result) {
+ 0 => {}, // ok
+ 1 => {}, // expected != loaded
+ 2 => assert(!is_debug), // timeout
+ else => assert(!is_debug),
+ }
+ return;
+ }
+
@compileError("TODO");
}
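memory.atomic.wait32 atomically compares the 32-bit word at the given address with the expected value and, if they match, parks the caller until a memory.atomic.notify on that address or until the i64 timeout (in nanoseconds) elapses; a negative timeout, as passed here, means wait forever. It returns 0 ("ok", woken by a notify), 1 ("not-equal", the loaded value differed from expect), or 2 ("timed-out"). With an infinite timeout a result of 2 should never occur, which is what the assert(!is_debug) arms encode: fail loudly in Debug builds, tolerate the impossible result in release builds. A rough sketch of the instruction's semantics, not an implementation used anywhere:

// Pseudo-Zig rendering of what the host performs for memory.atomic.wait32.
// waitQueueBlock is a hypothetical helper standing in for the engine's parking.
fn wait32Semantics(ptr: *const u32, expected: u32, timeout_ns: i64) u32 {
    if (@atomicLoad(u32, ptr, .seq_cst) != expected) return 1; // "not-equal"
    const woken = waitQueueBlock(ptr, timeout_ns); // blocks; negative timeout = forever
    return if (woken) 0 else 2; // 0 = "ok", 2 = "timed-out"
}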
@@ -4054,6 +4090,31 @@ pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) voi
return;
}
+ if (builtin.cpu.arch.isWasm()) {
+ comptime assert(builtin.cpu.has(.wasm, .atomics));
+ const timeout: i64 = -1;
+ const signed_expect: i32 = @bitCast(expect);
+ const result = asm volatile (
+ \\local.get %[ptr]
+ \\local.get %[expected]
+ \\local.get %[timeout]
+ \\memory.atomic.wait32 0
+ \\local.set %[ret]
+ : [ret] "=r" (-> u32),
+ : [ptr] "r" (&ptr.raw),
+ [expected] "r" (signed_expect),
+ [timeout] "r" (timeout),
+ );
+ const is_debug = builtin.mode == .Debug;
+ switch (result) {
+ 0 => {}, // ok
+ 1 => {}, // expected != loaded
+ 2 => assert(!is_debug), // timeout
+ else => assert(!is_debug),
+ }
+ return;
+ }
+
@compileError("TODO");
}
@@ -4119,6 +4180,22 @@ pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
}
}
+ if (builtin.cpu.arch.isWasm()) {
+ comptime assert(builtin.cpu.has(.wasm, .atomics));
+ assert(max_waiters != 0);
+ const woken_count = asm volatile (
+ \\local.get %[ptr]
+ \\local.get %[waiters]
+ \\memory.atomic.notify 0
+ \\local.set %[ret]
+ : [ret] "=r" (-> u32),
+ : [ptr] "r" (&ptr.raw),
+ [waiters] "r" (max_waiters),
+ );
+ _ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
+ return;
+ }
+
@compileError("TODO");
}
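memory.atomic.notify wakes at most the requested number of waiters parked on the address and returns how many it actually woke; when the module's memory is not shared (the 'shared-memory' linker flag mentioned in the comment), no other thread can be parked there, so a count of zero is legitimate and the return value is discarded. A hedged usage sketch of the new pair working together: futexWaitUncancelable and futexWake are the public functions added above, while the surrounding setup and the std.Io.Threaded access path are assumptions for illustration.

const std = @import("std");
const Threaded = std.Io.Threaded;

var flag = std.atomic.Value(u32).init(0);

// Waiter: loop because a futex wait may return spuriously or after a stale wake.
fn waiter() void {
    while (flag.load(.acquire) == 0) {
        Threaded.futexWaitUncancelable(&flag, 0); // park while flag is still 0
    }
}

// Waker, running on another thread: publish the value, then wake one waiter.
fn waker() void {
    flag.store(1, .release);
    Threaded.futexWake(&flag, 1);
}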