Commit a1c1d06b19
lib/std/Thread/Pool.zig
@@ -332,6 +332,7 @@ pub fn io(pool: *Pool) Io {
.vtable = &.{
.@"async" = @"async",
.@"await" = @"await",
+ .go = go,
.cancel = cancel,
.cancelRequested = cancelRequested,
.mutexLock = mutexLock,
@@ -472,6 +473,75 @@ fn @"async"(
return @ptrCast(closure);
}
+const DetachedClosure = struct {
+ pool: *Pool,
+ func: *const fn (context: *anyopaque) void,
+ run_node: std.Thread.Pool.RunQueue.Node = .{ .data = .{ .runFn = runFn } },
+ context_alignment: std.mem.Alignment,
+ context_len: usize,
+
+ fn runFn(runnable: *std.Thread.Pool.Runnable, _: ?usize) void {
+ const run_node: *std.Thread.Pool.RunQueue.Node = @fieldParentPtr("data", runnable);
+ const closure: *DetachedClosure = @alignCast(@fieldParentPtr("run_node", run_node));
+ closure.func(closure.contextPointer());
+ const gpa = closure.pool.allocator;
+ const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure);
+ gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
+ }
+
+ fn contextOffset(context_alignment: std.mem.Alignment) usize {
+ return context_alignment.forward(@sizeOf(DetachedClosure));
+ }
+
+ fn contextEnd(context_alignment: std.mem.Alignment, context_len: usize) usize {
+ return contextOffset(context_alignment) + context_len;
+ }
+
+ fn contextPointer(closure: *DetachedClosure) [*]u8 {
+ const base: [*]u8 = @ptrCast(closure);
+ return base + contextOffset(closure.context_alignment);
+ }
+};
+
+fn go(
+ userdata: ?*anyopaque,
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque) void,
+) void {
+ const pool: *std.Thread.Pool = @alignCast(@ptrCast(userdata));
+ pool.mutex.lock();
+
+ const gpa = pool.allocator;
+ const n = DetachedClosure.contextEnd(context_alignment, context.len);
+ const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, @alignOf(DetachedClosure), n) catch {
+ pool.mutex.unlock();
+ start(context.ptr);
+ return;
+ }));
+ closure.* = .{
+ .pool = pool,
+ .func = start,
+ .context_alignment = context_alignment,
+ .context_len = context.len,
+ };
+ @memcpy(closure.contextPointer()[0..context.len], context);
+ pool.run_queue.prepend(&closure.run_node);
+
+ if (pool.threads.items.len < pool.threads.capacity) {
+ pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
+ .stack_size = pool.stack_size,
+ .allocator = gpa,
+ }, worker, .{pool}) catch t: {
+ pool.threads.items.len -= 1;
+ break :t undefined;
+ };
+ }
+
+ pool.mutex.unlock();
+ pool.cond.signal();
+}
+
fn @"await"(
userdata: ?*anyopaque,
any_future: *std.Io.AnyFuture,
lib/std/Io.zig
@@ -580,6 +580,18 @@ pub const VTable = struct {
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
+ /// Executes `start` asynchronously in a manner such that it cleans itself
+ /// up. This mode does not support results, await, or cancel.
+ ///
+ /// Thread-safe.
+ go: *const fn (
+ /// Corresponds to `Io.userdata`.
+ userdata: ?*anyopaque,
+ /// Copied and then passed to `start`.
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque) void,
+ ) void,
/// This function is only called when `async` returns a non-null value.
///
/// Thread-safe.
@@ -593,7 +605,6 @@ pub const VTable = struct {
result: []u8,
result_alignment: std.mem.Alignment,
) void,
-
/// Equivalent to `await` but initiates cancel request.
///
/// This function is only called when `async` returns a non-null value.
@@ -671,14 +682,24 @@ pub fn Future(Result: type) type {
/// Idempotent.
pub fn cancel(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
+ io.vtable.cancel(
+ io.userdata,
+ any_future,
+ if (@sizeOf(Result) == 0) &.{} else @ptrCast((&f.result)[0..1]), // work around compiler bug
+ .of(Result),
+ );
f.any_future = null;
return f.result;
}
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
- io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
+ io.vtable.await(
+ io.userdata,
+ any_future,
+ if (@sizeOf(Result) == 0) &.{} else @ptrCast((&f.result)[0..1]), // work around compiler bug
+ .of(Result),
+ );
f.any_future = null;
return f.result;
}
@@ -996,7 +1017,7 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(
var future: Future(Result) = undefined;
future.any_future = io.vtable.async(
io.userdata,
- @ptrCast((&future.result)[0..1]),
+ if (@sizeOf(Result) == 0) &.{} else @ptrCast((&future.result)[0..1]), // work around compiler bug
.of(Result),
if (@sizeOf(Args) == 0) &.{} else @ptrCast((&args)[0..1]), // work around compiler bug
.of(Args),
@@ -1005,6 +1026,24 @@ pub fn async(io: Io, function: anytype, args: anytype) Future(@typeInfo(@TypeOf(
return future;
}
+/// Calls `function` with `args` asynchronously. The resource cleans itself up
+/// when the function returns. Does not support await, cancel, or a return value.
+pub fn go(io: Io, function: anytype, args: anytype) void {
+ const Args = @TypeOf(args);
+ const TypeErased = struct {
+ fn start(context: *const anyopaque) void {
+ const args_casted: *const Args = @alignCast(@ptrCast(context));
+ @call(.auto, function, args_casted.*);
+ }
+ };
+ io.vtable.go(
+ io.userdata,
+ if (@sizeOf(Args) == 0) &.{} else @ptrCast((&args)[0..1]), // work around compiler bug
+ .of(Args),
+ TypeErased.start,
+ );
+}
+
pub fn openFile(io: Io, dir: fs.Dir, sub_path: []const u8, flags: fs.File.OpenFlags) FileOpenError!fs.File {
return io.vtable.openFile(io.userdata, dir, sub_path, flags);
}