Commit 2e1ab5d3f7
Changed files (2)
lib/std/Io/Threaded.zig
@@ -10,6 +10,7 @@ const Allocator = std.mem.Allocator;
const assert = std.debug.assert;
const posix = std.posix;
const Io = std.Io;
+const ResetEvent = std.Thread.ResetEvent;
/// Thread-safe.
allocator: Allocator,
@@ -20,9 +21,9 @@ join_requested: bool = false,
threads: std.ArrayListUnmanaged(std.Thread),
stack_size: usize,
cpu_count: std.Thread.CpuCountError!usize,
-parallel_count: usize,
+concurrent_count: usize,
-threadlocal var current_closure: ?*AsyncClosure = null;
+threadlocal var current_closure: ?*Closure = null;
const max_iovecs_len = 8;
const splat_buffer_size = 64;
@@ -31,12 +32,33 @@ comptime {
assert(max_iovecs_len <= posix.IOV_MAX);
}
-pub const Runnable = struct {
+const Closure = struct {
start: Start,
node: std.SinglyLinkedList.Node = .{},
- is_parallel: bool,
+ cancel_tid: std.Thread.Id,
+    /// Whether this task bumps the minimum number of threads in the pool.
+ is_concurrent: bool,
+
+ const Start = *const fn (*Closure) void;
+
+ const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
+ .int => |int_info| switch (int_info.signedness) {
+ .signed => -1,
+ .unsigned => std.math.maxInt(std.Thread.Id),
+ },
+ .pointer => @ptrFromInt(std.math.maxInt(usize)),
+ else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
+ };
- pub const Start = *const fn (*Runnable) void;
+ fn requestCancel(closure: *Closure) void {
+ switch (@atomicRmw(std.Thread.Id, &closure.cancel_tid, .Xchg, canceling_tid, .acq_rel)) {
+ 0, canceling_tid => {},
+ else => |tid| switch (builtin.os.tag) {
+ .linux => _ = std.os.linux.tgkill(std.os.linux.getpid(), @bitCast(tid), posix.SIG.IO),
+ else => {},
+ },
+ }
+ }
};
pub const InitError = std.Thread.CpuCountError || Allocator.Error;
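
The cancellation handshake introduced here is a small state machine on `cancel_tid`: 0 means the task has not started, a real thread id means it is currently running, and `canceling_tid` is a sentinel that can never collide with a real id. A minimal standalone sketch of the same exchange, taking the unsigned-integer branch of the sentinel switch (the `Id` alias and `requestCancelSketch` are hypothetical names, not stdlib API):

const std = @import("std");

// Hypothetical stand-in for std.Thread.Id (unsigned-integer case).
const Id = u32;
const canceling: Id = std.math.maxInt(Id);

// 0 = not started, real tid = running, `canceling` = cancel requested.
var cancel_tid: Id = 0;

fn requestCancelSketch() void {
    // Swap the sentinel in unconditionally; the previous value says what to do.
    switch (@atomicRmw(Id, &cancel_tid, .Xchg, canceling, .acq_rel)) {
        // Not started yet, or already being canceled: nothing to interrupt.
        0, canceling => {},
        // Running: this is where Threaded.zig signals the thread (SIG.IO on Linux).
        else => |tid| {
            _ = tid;
        },
    }
}

test "cancel before the task starts leaves only the sentinel" {
    requestCancelSketch();
    try std.testing.expectEqual(canceling, cancel_tid);
}

The unconditional exchange is what keeps the handshake race-free: whichever side swaps second observes what the other side left behind and reacts to it.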
@@ -47,7 +69,7 @@ pub fn init(gpa: Allocator) Pool {
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
.cpu_count = std.Thread.getCpuCount(),
- .parallel_count = 0,
+ .concurrent_count = 0,
};
if (pool.cpu_count) |n| {
pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
@@ -78,14 +100,15 @@ fn worker(pool: *Pool) void {
defer pool.mutex.unlock();
while (true) {
- while (pool.run_queue.popFirst()) |run_node| {
+ while (pool.run_queue.popFirst()) |closure_node| {
pool.mutex.unlock();
- const runnable: *Runnable = @fieldParentPtr("node", run_node);
- runnable.start(runnable);
+ const closure: *Closure = @fieldParentPtr("node", closure_node);
+ const is_concurrent = closure.is_concurrent;
+ closure.start(closure);
pool.mutex.lock();
- if (runnable.is_parallel) {
+ if (is_concurrent) {
// TODO also pop thread and join sometimes
- pool.parallel_count -= 1;
+ pool.concurrent_count -= 1;
}
}
if (pool.join_requested) break;
@@ -154,97 +177,71 @@ pub fn io(pool: *Pool) Io {
};
}
+/// Trailing data:
+/// 1. context
+/// 2. result
const AsyncClosure = struct {
+ closure: Closure,
func: *const fn (context: *anyopaque, result: *anyopaque) void,
- runnable: Runnable,
- reset_event: std.Thread.ResetEvent,
- select_condition: ?*std.Thread.ResetEvent,
- cancel_tid: std.Thread.Id,
- context_offset: usize,
+ reset_event: ResetEvent,
+ select_condition: ?*ResetEvent,
+ context_alignment: std.mem.Alignment,
result_offset: usize,
+ /// Whether the task has a return type with nonzero bits.
+ has_result: bool,
- const done_reset_event: *std.Thread.ResetEvent = @ptrFromInt(@alignOf(std.Thread.ResetEvent));
+ const done_reset_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));
- const canceling_tid: std.Thread.Id = switch (@typeInfo(std.Thread.Id)) {
- .int => |int_info| switch (int_info.signedness) {
- .signed => -1,
- .unsigned => std.math.maxInt(std.Thread.Id),
- },
- .pointer => @ptrFromInt(std.math.maxInt(usize)),
- else => @compileError("unsupported std.Thread.Id: " ++ @typeName(std.Thread.Id)),
- };
-
- fn start(runnable: *Runnable) void {
- const closure: *AsyncClosure = @alignCast(@fieldParentPtr("runnable", runnable));
+ fn start(closure: *Closure) void {
+ const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure));
const tid = std.Thread.getCurrentId();
- if (@cmpxchgStrong(
- std.Thread.Id,
- &closure.cancel_tid,
- 0,
- tid,
- .acq_rel,
- .acquire,
- )) |cancel_tid| {
- assert(cancel_tid == canceling_tid);
- closure.reset_event.set();
- return;
+ if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
+ assert(cancel_tid == Closure.canceling_tid);
+ // Even though we already know the task is canceled, we must still
+ // run the closure in order to make the return value valid - that
+ // is, unless the result is zero bytes!
+ if (!ac.has_result) {
+ ac.reset_event.set();
+ return;
+ }
}
current_closure = closure;
- closure.func(closure.contextPointer(), closure.resultPointer());
+ ac.func(ac.contextPointer(), ac.resultPointer());
current_closure = null;
- if (@cmpxchgStrong(
- std.Thread.Id,
- &closure.cancel_tid,
- tid,
- 0,
- .acq_rel,
- .acquire,
- )) |cancel_tid| assert(cancel_tid == canceling_tid);
-
- if (@atomicRmw(
- ?*std.Thread.ResetEvent,
- &closure.select_condition,
- .Xchg,
- done_reset_event,
- .release,
- )) |select_reset| {
+
+    // In case a cancel happens after successful task completion, this prevents
+    // the signal from being delivered to the thread in `requestCancel`.
+ if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
+ assert(cancel_tid == Closure.canceling_tid);
+ }
+
+ if (@atomicRmw(?*ResetEvent, &ac.select_condition, .Xchg, done_reset_event, .release)) |select_reset| {
assert(select_reset != done_reset_event);
select_reset.set();
}
- closure.reset_event.set();
- }
-
- fn contextOffset(context_alignment: std.mem.Alignment) usize {
- return context_alignment.forward(@sizeOf(AsyncClosure));
- }
-
- fn resultOffset(
- context_alignment: std.mem.Alignment,
- context_len: usize,
- result_alignment: std.mem.Alignment,
- ) usize {
- return result_alignment.forward(contextOffset(context_alignment) + context_len);
+ ac.reset_event.set();
}
- fn resultPointer(closure: *AsyncClosure) [*]u8 {
- const base: [*]u8 = @ptrCast(closure);
- return base + closure.result_offset;
+ fn resultPointer(ac: *AsyncClosure) [*]u8 {
+ const base: [*]u8 = @ptrCast(ac);
+ return base + ac.result_offset;
}
- fn contextPointer(closure: *AsyncClosure) [*]u8 {
- const base: [*]u8 = @ptrCast(closure);
- return base + closure.context_offset;
+ fn contextPointer(ac: *AsyncClosure) [*]u8 {
+ const base: [*]u8 = @ptrCast(ac);
+ return base + ac.context_alignment.forward(@sizeOf(AsyncClosure));
}
- fn waitAndFree(closure: *AsyncClosure, gpa: Allocator, result: []u8) void {
- closure.reset_event.wait();
- @memcpy(result, closure.resultPointer()[0..result.len]);
- free(closure, gpa, result.len);
+ fn waitAndFree(ac: *AsyncClosure, gpa: Allocator, result: []u8) void {
+ ac.reset_event.wait();
+ @memcpy(result, ac.resultPointer()[0..result.len]);
+ free(ac, gpa, result.len);
}
- fn free(closure: *AsyncClosure, gpa: Allocator, result_len: usize) void {
- const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(closure);
- gpa.free(base[0 .. closure.result_offset + result_len]);
+ fn free(ac: *AsyncClosure, gpa: Allocator, result_len: usize) void {
+ if (!ac.has_result) assert(result_len == 0);
+ const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac);
+ gpa.free(base[0 .. ac.result_offset + result_len]);
}
};
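
The `Trailing data` comment above describes a single heap allocation: the `AsyncClosure` header first, the caller's context bytes at the next suitably aligned offset, and the result bytes after that. A worked sketch of the offset arithmetic, with hypothetical `Header`/`Context`/`Result` types standing in for the real ones (it mirrors the computation in `async` and `concurrent` below rather than the exact stdlib internals):

const std = @import("std");

// Hypothetical types; only the layout arithmetic matters.
const Header = struct { a: usize, b: usize };
const Context = struct { fd: i32, len: u64 };
const Result = u64;

test "trailing context and result layout" {
    const context_alignment = std.mem.Alignment.of(Context);
    const result_alignment = std.mem.Alignment.of(Result);

    // Context begins at the first properly aligned byte past the header.
    const context_offset = context_alignment.forward(@sizeOf(Header));
    // Result begins at the first properly aligned byte past the context.
    const result_offset = result_alignment.forward(context_offset + @sizeOf(Context));
    // One allocation covers header + context + result.
    const total_len = result_offset + @sizeOf(Result);

    try std.testing.expect(context_offset >= @sizeOf(Header));
    try std.testing.expect(result_offset >= context_offset + @sizeOf(Context));
    try std.testing.expect(total_len > result_offset);
}

`contextPointer` and `resultPointer` then simply add these offsets to the closure's base address.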
@@ -271,59 +268,60 @@ fn async(
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result.len;
- const closure: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
+ const ac: *AsyncClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(AsyncClosure), n) catch {
start(context.ptr, result.ptr);
return null;
}));
- closure.* = .{
+ ac.* = .{
+ .closure = .{
+ .cancel_tid = 0,
+ .start = AsyncClosure.start,
+ .is_concurrent = false,
+ },
.func = start,
- .context_offset = context_offset,
+ .context_alignment = context_alignment,
.result_offset = result_offset,
+ .has_result = result.len != 0,
.reset_event = .unset,
- .cancel_tid = 0,
.select_condition = null,
- .runnable = .{
- .start = AsyncClosure.start,
- .is_parallel = false,
- },
};
- @memcpy(closure.contextPointer()[0..context.len], context);
+ @memcpy(ac.contextPointer()[0..context.len], context);
pool.mutex.lock();
- const thread_capacity = cpu_count - 1 + pool.parallel_count;
+ const thread_capacity = cpu_count - 1 + pool.concurrent_count;
pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
pool.mutex.unlock();
- closure.free(gpa, result.len);
+ ac.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
};
- pool.run_queue.prepend(&closure.runnable.node);
+ pool.run_queue.prepend(&ac.closure.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
if (pool.threads.items.len == 0) {
- assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ assert(pool.run_queue.popFirst() == &ac.closure.node);
pool.mutex.unlock();
- closure.free(gpa, result.len);
+ ac.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
}
// Rely on other workers to do it.
pool.mutex.unlock();
pool.cond.signal();
- return @ptrCast(closure);
+ return @ptrCast(ac);
};
pool.threads.appendAssumeCapacity(thread);
}
pool.mutex.unlock();
pool.cond.signal();
- return @ptrCast(closure);
+ return @ptrCast(ac);
}
fn concurrent(
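
Both `async` above and `concurrent` below size the pool with `thread_capacity = cpu_count - 1 + concurrent_count`: at most one worker per CPU minus the caller's own thread, plus one guaranteed slot for every in-flight concurrent task, which is what the `is_concurrent` flag tracks. A small worked example with hypothetical numbers:

const std = @import("std");

test "thread capacity rule" {
    // Hypothetical machine: 8 CPUs, 3 outstanding concurrent tasks.
    const cpu_count: usize = 8;
    const concurrent_count: usize = 3;
    // 7 workers for ordinary async tasks plus 3 dedicated slots.
    const thread_capacity = cpu_count - 1 + concurrent_count;
    try std.testing.expectEqual(@as(usize, 10), thread_capacity);
}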
@@ -342,40 +340,41 @@ fn concurrent(
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result_len;
- const closure: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n)));
+ const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), n)));
- closure.* = .{
+ ac.* = .{
+ .closure = .{
+ .cancel_tid = 0,
+ .start = AsyncClosure.start,
+ .is_concurrent = true,
+ },
.func = start,
- .context_offset = context_offset,
+ .context_alignment = context_alignment,
.result_offset = result_offset,
+ .has_result = result_len != 0,
.reset_event = .unset,
- .cancel_tid = 0,
.select_condition = null,
- .runnable = .{
- .start = AsyncClosure.start,
- .is_parallel = true,
- },
};
- @memcpy(closure.contextPointer()[0..context.len], context);
+ @memcpy(ac.contextPointer()[0..context.len], context);
pool.mutex.lock();
- pool.parallel_count += 1;
- const thread_capacity = cpu_count - 1 + pool.parallel_count;
+ pool.concurrent_count += 1;
+ const thread_capacity = cpu_count - 1 + pool.concurrent_count;
pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
pool.mutex.unlock();
- closure.free(gpa, result_len);
+ ac.free(gpa, result_len);
return error.OutOfMemory;
};
- pool.run_queue.prepend(&closure.runnable.node);
+ pool.run_queue.prepend(&ac.closure.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
- assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ assert(pool.run_queue.popFirst() == &ac.closure.node);
pool.mutex.unlock();
- closure.free(gpa, result_len);
+ ac.free(gpa, result_len);
return error.OutOfMemory;
};
pool.threads.appendAssumeCapacity(thread);
@@ -383,31 +382,48 @@ fn concurrent(
pool.mutex.unlock();
pool.cond.signal();
- return @ptrCast(closure);
+ return @ptrCast(ac);
}
const GroupClosure = struct {
+ closure: Closure,
pool: *Pool,
group: *Io.Group,
+    /// Points to a sibling `GroupClosure`. Used to walk the group when canceling all of its tasks.
+ node: std.SinglyLinkedList.Node,
func: *const fn (context: *anyopaque) void,
- runnable: Runnable,
context_alignment: std.mem.Alignment,
context_len: usize,
- fn start(runnable: *Runnable) void {
- const closure: *GroupClosure = @alignCast(@fieldParentPtr("runnable", runnable));
- closure.func(closure.contextPointer());
- const group = closure.group;
- const gpa = closure.pool.allocator;
- free(closure, gpa);
+ fn start(closure: *Closure) void {
+ const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure));
+ const tid = std.Thread.getCurrentId();
+ const group = gc.group;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
- const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context);
+ const reset_event: *ResetEvent = @ptrCast(&group.context);
+ if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, 0, tid, .acq_rel, .acquire)) |cancel_tid| {
+ assert(cancel_tid == Closure.canceling_tid);
+ // We already know the task is canceled before running the callback. Since all closures
+ // in a Group have void return type, we can return early.
+ std.Thread.WaitGroup.finishStateless(group_state, reset_event);
+ return;
+ }
+ current_closure = closure;
+ gc.func(gc.contextPointer());
+ current_closure = null;
+
+    // In case a cancel happens after successful task completion, this prevents
+    // the signal from being delivered to the thread in `requestCancel`.
+ if (@cmpxchgStrong(std.Thread.Id, &closure.cancel_tid, tid, 0, .acq_rel, .acquire)) |cancel_tid| {
+ assert(cancel_tid == Closure.canceling_tid);
+ }
+
std.Thread.WaitGroup.finishStateless(group_state, reset_event);
}
- fn free(closure: *GroupClosure, gpa: Allocator) void {
- const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(closure);
- gpa.free(base[0..contextEnd(closure.context_alignment, closure.context_len)]);
+ fn free(gc: *GroupClosure, gpa: Allocator) void {
+ const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc);
+ gpa.free(base[0..contextEnd(gc.context_alignment, gc.context_len)]);
}
fn contextOffset(context_alignment: std.mem.Alignment) usize {
@@ -418,9 +434,9 @@ const GroupClosure = struct {
return contextOffset(context_alignment) + context_len;
}
- fn contextPointer(closure: *GroupClosure) [*]u8 {
- const base: [*]u8 = @ptrCast(closure);
- return base + contextOffset(closure.context_alignment);
+ fn contextPointer(gc: *GroupClosure) [*]u8 {
+ const base: [*]u8 = @ptrCast(gc);
+ return base + contextOffset(gc.context_alignment);
}
};
@@ -436,39 +452,42 @@ fn groupAsync(
const cpu_count = pool.cpu_count catch 1;
const gpa = pool.allocator;
const n = GroupClosure.contextEnd(context_alignment, context.len);
- const closure: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
+ const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
return start(context.ptr);
}));
- closure.* = .{
+ gc.* = .{
+ .closure = .{
+ .cancel_tid = 0,
+ .start = GroupClosure.start,
+ .is_concurrent = false,
+ },
.pool = pool,
.group = group,
+ .node = .{ .next = @ptrCast(@alignCast(group.token)) },
.func = start,
.context_alignment = context_alignment,
.context_len = context.len,
- .runnable = .{
- .start = GroupClosure.start,
- .is_parallel = false,
- },
};
- @memcpy(closure.contextPointer()[0..context.len], context);
+ group.token = &gc.node;
+ @memcpy(gc.contextPointer()[0..context.len], context);
pool.mutex.lock();
- const thread_capacity = cpu_count - 1 + pool.parallel_count;
+ const thread_capacity = cpu_count - 1 + pool.concurrent_count;
pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
pool.mutex.unlock();
- closure.free(gpa);
+ gc.free(gpa);
return start(context.ptr);
};
- pool.run_queue.prepend(&closure.runnable.node);
+ pool.run_queue.prepend(&gc.closure.node);
if (pool.threads.items.len < thread_capacity) {
const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
- assert(pool.run_queue.popFirst() == &closure.runnable.node);
+ assert(pool.run_queue.popFirst() == &gc.closure.node);
pool.mutex.unlock();
- closure.free(gpa);
+ gc.free(gpa);
return start(context.ptr);
};
pool.threads.appendAssumeCapacity(thread);
@@ -486,7 +505,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group) void {
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
- const reset_event: *std.Thread.ResetEvent = @ptrCast(&group.context);
+ const reset_event: *ResetEvent = @ptrCast(&group.context);
std.Thread.WaitGroup.waitStateless(group_state, reset_event);
}
@@ -494,8 +513,14 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group) void {
if (builtin.single_threaded) return;
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
- _ = group;
- @panic("TODO threaded group cancel");
+ const token = group.token.?;
+ group.token = null;
+ var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
+ while (true) {
+ const gc: *GroupClosure = @fieldParentPtr("node", node);
+ gc.closure.requestCancel();
+ node = node.next orelse break;
+ }
}
fn await(
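
`groupCancel` works because every `groupAsync` call pushes its `GroupClosure.node` onto an intrusive singly linked list whose head is stored in `group.token`; cancellation detaches the head and walks the siblings, requesting cancellation on each. A minimal standalone sketch of that pattern (the `Task` type is hypothetical; only the list walk mirrors the code above):

const std = @import("std");

const Task = struct {
    canceled: bool = false,
    node: std.SinglyLinkedList.Node = .{},

    fn requestCancel(t: *Task) void {
        t.canceled = true;
    }
};

test "walk an intrusive sibling list to cancel all tasks" {
    var tasks: [3]Task = .{ .{}, .{}, .{} };

    // Mirror groupAsync: each new task's node points at the previous head.
    var head: ?*std.SinglyLinkedList.Node = null;
    for (&tasks) |*t| {
        t.node.next = head;
        head = &t.node;
    }

    // Mirror groupCancel: detach the head, then request cancellation on every sibling.
    var node = head orelse return;
    head = null;
    while (true) {
        const task: *Task = @fieldParentPtr("node", node);
        task.requestCancel();
        node = node.next orelse break;
    }

    for (tasks) |t| try std.testing.expect(t.canceled);
}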
@@ -518,32 +543,16 @@ fn cancel(
) void {
_ = result_alignment;
const pool: *Pool = @ptrCast(@alignCast(userdata));
- const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
- switch (@atomicRmw(
- std.Thread.Id,
- &closure.cancel_tid,
- .Xchg,
- AsyncClosure.canceling_tid,
- .acq_rel,
- )) {
- 0, AsyncClosure.canceling_tid => {},
- else => |cancel_tid| switch (builtin.os.tag) {
- .linux => _ = std.os.linux.tgkill(
- std.os.linux.getpid(),
- @bitCast(cancel_tid),
- posix.SIG.IO,
- ),
- else => {},
- },
- }
- closure.waitAndFree(pool.allocator, result);
+ const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
+ ac.closure.requestCancel();
+ ac.waitAndFree(pool.allocator, result);
}
fn cancelRequested(userdata: ?*anyopaque) bool {
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
const closure = current_closure orelse return false;
- return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == AsyncClosure.canceling_tid;
+ return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid;
}
fn checkCancel(pool: *Pool) error{Canceled}!void {
@@ -996,14 +1005,14 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
const pool: *Pool = @ptrCast(@alignCast(userdata));
_ = pool;
- var reset_event: std.Thread.ResetEvent = .unset;
+ var reset_event: ResetEvent = .unset;
for (futures, 0..) |future, i| {
const closure: *AsyncClosure = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*std.Thread.ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
+ if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, &reset_event, .seq_cst) == AsyncClosure.done_reset_event) {
for (futures[0..i]) |cleanup_future| {
const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future));
- if (@atomicRmw(?*std.Thread.ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
+ if (@atomicRmw(?*ResetEvent, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
cleanup_closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event.
}
}
@@ -1016,7 +1025,7 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
var result: ?usize = null;
for (futures, 0..) |future, i| {
const closure: *AsyncClosure = @ptrCast(@alignCast(future));
- if (@atomicRmw(?*std.Thread.ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
+ if (@atomicRmw(?*ResetEvent, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_reset_event) {
closure.reset_event.wait(); // Ensure no reference to our stack-allocated reset_event.
if (result == null) result = i; // In case multiple are ready, return first.
}
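
`select_condition` above is a three-state slot: `null` (no selector registered), a pointer to a selector's stack-allocated `ResetEvent`, or the never-dereferenced `done_reset_event` sentinel meaning the task already finished. Both sides publish their state with an atomic exchange and react to whatever was there before. A minimal sketch of that handoff, with hypothetical `completeSketch`/`registerSketch` helpers:

const std = @import("std");
const ResetEvent = std.Thread.ResetEvent;

// Sentinel: a suitably aligned, never-dereferenced pointer meaning "already done".
const done_event: *ResetEvent = @ptrFromInt(@alignOf(ResetEvent));

var select_condition: ?*ResetEvent = null;

// Completion side: publish "done"; if a selector registered first, wake it.
// (The real code asserts the previous value is never the sentinel itself.)
fn completeSketch() void {
    if (@atomicRmw(?*ResetEvent, &select_condition, .Xchg, done_event, .release)) |waiter| {
        waiter.set();
    }
}

// Selecting side: register our event; returns true if the task already finished.
fn registerSketch(my_event: *ResetEvent) bool {
    return @atomicRmw(?*ResetEvent, &select_condition, .Xchg, my_event, .seq_cst) == done_event;
}

test "completion before registration is observed by the selector" {
    completeSketch();
    var my_event: ResetEvent = .unset;
    try std.testing.expect(registerSketch(&my_event));
}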
lib/std/Io.zig
@@ -736,10 +736,9 @@ pub fn Future(Result: type) type {
any_future: ?*AnyFuture,
result: Result,
- /// Equivalent to `await` but sets a flag observable to application
- /// code that cancellation has been requested.
+    /// Equivalent to `await` but first requests cancellation of the task.
///
- /// Idempotent.
+    /// Idempotent. Not thread-safe.
pub fn cancel(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
io.vtable.cancel(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
@@ -747,6 +746,7 @@ pub fn Future(Result: type) type {
return f.result;
}
+    /// Idempotent. Not thread-safe.
pub fn await(f: *@This(), io: Io) Result {
const any_future = f.any_future orelse return f.result;
io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]), .of(Result));
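
A small usage sketch of the `Future` semantics documented above; the helper and its `Result` type are hypothetical, and only the `cancel` method from this hunk is assumed:

const std = @import("std");
const Io = std.Io;

// Arbitrary result type for illustration.
const Result = error{Canceled}!u32;

fn cancelAndTake(io: Io, future: *Io.Future(Result)) Result {
    // Requests cancellation, then awaits. A second cancel (or an await) would
    // simply return the stored result again; neither may race another thread
    // operating on the same future.
    return future.cancel(io);
}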
@@ -759,8 +759,9 @@ pub fn Future(Result: type) type {
pub const Group = struct {
state: usize,
context: ?*anyopaque,
+ token: ?*anyopaque,
- pub const init: Group = .{ .state = 0, .context = null };
+ pub const init: Group = .{ .state = 0, .context = null, .token = null };
/// Calls `function` with `args` asynchronously. The resource spawned is
/// owned by the group.
@@ -771,7 +772,7 @@ pub const Group = struct {
/// deinitialized.
///
/// See also:
- /// * `async`
+ /// * `Io.async`
/// * `concurrent`
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
@@ -784,14 +785,21 @@ pub const Group = struct {
io.vtable.groupAsync(io.userdata, g, @ptrCast((&args)[0..1]), .of(Args), TypeErased.start);
}
- /// Idempotent.
+ /// Blocks until all tasks of the group finish.
+ ///
+    /// Idempotent. Not thread-safe.
pub fn wait(g: *Group, io: Io) void {
io.vtable.groupWait(io.userdata, g);
}
- /// Idempotent.
+    /// Equivalent to `wait` but requests cancellation of all tasks owned by
+    /// the group.
+    ///
+    /// Idempotent. Not thread-safe.
pub fn cancel(g: *Group, io: Io) void {
+ if (g.token == null) return;
io.vtable.groupCancel(io.userdata, g);
+ assert(g.token == null);
}
};
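
A usage sketch of the `Group` API as documented above; `ping` and its arguments are illustrative, and group tasks return `void` as noted in `GroupClosure.start`:

const std = @import("std");
const Io = std.Io;

// Hypothetical worker; Group tasks have no result.
fn ping(host: []const u8) void {
    _ = host;
}

fn example(io: Io) void {
    var group: Io.Group = .init;
    group.async(io, ping, .{"10.0.0.1"});
    group.async(io, ping, .{"10.0.0.2"});
    // Either wait for every task, or request cancellation of all of them and
    // then wait; both calls are idempotent and must not race other threads.
    group.cancel(io);
}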