Commit f762597724
Changed files (4)
lib/std/Io/EventLoop.zig
@@ -139,6 +139,8 @@ pub fn io(el: *EventLoop) Io {
.userdata = el,
.vtable = &.{
.async = async,
+ .asyncConcurrent = asyncConcurrent,
+ .asyncParallel = asyncParallel,
.await = await,
.asyncDetached = asyncDetached,
.select = select,
@@ -876,17 +878,28 @@ fn async(
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+) ?*std.Io.AnyFuture {
+ return asyncConcurrent(userdata, result.len, result_alignment, context, context_alignment, start) orelse {
+ start(context.ptr, result.ptr);
+ return null;
+ };
+}
+
+fn asyncConcurrent(
+ userdata: ?*anyopaque,
+ result_len: usize,
+ result_alignment: Alignment,
+ context: []const u8,
+ context_alignment: Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*std.Io.AnyFuture {
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
- assert(result.len <= Fiber.max_result_size); // TODO
+ assert(result_len <= Fiber.max_result_size); // TODO
assert(context.len <= Fiber.max_context_size); // TODO
const event_loop: *EventLoop = @alignCast(@ptrCast(userdata));
- const fiber = Fiber.allocate(event_loop) catch {
- start(context.ptr, result.ptr);
- return null;
- };
+ const fiber = Fiber.allocate(event_loop) catch return null;
std.log.debug("allocated {*}", .{fiber});
const closure: *AsyncClosure = .fromFiber(fiber);
@@ -925,6 +938,23 @@ fn async(
return @ptrCast(fiber);
}
+fn asyncParallel(
+ userdata: ?*anyopaque,
+ result_len: usize,
+ result_alignment: Alignment,
+ context: []const u8,
+ context_alignment: Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+) ?*std.Io.AnyFuture {
+ _ = userdata;
+ _ = result_len;
+ _ = result_alignment;
+ _ = context;
+ _ = context_alignment;
+ _ = start;
+ @panic("TODO");
+}
+
const DetachedClosure = struct {
event_loop: *EventLoop,
fiber: *Fiber,
lib/std/Io/ThreadPool.zig
@@ -6,331 +6,88 @@ const WaitGroup = std.Thread.WaitGroup;
const Io = std.Io;
const Pool = @This();
-/// Must be a thread-safe allocator.
-allocator: std.mem.Allocator,
+/// Thread-safe.
+allocator: Allocator,
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
-is_running: bool = true,
+join_requested: bool = false,
threads: std.ArrayListUnmanaged(std.Thread),
-ids: if (builtin.single_threaded) struct {
- inline fn deinit(_: @This(), _: std.mem.Allocator) void {}
- fn getIndex(_: @This(), _: std.Thread.Id) usize {
- return 0;
- }
-} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),
stack_size: usize,
+cpu_count: std.Thread.CpuCountError!usize,
+parallel_count: usize,
threadlocal var current_closure: ?*AsyncClosure = null;
pub const Runnable = struct {
- runFn: RunProto,
+ start: Start,
node: std.SinglyLinkedList.Node = .{},
-};
-
-pub const RunProto = *const fn (*Runnable, id: ?usize) void;
+ is_parallel: bool,
-pub const Options = struct {
- allocator: std.mem.Allocator,
- n_jobs: ?usize = null,
- track_ids: bool = false,
- stack_size: usize = std.Thread.SpawnConfig.default_stack_size,
+ pub const Start = *const fn (*Runnable) void;
};
-pub fn init(pool: *Pool, options: Options) !void {
- const gpa = options.allocator;
- const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1);
- const threads = try gpa.alloc(std.Thread, thread_count);
- errdefer gpa.free(threads);
+pub const InitError = std.Thread.CpuCountError || Allocator.Error;
- pool.* = .{
+pub fn init(gpa: Allocator) Pool {
+ var pool: Pool = .{
.allocator = gpa,
- .threads = .initBuffer(threads),
- .ids = .{},
- .stack_size = options.stack_size,
+ .threads = .empty,
+ .stack_size = std.Thread.SpawnConfig.default_stack_size,
+ .cpu_count = std.Thread.getCpuCount(),
+ .parallel_count = 0,
};
-
- if (builtin.single_threaded) return;
-
- if (options.track_ids) {
- try pool.ids.ensureTotalCapacity(gpa, 1 + thread_count);
- pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
- }
+ if (pool.cpu_count) |n| {
+ pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
+ } else |_| {}
+ return pool;
}
pub fn deinit(pool: *Pool) void {
const gpa = pool.allocator;
pool.join();
pool.threads.deinit(gpa);
- pool.ids.deinit(gpa);
pool.* = undefined;
}
fn join(pool: *Pool) void {
if (builtin.single_threaded) return;
-
{
pool.mutex.lock();
defer pool.mutex.unlock();
-
- // ensure future worker threads exit the dequeue loop
- pool.is_running = false;
+ pool.join_requested = true;
}
-
- // wake up any sleeping threads (this can be done outside the mutex)
- // then wait for all the threads we know are spawned to complete.
pool.cond.broadcast();
for (pool.threads.items) |thread| thread.join();
}
-/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
-/// `WaitGroup.finish` after it returns.
-///
-/// In the case that queuing the function call fails to allocate memory, or the
-/// target is single-threaded, the function is called directly.
-pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
- wait_group.start();
-
- if (builtin.single_threaded) {
- @call(.auto, func, args);
- wait_group.finish();
- return;
- }
-
- const Args = @TypeOf(args);
- const Closure = struct {
- arguments: Args,
- pool: *Pool,
- runnable: Runnable = .{ .runFn = runFn },
- wait_group: *WaitGroup,
-
- fn runFn(runnable: *Runnable, _: ?usize) void {
- const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
- @call(.auto, func, closure.arguments);
- closure.wait_group.finish();
- closure.pool.allocator.destroy(closure);
- }
- };
-
- pool.mutex.lock();
-
- const gpa = pool.allocator;
- const closure = gpa.create(Closure) catch {
- pool.mutex.unlock();
- @call(.auto, func, args);
- wait_group.finish();
- return;
- };
- closure.* = .{
- .arguments = args,
- .pool = pool,
- .wait_group = wait_group,
- };
-
- pool.run_queue.prepend(&closure.runnable.node);
-
- if (pool.threads.items.len < pool.threads.capacity) {
- pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
- .stack_size = pool.stack_size,
- .allocator = gpa,
- }, worker, .{pool}) catch t: {
- pool.threads.items.len -= 1;
- break :t undefined;
- };
- }
-
- pool.mutex.unlock();
- pool.cond.signal();
-}
-
-/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
-/// `WaitGroup.finish` after it returns.
-///
-/// The first argument passed to `func` is a dense `usize` thread id, the rest
-/// of the arguments are passed from `args`. Requires the pool to have been
-/// initialized with `.track_ids = true`.
-///
-/// In the case that queuing the function call fails to allocate memory, or the
-/// target is single-threaded, the function is called directly.
-pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
- wait_group.start();
-
- if (builtin.single_threaded) {
- @call(.auto, func, .{0} ++ args);
- wait_group.finish();
- return;
- }
-
- const Args = @TypeOf(args);
- const Closure = struct {
- arguments: Args,
- pool: *Pool,
- runnable: Runnable = .{ .runFn = runFn },
- wait_group: *WaitGroup,
-
- fn runFn(runnable: *Runnable, id: ?usize) void {
- const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
- @call(.auto, func, .{id.?} ++ closure.arguments);
- closure.wait_group.finish();
- closure.pool.allocator.destroy(closure);
- }
- };
-
- pool.mutex.lock();
-
- const gpa = pool.allocator;
- const closure = gpa.create(Closure) catch {
- const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId());
- pool.mutex.unlock();
- @call(.auto, func, .{id.?} ++ args);
- wait_group.finish();
- return;
- };
- closure.* = .{
- .arguments = args,
- .pool = pool,
- .wait_group = wait_group,
- };
-
- pool.run_queue.prepend(&closure.runnable.node);
-
- if (pool.threads.items.len < pool.threads.capacity) {
- pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
- .stack_size = pool.stack_size,
- .allocator = gpa,
- }, worker, .{pool}) catch t: {
- pool.threads.items.len -= 1;
- break :t undefined;
- };
- }
-
- pool.mutex.unlock();
- pool.cond.signal();
-}
-
-pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) void {
- if (builtin.single_threaded) {
- @call(.auto, func, args);
- return;
- }
-
- const Args = @TypeOf(args);
- const Closure = struct {
- arguments: Args,
- pool: *Pool,
- runnable: Runnable = .{ .runFn = runFn },
-
- fn runFn(runnable: *Runnable, _: ?usize) void {
- const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
- @call(.auto, func, closure.arguments);
- closure.pool.allocator.destroy(closure);
- }
- };
-
- pool.mutex.lock();
-
- const gpa = pool.allocator;
- const closure = gpa.create(Closure) catch {
- pool.mutex.unlock();
- @call(.auto, func, args);
- return;
- };
- closure.* = .{
- .arguments = args,
- .pool = pool,
- };
-
- pool.run_queue.prepend(&closure.runnable.node);
-
- if (pool.threads.items.len < pool.threads.capacity) {
- pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
- .stack_size = pool.stack_size,
- .allocator = gpa,
- }, worker, .{pool}) catch t: {
- pool.threads.items.len -= 1;
- break :t undefined;
- };
- }
-
- pool.mutex.unlock();
- pool.cond.signal();
-}
-
-test spawn {
- const TestFn = struct {
- fn checkRun(completed: *bool) void {
- completed.* = true;
- }
- };
-
- var completed: bool = false;
-
- {
- var pool: Pool = undefined;
- try pool.init(.{
- .allocator = std.testing.allocator,
- });
- defer pool.deinit();
- pool.spawn(TestFn.checkRun, .{&completed});
- }
-
- try std.testing.expectEqual(true, completed);
-}
-
fn worker(pool: *Pool) void {
pool.mutex.lock();
defer pool.mutex.unlock();
- const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null;
- if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
-
while (true) {
while (pool.run_queue.popFirst()) |run_node| {
- // Temporarily unlock the mutex in order to execute the run_node
- pool.mutex.unlock();
- defer pool.mutex.lock();
-
- const runnable: *Runnable = @fieldParentPtr("node", run_node);
- runnable.runFn(runnable, id);
- }
-
- // Stop executing instead of waiting if the thread pool is no longer running.
- if (pool.is_running) {
- pool.cond.wait(&pool.mutex);
- } else {
- break;
- }
- }
-}
-
-pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void {
- var id: ?usize = null;
-
- while (!wait_group.isDone()) {
- pool.mutex.lock();
- if (pool.run_queue.popFirst()) |run_node| {
- id = id orelse pool.ids.getIndex(std.Thread.getCurrentId());
pool.mutex.unlock();
const runnable: *Runnable = @fieldParentPtr("node", run_node);
- runnable.runFn(runnable, id);
- continue;
+ runnable.start(runnable);
+ pool.mutex.lock();
+ if (runnable.is_parallel) {
+ // TODO also pop thread and join sometimes
+ pool.parallel_count -= 1;
+ }
}
-
- pool.mutex.unlock();
- wait_group.wait();
- return;
+ if (pool.join_requested) break;
+ pool.cond.wait(&pool.mutex);
}
}
-pub fn getIdCount(pool: *Pool) usize {
- return @intCast(1 + pool.threads.items.len);
-}
-
pub fn io(pool: *Pool) Io {
return .{
.userdata = pool,
.vtable = &.{
.async = async,
+ .asyncConcurrent = asyncParallel,
+ .asyncParallel = asyncParallel,
.await = await,
.asyncDetached = asyncDetached,
.cancel = cancel,
@@ -357,7 +114,7 @@ pub fn io(pool: *Pool) Io {
const AsyncClosure = struct {
func: *const fn (context: *anyopaque, result: *anyopaque) void,
- runnable: Runnable = .{ .runFn = runFn },
+ runnable: Runnable,
reset_event: std.Thread.ResetEvent,
select_condition: ?*std.Thread.ResetEvent,
cancel_tid: std.Thread.Id,
@@ -375,7 +132,7 @@ const AsyncClosure = struct {
else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
};
- fn runFn(runnable: *Pool.Runnable, _: ?usize) void {
+ fn start(runnable: *Runnable) void {
const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
const tid = std.Thread.getCurrentId();
if (@cmpxchgStrong(
@@ -387,6 +144,7 @@ const AsyncClosure = struct {
.acquire,
)) |cancel_tid| {
assert(cancel_tid == canceling_tid);
+ closure.reset_event.set();
return;
}
current_closure = closure;
@@ -438,9 +196,13 @@ const AsyncClosure = struct {
fn waitAndFree(closure: *AsyncClosure, gpa: Allocator, result: []u8) void {
closure.reset_event.wait();
- const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure);
@memcpy(result, closure.resultPointer()[0..result.len]);
- gpa.free(base[0 .. closure.result_offset + result.len]);
+ free(closure, gpa, result.len);
+ }
+
+ fn free(closure: *AsyncClosure, gpa: Allocator, result_len: usize) void {
+ const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure);
+ gpa.free(base[0 .. closure.result_offset + result_len]);
}
};
@@ -452,18 +214,26 @@ fn async(
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*Io.AnyFuture {
+ if (builtin.single_threaded) {
+ start(context.ptr, result.ptr);
+ return null;
+ }
const pool: *Pool = @alignCast(@ptrCast(userdata));
- pool.mutex.lock();
-
+ const cpu_count = pool.cpu_count catch {
+ return asyncParallel(userdata, result.len, result_alignment, context, context_alignment, start) orelse {
+ start(context.ptr, result.ptr);
+ return null;
+ };
+ };
const gpa = pool.allocator;
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result.len;
const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
- pool.mutex.unlock();
start(context.ptr, result.ptr);
return null;
}));
+
closure.* = .{
.func = start,
.context_offset = context_offset,
@@ -471,37 +241,124 @@ fn async(
.reset_event = .{},
.cancel_tid = 0,
.select_condition = null,
+ .runnable = .{
+ .start = AsyncClosure.start,
+ .is_parallel = false,
+ },
};
+
@memcpy(closure.contextPointer()[0..context.len], context);
+
+ pool.mutex.lock();
+
+ const thread_capacity = cpu_count - 1 + pool.parallel_count;
+
+ pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+ pool.mutex.unlock();
+ closure.free(gpa, result.len);
+ start(context.ptr, result.ptr);
+ return null;
+ };
+
pool.run_queue.prepend(&closure.runnable.node);
- if (pool.threads.items.len < pool.threads.capacity) {
- pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
- .stack_size = pool.stack_size,
- .allocator = gpa,
- }, worker, .{pool}) catch t: {
- pool.threads.items.len -= 1;
- break :t undefined;
+ if (pool.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
+ if (pool.threads.items.len == 0) {
+ assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ pool.mutex.unlock();
+ closure.free(gpa, result.len);
+ start(context.ptr, result.ptr);
+ return null;
+ }
+ // Rely on other workers to do it.
+ pool.mutex.unlock();
+ pool.cond.signal();
+ return @ptrCast(closure);
};
+ pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
pool.cond.signal();
+ return @ptrCast(closure);
+}
+
+fn asyncParallel(
+ userdata: ?*anyopaque,
+ result_len: usize,
+ result_alignment: std.mem.Alignment,
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+) ?*Io.AnyFuture {
+ if (builtin.single_threaded) return null;
+
+ const pool: *Pool = @alignCast(@ptrCast(userdata));
+ const cpu_count = pool.cpu_count catch 1;
+ const gpa = pool.allocator;
+ const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
+ const result_offset = result_alignment.forward(context_offset + context.len);
+ const n = result_offset + result_len;
+ const closure: *AsyncClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch return null));
+
+ closure.* = .{
+ .func = start,
+ .context_offset = context_offset,
+ .result_offset = result_offset,
+ .reset_event = .{},
+ .cancel_tid = 0,
+ .select_condition = null,
+ .runnable = .{
+ .start = AsyncClosure.start,
+ .is_parallel = true,
+ },
+ };
+ @memcpy(closure.contextPointer()[0..context.len], context);
+
+ pool.mutex.lock();
+
+ pool.parallel_count += 1;
+ const thread_capacity = cpu_count - 1 + pool.parallel_count;
+
+ pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
+ pool.mutex.unlock();
+ closure.free(gpa, result_len);
+ return null;
+ };
+
+ pool.run_queue.prepend(&closure.runnable.node);
+
+ if (pool.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
+ assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ pool.mutex.unlock();
+ closure.free(gpa, result_len);
+ return null;
+ };
+ pool.threads.appendAssumeCapacity(thread);
+ }
+ pool.mutex.unlock();
+ pool.cond.signal();
return @ptrCast(closure);
}
const DetachedClosure = struct {
pool: *Pool,
func: *const fn (context: *anyopaque) void,
- runnable: Runnable = .{ .runFn = runFn },
+ runnable: Runnable,
context_alignment: std.mem.Alignment,
context_len: usize,
- fn runFn(runnable: *Pool.Runnable, _: ?usize) void {
+ fn start(runnable: *Runnable) void {
const closure: *DetachedClosure = @alignCast(@fieldParentPtr("runnable", runnable));
closure.func(closure.contextPointer());
const gpa = closure.pool.allocator;
+ free(closure, gpa);
+ }
+
+ fn free(closure: *DetachedClosure, gpa: Allocator) void {
const base: [*]align(@alignOf(DetachedClosure)) u8 = @ptrCast(closure);
gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
}
@@ -526,33 +383,46 @@ fn asyncDetached(
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque) void,
) void {
+ if (builtin.single_threaded) return start(context.ptr);
const pool: *Pool = @alignCast(@ptrCast(userdata));
- pool.mutex.lock();
-
+ const cpu_count = pool.cpu_count catch 1;
const gpa = pool.allocator;
const n = DetachedClosure.contextEnd(context_alignment, context.len);
const closure: *DetachedClosure = @alignCast(@ptrCast(gpa.alignedAlloc(u8, .of(DetachedClosure), n) catch {
- pool.mutex.unlock();
- start(context.ptr);
- return;
+ return start(context.ptr);
}));
closure.* = .{
.pool = pool,
.func = start,
.context_alignment = context_alignment,
.context_len = context.len,
+ .runnable = .{
+ .start = DetachedClosure.start,
+ .is_parallel = false,
+ },
};
@memcpy(closure.contextPointer()[0..context.len], context);
+
+ pool.mutex.lock();
+
+ const thread_capacity = cpu_count - 1 + pool.parallel_count;
+
+ pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+ pool.mutex.unlock();
+ closure.free(gpa);
+ return start(context.ptr);
+ };
+
pool.run_queue.prepend(&closure.runnable.node);
- if (pool.threads.items.len < pool.threads.capacity) {
- pool.threads.addOneAssumeCapacity().* = std.Thread.spawn(.{
- .stack_size = pool.stack_size,
- .allocator = gpa,
- }, worker, .{pool}) catch t: {
- pool.threads.items.len -= 1;
- break :t undefined;
+ if (pool.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
+ assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ pool.mutex.unlock();
+ closure.free(gpa);
+ return start(context.ptr);
};
+ pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
lib/std/Io.zig
@@ -581,6 +581,32 @@ pub const VTable = struct {
context_alignment: std.mem.Alignment,
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
) ?*AnyFuture,
+ /// Returning `null` indicates resource allocation failed.
+ ///
+ /// Thread-safe.
+ asyncConcurrent: *const fn (
+ /// Corresponds to `Io.userdata`.
+ userdata: ?*anyopaque,
+ result_len: usize,
+ result_alignment: std.mem.Alignment,
+ /// Copied and then passed to `start`.
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+ ) ?*AnyFuture,
+ /// Returning `null` indicates resource allocation failed.
+ ///
+ /// Thread-safe.
+ asyncParallel: *const fn (
+ /// Corresponds to `Io.userdata`.
+ userdata: ?*anyopaque,
+ result_len: usize,
+ result_alignment: std.mem.Alignment,
+ /// Copied and then passed to `start`.
+ context: []const u8,
+ context_alignment: std.mem.Alignment,
+ start: *const fn (context: *const anyopaque, result: *anyopaque) void,
+ ) ?*AnyFuture,
/// Executes `start` asynchronously in a manner such that it cleans itself
/// up. This mode does not support results, await, or cancel.
///
@@ -1138,7 +1164,18 @@ pub fn Queue(Elem: type) type {
/// Calls `function` with `args`, such that the return value of the function is
/// not guaranteed to be available until `await` is called.
-pub fn async(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
+///
+/// `function` *may* be called immediately, before `async` returns. This has
+/// weaker guarantees than `asyncConcurrent` and `asyncParallel`, making it the
+/// most portable and reusable among the async family functions.
+///
+/// See also:
+/// * `asyncDetached`
+pub fn async(
+ io: Io,
+ function: anytype,
+ args: std.meta.ArgsTuple(@TypeOf(function)),
+) Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
const Args = @TypeOf(args);
const TypeErased = struct {
@@ -1160,8 +1197,86 @@ pub fn async(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(functio
return future;
}
+/// Calls `function` with `args`, such that the return value of the function is
+/// not guaranteed to be available until `await` is called, passing control
+/// flow back to the caller while waiting for any `Io` operations.
+///
+/// This has a weaker guarantee than `asyncParallel`, making it more portable
+/// and reusable, however it has stronger guarantee than `async`, placing
+/// restrictions on what kind of `Io` implementations are supported. By calling
+/// `async` instead, one allows, for example, stackful single-threaded blocking I/O.
+pub fn asyncConcurrent(
+ io: Io,
+ function: anytype,
+ args: std.meta.ArgsTuple(@TypeOf(function)),
+) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
+ const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
+ const Args = @TypeOf(args);
+ const TypeErased = struct {
+ fn start(context: *const anyopaque, result: *anyopaque) void {
+ const args_casted: *const Args = @alignCast(@ptrCast(context));
+ const result_casted: *Result = @ptrCast(@alignCast(result));
+ result_casted.* = @call(.auto, function, args_casted.*);
+ }
+ };
+ var future: Future(Result) = undefined;
+ future.any_future = io.vtable.asyncConcurrent(
+ io.userdata,
+ @sizeOf(Result),
+ .of(Result),
+ @ptrCast((&args)[0..1]),
+ .of(Args),
+ TypeErased.start,
+ );
+ return future;
+}
+
+/// Calls `function` with `args`, such that the return value of the function is
+/// not guaranteed to be available until `await` is called, while simultaneously
+/// passing control flow back to the caller.
+///
+/// This has the strongest guarantees of all async family functions, placing
+/// the most restrictions on what kind of `Io` implementations are supported.
+/// By calling `asyncConcurrent` instead, one allows, for example,
+/// stackful single-threaded non-blocking I/O.
+///
+/// See also:
+/// * `asyncConcurrent`
+/// * `async`
+pub fn asyncParallel(
+ io: Io,
+ function: anytype,
+ args: std.meta.ArgsTuple(@TypeOf(function)),
+) error{OutOfMemory}!Future(@typeInfo(@TypeOf(function)).@"fn".return_type.?) {
+ const Result = @typeInfo(@TypeOf(function)).@"fn".return_type.?;
+ const Args = @TypeOf(args);
+ const TypeErased = struct {
+ fn start(context: *const anyopaque, result: *anyopaque) void {
+ const args_casted: *const Args = @alignCast(@ptrCast(context));
+ const result_casted: *Result = @ptrCast(@alignCast(result));
+ result_casted.* = @call(.auto, function, args_casted.*);
+ }
+ };
+ var future: Future(Result) = undefined;
+ future.any_future = io.vtable.asyncConcurrent(
+ io.userdata,
+ @ptrCast((&future.result)[0..1]),
+ .of(Result),
+ @ptrCast((&args)[0..1]),
+ .of(Args),
+ TypeErased.start,
+ );
+ return future;
+}
+
/// Calls `function` with `args` asynchronously. The resource cleans itself up
/// when the function returns. Does not support await, cancel, or a return value.
+///
+/// `function` *may* be called immediately, before `async` returns.
+///
+/// See also:
+/// * `async`
+/// * `asyncConcurrent`
pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
@@ -1173,6 +1288,10 @@ pub fn asyncDetached(io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf
io.vtable.asyncDetached(io.userdata, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
}
+pub fn cancelRequested(io: Io) bool {
+ return io.vtable.cancelRequested(io.userdata);
+}
+
pub fn now(io: Io, clockid: std.posix.clockid_t) ClockGetTimeError!Timestamp {
return io.vtable.now(io.userdata, clockid);
}
lib/std/Thread.zig
@@ -385,6 +385,8 @@ pub const CpuCountError = error{
};
/// Returns the platforms view on the number of logical CPU cores available.
+///
+/// Returned value guaranteed to be >= 1.
pub fn getCpuCount() CpuCountError!usize {
return try Impl.getCpuCount();
}