Commit 21b7316772
lib/std/Thread/Pool.zig
@@ -7,6 +7,7 @@ mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
run_queue: std.SinglyLinkedList = .{},
is_running: bool = true,
+/// Must be a thread-safe allocator.
allocator: std.mem.Allocator,
threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread,
ids: if (builtin.single_threaded) struct {
@@ -16,12 +17,12 @@ ids: if (builtin.single_threaded) struct {
}
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),
-const Runnable = struct {
+pub const Runnable = struct {
runFn: RunProto,
node: std.SinglyLinkedList.Node = .{},
};
-const RunProto = *const fn (*Runnable, id: ?usize) void;
+pub const RunProto = *const fn (*Runnable, id: ?usize) void;
pub const Options = struct {
allocator: std.mem.Allocator,
@@ -117,12 +118,6 @@ pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
closure.wait_group.finish();
-
- // The thread pool's allocator is protected by the mutex.
- const mutex = &closure.pool.mutex;
- mutex.lock();
- defer mutex.unlock();
-
closure.pool.allocator.destroy(closure);
}
};
@@ -179,12 +174,6 @@ pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, ar
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, .{id.?} ++ closure.arguments);
closure.wait_group.finish();
-
- // The thread pool's allocator is protected by the mutex.
- const mutex = &closure.pool.mutex;
- mutex.lock();
- defer mutex.unlock();
-
closure.pool.allocator.destroy(closure);
}
};
@@ -228,12 +217,6 @@ pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void {
fn runFn(runnable: *Runnable, _: ?usize) void {
const closure: *@This() = @alignCast(@fieldParentPtr("runnable", runnable));
@call(.auto, func, closure.arguments);
-
- // The thread pool's allocator is protected by the mutex.
- const mutex = &closure.pool.mutex;
- mutex.lock();
- defer mutex.unlock();
-
closure.pool.allocator.destroy(closure);
}
};
lib/std/Io.zig
@@ -553,3 +553,72 @@ test {
_ = tty;
_ = @import("Io/test.zig");
}
+
+const Io = @This();
+
+userdata: ?*anyopaque,
+vtable: *const VTable,
+
+pub const VTable = struct {
+ /// If it returns `null` it means `result` has been already populated and
+ /// `await` will be a no-op.
+ async: *const fn (
+ /// Corresponds to `Io.userdata`.
+ userdata: ?*anyopaque,
+ /// The pointer of this slice is an "eager" result value.
+ /// The length is the size in bytes of the result type.
+ eager_result: []u8,
+ /// Passed to `start`.
+ context: ?*anyopaque,
+ start: *const fn (context: ?*anyopaque, result: *anyopaque) void,
+ ) ?*AnyFuture,
+
+ /// This function is only called when `async` returns a non-null value.
+ await: *const fn (
+ /// Corresponds to `Io.userdata`.
+ userdata: ?*anyopaque,
+ /// The same value that was returned from `async`.
+ any_future: *AnyFuture,
+ /// Points to a buffer where the result is written.
+ /// The length is equal to size in bytes of result type.
+ result: []u8,
+ ) void,
+};
+
+pub const AnyFuture = opaque {};
+
+pub fn Future(Result: type) type {
+ return struct {
+ any_future: ?*AnyFuture,
+ result: Result,
+
+ pub fn await(f: *@This(), io: Io) Result {
+ const any_future = f.any_future orelse return f.result;
+ io.vtable.await(io.userdata, any_future, @ptrCast((&f.result)[0..1]));
+ f.any_future = null;
+ return f.result;
+ }
+ };
+}
+
+/// `s` is a struct instance that contains a function like this:
+/// ```
+/// struct {
+/// pub fn start(s: S) Result { ... }
+/// }
+/// ```
+/// where `Result` is any type.
+pub fn async(io: Io, s: anytype) Future(@typeInfo(@TypeOf(@TypeOf(s).start)).@"fn".return_type.?) {
+ const S = @TypeOf(s);
+ const Result = @typeInfo(@TypeOf(S.start)).@"fn".return_type.?;
+ const TypeErased = struct {
+ fn start(context: ?*anyopaque, result: *anyopaque) void {
+ const context_casted: *const S = @alignCast(@ptrCast(context));
+ const result_casted: *Result = @ptrCast(@alignCast(result));
+ result_casted.* = S.start(context_casted.*);
+ }
+ };
+ var future: Future(Result) = undefined;
+ future.any_future = io.vtable.async(io.userdata, @ptrCast((&future.result)[0..1]), @constCast(&s), TypeErased.start);
+ return future;
+}