Commit 10bfbd7d60
Changed files (2)
lib/std/Io/Threaded.zig
@@ -1,4 +1,4 @@
-const Pool = @This();
+const Threaded = @This();
const builtin = @import("builtin");
const native_os = builtin.os.tag;
@@ -76,18 +76,18 @@ pub fn init(
/// If these functions are avoided, then `Allocator.failing` may be passed
/// here.
gpa: Allocator,
-) Pool {
- var pool: Pool = .{
+) Threaded {
+ var t: Threaded = .{
.allocator = gpa,
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
.cpu_count = std.Thread.getCpuCount(),
.concurrent_count = 0,
};
- if (pool.cpu_count) |n| {
- pool.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
+ if (t.cpu_count) |n| {
+ t.threads.ensureTotalCapacityPrecise(gpa, n - 1) catch {};
} else |_| {}
- return pool;
+ return t;
}
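
For orientation, a typical lifecycle after this rename might look like the following. This is a hedged sketch assuming the file is reachable as `std.Io.Threaded` (per its path); it is not part of the commit.

    const std = @import("std");

    pub fn main() void {
        // `init` returns a Threaded by value; worker threads are spawned lazily.
        var threaded = std.Io.Threaded.init(std.heap.page_allocator);
        defer threaded.deinit();
        const io = threaded.io();
        _ = io; // hand `io` to code that accepts a `std.Io`
    }
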
/// Statically initialize such that any call to the following functions will
@@ -96,7 +96,7 @@ pub fn init(
/// * `Io.VTable.concurrent`
/// * `Io.VTable.groupAsync`
/// When initialized this way, `deinit` is safe, but unnecessary to call.
-pub const init_single_threaded: Pool = .{
+pub const init_single_threaded: Threaded = .{
.allocator = .failing,
.threads = .empty,
.stack_size = std.Thread.SpawnConfig.default_stack_size,
@@ -104,48 +104,48 @@ pub const init_single_threaded: Pool = .{
.concurrent_count = 0,
};
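
Per the doc comment above, the static initializer gives a no-allocation, single-threaded mode in exchange for forgoing the listed vtable entries. A sketch of that mode, under the same namespace assumption as above:

    // Single-threaded mode: `deinit` is safe but unnecessary to call.
    var threaded: std.Io.Threaded = .init_single_threaded;
    const io = threaded.io();
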
-pub fn deinit(pool: *Pool) void {
- const gpa = pool.allocator;
- pool.join();
- pool.threads.deinit(gpa);
- pool.* = undefined;
+pub fn deinit(t: *Threaded) void {
+ const gpa = t.allocator;
+ t.join();
+ t.threads.deinit(gpa);
+ t.* = undefined;
}
-fn join(pool: *Pool) void {
+fn join(t: *Threaded) void {
if (builtin.single_threaded) return;
{
- pool.mutex.lock();
- defer pool.mutex.unlock();
- pool.join_requested = true;
+ t.mutex.lock();
+ defer t.mutex.unlock();
+ t.join_requested = true;
}
- pool.cond.broadcast();
- for (pool.threads.items) |thread| thread.join();
+ t.cond.broadcast();
+ for (t.threads.items) |thread| thread.join();
}
-fn worker(pool: *Pool) void {
- pool.mutex.lock();
- defer pool.mutex.unlock();
+fn worker(t: *Threaded) void {
+ t.mutex.lock();
+ defer t.mutex.unlock();
while (true) {
- while (pool.run_queue.popFirst()) |closure_node| {
- pool.mutex.unlock();
+ while (t.run_queue.popFirst()) |closure_node| {
+ t.mutex.unlock();
const closure: *Closure = @fieldParentPtr("node", closure_node);
const is_concurrent = closure.is_concurrent;
closure.start(closure);
- pool.mutex.lock();
+ t.mutex.lock();
if (is_concurrent) {
// TODO also pop thread and join sometimes
- pool.concurrent_count -= 1;
+ t.concurrent_count -= 1;
}
}
- if (pool.join_requested) break;
- pool.cond.wait(&pool.mutex);
+ if (t.join_requested) break;
+ t.cond.wait(&t.mutex);
}
}
-pub fn io(pool: *Pool) Io {
+pub fn io(t: *Threaded) Io {
return .{
- .userdata = pool,
+ .userdata = t,
.vtable = &.{
.async = async,
.concurrent = concurrent,
@@ -324,14 +324,14 @@ fn async(
start(context.ptr, result.ptr);
return null;
}
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const cpu_count = pool.cpu_count catch {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const cpu_count = t.cpu_count catch {
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
start(context.ptr, result.ptr);
return null;
};
};
- const gpa = pool.allocator;
+ const gpa = t.allocator;
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result.len;
@@ -356,38 +356,38 @@ fn async(
@memcpy(ac.contextPointer()[0..context.len], context);
- pool.mutex.lock();
+ t.mutex.lock();
- const thread_capacity = cpu_count - 1 + pool.concurrent_count;
+ const thread_capacity = cpu_count - 1 + t.concurrent_count;
- pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
- pool.mutex.unlock();
+ t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+ t.mutex.unlock();
ac.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
};
- pool.run_queue.prepend(&ac.closure.node);
+ t.run_queue.prepend(&ac.closure.node);
- if (pool.threads.items.len < thread_capacity) {
- const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
- if (pool.threads.items.len == 0) {
- assert(pool.run_queue.popFirst() == &ac.closure.node);
- pool.mutex.unlock();
+ if (t.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+ if (t.threads.items.len == 0) {
+ assert(t.run_queue.popFirst() == &ac.closure.node);
+ t.mutex.unlock();
ac.free(gpa, result.len);
start(context.ptr, result.ptr);
return null;
}
// Rely on other workers to do it.
- pool.mutex.unlock();
- pool.cond.signal();
+ t.mutex.unlock();
+ t.cond.signal();
return @ptrCast(ac);
};
- pool.threads.appendAssumeCapacity(thread);
+ t.threads.appendAssumeCapacity(thread);
}
- pool.mutex.unlock();
- pool.cond.signal();
+ t.mutex.unlock();
+ t.cond.signal();
return @ptrCast(ac);
}
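
The capacity math in this hunk is shared with `concurrent` and `groupAsync`: the pool may grow to one thread per CPU beyond the current one, plus one thread per in-flight concurrent task. A worked instance of that formula (the counts are assumed values, purely illustrative):

    const std = @import("std");

    test "thread_capacity formula (illustrative)" {
        const cpu_count: usize = 8; // assumed machine
        const concurrent_count: usize = 2; // two in-flight concurrent tasks
        const thread_capacity = cpu_count - 1 + concurrent_count;
        try std.testing.expectEqual(@as(usize, 9), thread_capacity);
    }
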
@@ -401,9 +401,9 @@ fn concurrent(
) error{OutOfMemory}!*Io.AnyFuture {
if (builtin.single_threaded) unreachable;
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const cpu_count = pool.cpu_count catch 1;
- const gpa = pool.allocator;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const cpu_count = t.cpu_count catch 1;
+ const gpa = t.allocator;
const context_offset = context_alignment.forward(@sizeOf(AsyncClosure));
const result_offset = result_alignment.forward(context_offset + context.len);
const n = result_offset + result_len;
@@ -424,37 +424,37 @@ fn concurrent(
};
@memcpy(ac.contextPointer()[0..context.len], context);
- pool.mutex.lock();
+ t.mutex.lock();
- pool.concurrent_count += 1;
- const thread_capacity = cpu_count - 1 + pool.concurrent_count;
+ t.concurrent_count += 1;
+ const thread_capacity = cpu_count - 1 + t.concurrent_count;
- pool.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
- pool.mutex.unlock();
+ t.threads.ensureTotalCapacity(gpa, thread_capacity) catch {
+ t.mutex.unlock();
ac.free(gpa, result_len);
return error.OutOfMemory;
};
- pool.run_queue.prepend(&ac.closure.node);
+ t.run_queue.prepend(&ac.closure.node);
- if (pool.threads.items.len < thread_capacity) {
- const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
- assert(pool.run_queue.popFirst() == &ac.closure.node);
- pool.mutex.unlock();
+ if (t.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+ assert(t.run_queue.popFirst() == &ac.closure.node);
+ t.mutex.unlock();
ac.free(gpa, result_len);
return error.OutOfMemory;
};
- pool.threads.appendAssumeCapacity(thread);
+ t.threads.appendAssumeCapacity(thread);
}
- pool.mutex.unlock();
- pool.cond.signal();
+ t.mutex.unlock();
+ t.cond.signal();
return @ptrCast(ac);
}
const GroupClosure = struct {
closure: Closure,
- pool: *Pool,
+ t: *Threaded,
group: *Io.Group,
/// Points to sibling `GroupClosure`. Used for walking the group to cancel all.
node: std.SinglyLinkedList.Node,
@@ -515,9 +515,9 @@ fn groupAsync(
start: *const fn (*Io.Group, context: *const anyopaque) void,
) void {
    if (builtin.single_threaded) return start(group, context.ptr);
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const cpu_count = pool.cpu_count catch 1;
- const gpa = pool.allocator;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const cpu_count = t.cpu_count catch 1;
+ const gpa = t.allocator;
const n = GroupClosure.contextEnd(context_alignment, context.len);
const gc: *GroupClosure = @ptrCast(@alignCast(gpa.alignedAlloc(u8, .of(GroupClosure), n) catch {
return start(group, context.ptr);
@@ -528,7 +528,7 @@ fn groupAsync(
.start = GroupClosure.start,
.is_concurrent = false,
},
- .pool = pool,
+ .t = t,
.group = group,
.node = undefined,
.func = start,
@@ -537,30 +537,30 @@ fn groupAsync(
};
@memcpy(gc.contextPointer()[0..context.len], context);
- pool.mutex.lock();
+ t.mutex.lock();
// Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
group.token = &gc.node;
- const thread_capacity = cpu_count - 1 + pool.concurrent_count;
+ const thread_capacity = cpu_count - 1 + t.concurrent_count;
- pool.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
- pool.mutex.unlock();
+ t.threads.ensureTotalCapacityPrecise(gpa, thread_capacity) catch {
+ t.mutex.unlock();
gc.free(gpa);
return start(group, context.ptr);
};
- pool.run_queue.prepend(&gc.closure.node);
+ t.run_queue.prepend(&gc.closure.node);
- if (pool.threads.items.len < thread_capacity) {
- const thread = std.Thread.spawn(.{ .stack_size = pool.stack_size }, worker, .{pool}) catch {
- assert(pool.run_queue.popFirst() == &gc.closure.node);
- pool.mutex.unlock();
+ if (t.threads.items.len < thread_capacity) {
+ const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
+ assert(t.run_queue.popFirst() == &gc.closure.node);
+ t.mutex.unlock();
gc.free(gpa);
return start(group, context.ptr);
};
- pool.threads.appendAssumeCapacity(thread);
+ t.threads.appendAssumeCapacity(thread);
}
// This needs to be done before unlocking the mutex to avoid a race with
@@ -568,13 +568,13 @@ fn groupAsync(
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
std.Thread.WaitGroup.startStateless(group_state);
- pool.mutex.unlock();
- pool.cond.signal();
+ t.mutex.unlock();
+ t.cond.signal();
}
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const gpa = pool.allocator;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const gpa = t.allocator;
if (builtin.single_threaded) return;
@@ -593,8 +593,8 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
}
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const gpa = pool.allocator;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const gpa = t.allocator;
if (builtin.single_threaded) return;
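
Taken together, `groupAsync`, `groupWait`, and `groupCancel` back a group lifecycle roughly like the following. The caller-side method names are hypothetical; only `Io.Group.async` is named in this diff (in the comment inside the groupAsync hunk above).

    // Hypothetical caller shape, inferred from the vtable entries above.
    var group: std.Io.Group = .init;
    group.async(io, workerFn, .{});
    group.async(io, workerFn, .{});
    group.wait(io); // or a cancel-all counterpart backed by groupCancel
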
@@ -629,9 +629,9 @@ fn await(
result_alignment: std.mem.Alignment,
) void {
_ = result_alignment;
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const closure: *AsyncClosure = @ptrCast(@alignCast(any_future));
- closure.waitAndFree(pool.allocator, result);
+ closure.waitAndFree(t.allocator, result);
}
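
For orientation, the caller-facing counterpart of `await` (and `cancel` below) follows the shape of the upstream `std.Io` async examples. A hedged sketch; `saveFile`, `data`, and the file name are placeholders, not from this commit:

    var future = io.async(saveFile, .{ io, data, "saveA.txt" });
    // `future.cancel(io)` would route to the vtable `cancel` below.
    const result = future.await(io);
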
fn cancel(
@@ -641,31 +641,31 @@ fn cancel(
result_alignment: std.mem.Alignment,
) void {
_ = result_alignment;
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const ac: *AsyncClosure = @ptrCast(@alignCast(any_future));
ac.closure.requestCancel();
- ac.waitAndFree(pool.allocator, result);
+ ac.waitAndFree(t.allocator, result);
}
fn cancelRequested(userdata: ?*anyopaque) bool {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
const closure = current_closure orelse return false;
return @atomicLoad(std.Thread.Id, &closure.cancel_tid, .acquire) == Closure.canceling_tid;
}
-fn checkCancel(pool: *Pool) error{Canceled}!void {
- if (cancelRequested(pool)) return error.Canceled;
+fn checkCancel(t: *Threaded) error{Canceled}!void {
+ if (cancelRequested(t)) return error.Canceled;
}
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
if (prev_state == .contended) {
- try pool.checkCancel();
+ try t.checkCancel();
futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
- try pool.checkCancel();
+ try t.checkCancel();
futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
}
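
Caller side, this makes locking an `Io.Mutex` a cancelation point: the futex wait can be interrupted and surface `error.Canceled`. A hedged sketch of the implied usage (only `unlock` and `lockUncancelable` calls are visible elsewhere in this diff; `lock` is inferred):

    try mutex.lock(io); // Io.Cancelable!void: may fail with error.Canceled
    defer mutex.unlock(io);
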
@@ -689,8 +689,8 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
}
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const pool_io = pool.io();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const t_io = t.io();
comptime assert(@TypeOf(cond.state) == u64);
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
const cond_state = &ints[0];
@@ -704,8 +704,8 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex:
assert(state & waiter_mask != waiter_mask);
state += one_waiter;
- mutex.unlock(pool_io);
- defer mutex.lockUncancelable(pool_io);
+ mutex.unlock(t_io);
+ defer mutex.lockUncancelable(t_io);
while (true) {
futexWait(cond_epoch, epoch);
@@ -719,7 +719,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex:
}
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
comptime assert(@TypeOf(cond.state) == u64);
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
const cond_state = &ints[0];
@@ -743,11 +743,11 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
assert(state & waiter_mask != waiter_mask);
state += one_waiter;
- mutex.unlock(pool.io());
- defer mutex.lockUncancelable(pool.io());
+ mutex.unlock(t.io());
+ defer mutex.lockUncancelable(t.io());
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
futexWait(cond_epoch, epoch);
epoch = cond_epoch.load(.acquire);
@@ -764,8 +764,8 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
}
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
comptime assert(@TypeOf(cond.state) == u64);
const ints: *[2]std.atomic.Value(u32) = @ptrCast(&cond.state);
const cond_state = &ints[0];
@@ -825,13 +825,13 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.
}
fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode: Io.Dir.Mode) Io.Dir.MakeError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.mkdirat(dir.handle, sub_path_posix, mode))) {
.SUCCESS => return,
.INTR => continue,
@@ -858,8 +858,8 @@ fn dirMakePosix(userdata: ?*anyopaque, dir: Io.Dir, sub_path: []const u8, mode:
}
fn dirStat(userdata: ?*anyopaque, dir: Io.Dir) Io.Dir.StatError!Io.Dir.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
_ = dir;
@panic("TODO");
@@ -871,7 +871,7 @@ fn dirStatPathLinux(
sub_path: []const u8,
options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const linux = std.os.linux;
var path_buffer: [posix.PATH_MAX]u8 = undefined;
@@ -881,7 +881,7 @@ fn dirStatPathLinux(
@as(u32, if (!options.follow_symlinks) linux.AT.SYMLINK_NOFOLLOW else 0);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var statx = std.mem.zeroes(linux.Statx);
const rc = linux.statx(
dir.handle,
@@ -913,7 +913,7 @@ fn dirStatPathPosix(
sub_path: []const u8,
options: Io.Dir.StatPathOptions,
) Io.Dir.StatPathError!Io.File.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
@@ -921,7 +921,7 @@ fn dirStatPathPosix(
const flags: u32 = if (!options.follow_symlinks) posix.AT.SYMLINK_NOFOLLOW else 0;
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var stat = std.mem.zeroes(posix.Stat);
switch (posix.errno(fstatat_sym(dir.handle, sub_path_posix, &stat, flags))) {
.SUCCESS => return statFromPosix(stat),
@@ -943,12 +943,12 @@ fn dirStatPathPosix(
}
fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
if (posix.Stat == void) return error.Streaming;
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var stat = std.mem.zeroes(posix.Stat);
switch (posix.errno(fstat_sym(file.handle, &stat))) {
.SUCCESS => return statFromPosix(&stat),
@@ -963,10 +963,10 @@ fn fileStatPosix(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File
}
fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const linux = std.os.linux;
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var statx = std.mem.zeroes(linux.Statx);
const rc = linux.statx(
file.handle,
@@ -993,17 +993,17 @@ fn fileStatLinux(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File
}
fn fileStatWindows(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
_ = file;
@panic("TODO");
}
fn fileStatWasi(userdata: ?*anyopaque, file: Io.File) Io.File.StatError!Io.File.Stat {
if (builtin.link_libc) return fileStatPosix(userdata, file);
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var stat: std.os.wasi.filestat_t = undefined;
switch (std.os.wasi.fd_filestat_get(file.handle, &stat)) {
.SUCCESS => return statFromWasi(&stat),
@@ -1031,7 +1031,7 @@ fn dirCreateFilePosix(
sub_path: []const u8,
flags: Io.File.CreateFlags,
) Io.File.OpenError!Io.File {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
@@ -1062,7 +1062,7 @@ fn dirCreateFilePosix(
};
const fd: posix.fd_t = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = openat_sym(dir.handle, sub_path_posix, os_flags, flags.mode);
switch (posix.errno(rc)) {
.SUCCESS => break @intCast(rc),
@@ -1106,7 +1106,7 @@ fn dirCreateFilePosix(
.exclusive => posix.LOCK.EX | lock_nonblocking,
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.flock(fd, lock_flags))) {
.SUCCESS => break,
.INTR => continue,
@@ -1123,7 +1123,7 @@ fn dirCreateFilePosix(
if (has_flock_open_flags and flags.lock_nonblocking) {
var fl_flags: usize = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.fcntl(fd, posix.F.GETFL, 0))) {
.SUCCESS => break,
.INTR => continue,
@@ -1132,7 +1132,7 @@ fn dirCreateFilePosix(
};
fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.fcntl(fd, posix.F.SETFL, fl_flags))) {
.SUCCESS => break,
.INTR => continue,
@@ -1150,7 +1150,7 @@ fn dirOpenFile(
sub_path: []const u8,
flags: Io.File.OpenFlags,
) Io.File.OpenError!Io.File {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var path_buffer: [posix.PATH_MAX]u8 = undefined;
const sub_path_posix = try pathToPosix(sub_path, &path_buffer);
@@ -1191,7 +1191,7 @@ fn dirOpenFile(
}
}
const fd: posix.fd_t = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = openat_sym(dir.handle, sub_path_posix, os_flags, @as(posix.mode_t, 0));
switch (posix.errno(rc)) {
.SUCCESS => break @intCast(rc),
@@ -1235,7 +1235,7 @@ fn dirOpenFile(
.exclusive => posix.LOCK.EX | lock_nonblocking,
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.flock(fd, lock_flags))) {
.SUCCESS => break,
.INTR => continue,
@@ -1252,7 +1252,7 @@ fn dirOpenFile(
if (has_flock_open_flags and flags.lock_nonblocking) {
var fl_flags: usize = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.fcntl(fd, posix.F.GETFL, 0))) {
.SUCCESS => break,
.INTR => continue,
@@ -1261,7 +1261,7 @@ fn dirOpenFile(
};
fl_flags &= ~@as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.fcntl(fd, posix.F.SETFL, fl_flags))) {
.SUCCESS => break,
.INTR => continue,
@@ -1274,13 +1274,13 @@ fn dirOpenFile(
}
fn fileClose(userdata: ?*anyopaque, file: Io.File) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
posix.close(file.handle);
}
fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File.ReadStreamingError!usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const DWORD = windows.DWORD;
@@ -1288,7 +1288,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
var truncate: usize = 0;
var total: usize = 0;
while (index < data.len) {
- try pool.checkCancel();
+ try t.checkCancel();
{
const untruncated = data[index];
data[index] = untruncated[truncate..];
@@ -1333,7 +1333,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
assert(dest[0].len > 0);
if (native_os == .wasi and !builtin.link_libc) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var nread: usize = undefined;
switch (std.os.wasi.fd_read(file.handle, dest.ptr, dest.len, &nread)) {
.SUCCESS => return nread,
@@ -1354,7 +1354,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = posix.system.readv(file.handle, dest.ptr, @intCast(dest.len));
switch (posix.errno(rc)) {
.SUCCESS => return @intCast(rc),
@@ -1377,7 +1377,7 @@ fn fileReadStreaming(userdata: ?*anyopaque, file: Io.File, data: [][]u8) Io.File
}
fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset: u64) Io.File.ReadPositionalError!usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const DWORD = windows.DWORD;
@@ -1386,7 +1386,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
var truncate: usize = 0;
var total: usize = 0;
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
{
const untruncated = data[index];
data[index] = untruncated[truncate..];
@@ -1454,7 +1454,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
assert(dest[0].len > 0);
if (native_os == .wasi and !builtin.link_libc) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var nread: usize = undefined;
switch (std.os.wasi.fd_pread(file.handle, dest.ptr, dest.len, offset, &nread)) {
.SUCCESS => return nread,
@@ -1479,7 +1479,7 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = preadv_sym(file.handle, dest.ptr, @intCast(dest.len), @bitCast(offset));
switch (posix.errno(rc)) {
.SUCCESS => return @bitCast(rc),
@@ -1505,8 +1505,8 @@ fn fileReadPositional(userdata: ?*anyopaque, file: Io.File, data: [][]u8, offset
}
fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
_ = file;
_ = offset;
@@ -1514,11 +1514,11 @@ fn fileSeekBy(userdata: ?*anyopaque, file: Io.File, offset: i64) Io.File.SeekErr
}
fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const fd = file.handle;
if (native_os == .linux and !builtin.link_libc and @sizeOf(usize) == 4) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var result: u64 = undefined;
switch (posix.errno(posix.system.llseek(fd, offset, &result, posix.SEEK.SET))) {
.SUCCESS => return,
@@ -1533,12 +1533,12 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr
};
if (native_os == .windows) {
- try pool.checkCancel();
+ try t.checkCancel();
return windows.SetFilePointerEx_BEGIN(fd, offset);
}
if (native_os == .wasi and !builtin.link_libc) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var new_offset: std.os.wasi.filesize_t = undefined;
switch (std.os.wasi.fd_seek(fd, @bitCast(offset), .SET, &new_offset)) {
.SUCCESS => return,
@@ -1556,7 +1556,7 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr
if (posix.SEEK == void) return error.Unseekable;
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(lseek_sym(fd, @bitCast(offset), posix.SEEK.SET))) {
.SUCCESS => return,
.INTR => continue,
@@ -1571,8 +1571,8 @@ fn fileSeekTo(userdata: ?*anyopaque, file: Io.File, offset: u64) Io.File.SeekErr
}
fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posix.off_t) Io.File.PWriteError!usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
const fs_file: std.fs.File = .{ .handle = file.handle };
return switch (offset) {
-1 => fs_file.write(buffer),
@@ -1581,8 +1581,8 @@ fn pwrite(userdata: ?*anyopaque, file: Io.File, buffer: []const u8, offset: posi
}
fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
const clock_id: posix.clockid_t = clockToPosix(clock);
var tp: posix.timespec = undefined;
switch (posix.errno(posix.system.clock_gettime(clock_id, &tp))) {
@@ -1593,8 +1593,8 @@ fn nowPosix(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp
}
fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
switch (clock) {
.realtime => {
// RtlGetSystemTimePrecise() has a granularity of 100 nanoseconds
@@ -1612,8 +1612,8 @@ fn nowWindows(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestam
}
fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
var ns: std.os.wasi.timestamp_t = undefined;
const err = std.os.wasi.clock_time_get(clockToWasi(clock), 1, &ns);
if (err != .SUCCESS) return error.Unexpected;
@@ -1621,7 +1621,7 @@ fn nowWasi(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
}
fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const clock_id: posix.clockid_t = clockToPosix(switch (timeout) {
.none => .awake,
.duration => |d| d.clock,
@@ -1634,7 +1634,7 @@ fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
};
var timespec: posix.timespec = timestampToPosix(deadline_nanoseconds);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (std.os.linux.E.init(std.os.linux.clock_nanosleep(clock_id, .{ .ABSTIME = switch (timeout) {
.none, .duration => false,
.deadline => true,
@@ -1648,10 +1648,10 @@ fn sleepLinux(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
}
fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
const ms = ms: {
- const duration_and_clock = (try timeout.toDurationFromNow(pool.io())) orelse
+ const duration_and_clock = (try timeout.toDurationFromNow(t.io())) orelse
break :ms std.math.maxInt(windows.DWORD);
break :ms std.math.lossyCast(windows.DWORD, duration_and_clock.duration.toMilliseconds());
};
@@ -1659,12 +1659,12 @@ fn sleepWindows(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
}
fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
const w = std.os.wasi;
- const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(pool.io())) |d| .{
+ const clock: w.subscription_clock_t = if (try timeout.toDurationFromNow(t.io())) |d| .{
.id = clockToWasi(d.clock),
.timeout = std.math.lossyCast(u64, d.duration.nanoseconds),
.precision = 0,
@@ -1688,19 +1688,19 @@ fn sleepWasi(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
}
fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const sec_type = @typeInfo(posix.timespec).@"struct".fields[0].type;
const nsec_type = @typeInfo(posix.timespec).@"struct".fields[1].type;
var timespec: posix.timespec = t: {
- const d = (try timeout.toDurationFromNow(pool.io())) orelse break :t .{
+ const d = (try timeout.toDurationFromNow(t.io())) orelse break :t .{
.sec = std.math.maxInt(sec_type),
.nsec = std.math.maxInt(nsec_type),
};
break :t timestampToPosix(d.duration.nanoseconds);
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
        switch (posix.errno(posix.system.nanosleep(&timespec, &timespec))) {
.INTR => continue,
else => return, // This prong handles success as well as unexpected errors.
@@ -1709,8 +1709,8 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
}
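
Worth noting in the loop above: POSIX `nanosleep` writes the unslept remainder through its second argument, so passing `&timespec` for both means each `.INTR` retry resumes the remaining interval instead of restarting it. The same pattern in isolation (a sketch, not part of the commit):

    const std = @import("std");
    const posix = std.posix;

    fn sleepRetrying(ns: u64) void {
        var ts: posix.timespec = .{
            .sec = @intCast(ns / std.time.ns_per_s),
            .nsec = @intCast(ns % std.time.ns_per_s),
        };
        while (true) {
            switch (posix.errno(posix.system.nanosleep(&ts, &ts))) {
                // On EINTR, `ts` now holds the remaining time.
                .INTR => continue,
                else => return,
            }
        }
    }
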
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
var reset_event: ResetEvent = .unset;
@@ -1745,26 +1745,26 @@ fn netListenIpPosix(
address: IpAddress,
options: IpAddress.ListenOptions,
) IpAddress.ListenError!net.Server {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(&address);
- const socket_fd = try openSocketPosix(pool, family, .{
+ const socket_fd = try openSocketPosix(t, family, .{
.mode = options.mode,
.protocol = options.protocol,
});
errdefer posix.close(socket_fd);
if (options.reuse_address) {
- try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
+ try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEADDR, 1);
if (@hasDecl(posix.SO, "REUSEPORT"))
- try setSocketOption(pool, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
+ try setSocketOption(t, socket_fd, posix.SOL.SOCKET, posix.SO.REUSEPORT, 1);
}
var storage: PosixAddress = undefined;
var addr_len = addressToPosix(&address, &storage);
- try posixBind(pool, socket_fd, &storage.any, addr_len);
+ try posixBind(t, socket_fd, &storage.any, addr_len);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
.SUCCESS => break,
.ADDRINUSE => return error.AddressInUse,
@@ -1773,7 +1773,7 @@ fn netListenIpPosix(
}
}
- try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
+ try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
return .{
.socket = .{
.handle = socket_fd,
@@ -1788,8 +1788,8 @@ fn netListenUnix(
options: net.UnixAddress.ListenOptions,
) net.UnixAddress.ListenError!net.Socket.Handle {
if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const socket_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const socket_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream }) catch |err| switch (err) {
error.ProtocolUnsupportedBySystem => return error.AddressFamilyUnsupported,
error.ProtocolUnsupportedByAddressFamily => return error.AddressFamilyUnsupported,
error.SocketModeUnsupported => return error.AddressFamilyUnsupported,
@@ -1799,10 +1799,10 @@ fn netListenUnix(
var storage: UnixAddress = undefined;
const addr_len = addressUnixToPosix(address, &storage);
- try posixBindUnix(pool, socket_fd, &storage.any, addr_len);
+ try posixBindUnix(t, socket_fd, &storage.any, addr_len);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.listen(socket_fd, options.kernel_backlog))) {
.SUCCESS => break,
.ADDRINUSE => return error.AddressInUse,
@@ -1814,9 +1814,9 @@ fn netListenUnix(
return socket_fd;
}
-fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+fn posixBindUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.bind(fd, addr, addr_len))) {
.SUCCESS => break,
.INTR => continue,
@@ -1842,9 +1842,9 @@ fn posixBindUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, a
}
}
-fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+fn posixBind(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
.SUCCESS => break,
.INTR => continue,
@@ -1861,9 +1861,9 @@ fn posixBind(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr
}
}
-fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+fn posixConnect(t: *Threaded, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
.SUCCESS => return,
.INTR => continue,
@@ -1890,9 +1890,9 @@ fn posixConnect(pool: *Pool, socket_fd: posix.socket_t, addr: *const posix.socka
}
}
-fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
+fn posixConnectUnix(t: *Threaded, fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.connect(fd, addr, addr_len))) {
.SUCCESS => return,
.INTR => continue,
@@ -1919,9 +1919,9 @@ fn posixConnectUnix(pool: *Pool, fd: posix.socket_t, addr: *const posix.sockaddr
}
}
-fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
+fn posixGetSockName(t: *Threaded, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
.SUCCESS => break,
.INTR => continue,
@@ -1935,10 +1935,10 @@ fn posixGetSockName(pool: *Pool, socket_fd: posix.fd_t, addr: *posix.sockaddr, a
}
}
-fn setSocketOption(pool: *Pool, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
+fn setSocketOption(t: *Threaded, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
const o: []const u8 = @ptrCast(&option);
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
.SUCCESS => return,
.INTR => continue,
@@ -1957,17 +1957,17 @@ fn netConnectIpPosix(
options: IpAddress.ConnectOptions,
) IpAddress.ConnectError!net.Stream {
if (options.timeout != .none) @panic("TODO");
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address);
- const socket_fd = try openSocketPosix(pool, family, .{
+ const socket_fd = try openSocketPosix(t, family, .{
.mode = options.mode,
.protocol = options.protocol,
});
errdefer posix.close(socket_fd);
var storage: PosixAddress = undefined;
var addr_len = addressToPosix(address, &storage);
- try posixConnect(pool, socket_fd, &storage.any, addr_len);
- try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
+ try posixConnect(t, socket_fd, &storage.any, addr_len);
+ try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
return .{ .socket = .{
.handle = socket_fd,
.address = addressFromPosix(&storage),
@@ -1979,12 +1979,12 @@ fn netConnectUnix(
address: *const net.UnixAddress,
) net.UnixAddress.ConnectError!net.Socket.Handle {
if (!net.has_unix_sockets) return error.AddressFamilyUnsupported;
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const socket_fd = try openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .stream });
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const socket_fd = try openSocketPosix(t, posix.AF.UNIX, .{ .mode = .stream });
errdefer posix.close(socket_fd);
var storage: UnixAddress = undefined;
const addr_len = addressUnixToPosix(address, &storage);
- try posixConnectUnix(pool, socket_fd, &storage.any, addr_len);
+ try posixConnectUnix(t, socket_fd, &storage.any, addr_len);
return socket_fd;
}
@@ -1993,25 +1993,25 @@ fn netBindIpPosix(
address: *const IpAddress,
options: IpAddress.BindOptions,
) IpAddress.BindError!net.Socket {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const family = posixAddressFamily(address);
- const socket_fd = try openSocketPosix(pool, family, options);
+ const socket_fd = try openSocketPosix(t, family, options);
errdefer posix.close(socket_fd);
var storage: PosixAddress = undefined;
var addr_len = addressToPosix(address, &storage);
- try posixBind(pool, socket_fd, &storage.any, addr_len);
- try posixGetSockName(pool, socket_fd, &storage.any, &addr_len);
+ try posixBind(t, socket_fd, &storage.any, addr_len);
+ try posixGetSockName(t, socket_fd, &storage.any, &addr_len);
return .{
.handle = socket_fd,
.address = addressFromPosix(&storage),
};
}
-fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t {
+fn openSocketPosix(t: *Threaded, family: posix.sa_family_t, options: IpAddress.BindOptions) !posix.socket_t {
const mode = posixSocketMode(options.mode);
const protocol = posixProtocol(options.protocol);
const socket_fd = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const flags: u32 = mode | if (socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
const socket_rc = posix.system.socket(family, flags, protocol);
switch (posix.errno(socket_rc)) {
@@ -2019,7 +2019,7 @@ fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.Bi
const fd: posix.fd_t = @intCast(socket_rc);
errdefer posix.close(fd);
if (socket_flags_unsupported) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
.SUCCESS => break,
.INTR => continue,
@@ -2044,7 +2044,7 @@ fn openSocketPosix(pool: *Pool, family: posix.sa_family_t, options: IpAddress.Bi
if (options.ip6_only) {
if (posix.IPV6 == void) return error.OptionUnsupported;
- try setSocketOption(pool, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
+ try setSocketOption(t, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
}
return socket_fd;
@@ -2054,11 +2054,11 @@ const socket_flags_unsupported = builtin.os.tag.isDarwin() or native_os == .haik
const have_accept4 = !socket_flags_unsupported;
fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Server.AcceptError!net.Stream {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var storage: PosixAddress = undefined;
var addr_len: posix.socklen_t = @sizeOf(PosixAddress);
const fd = while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = if (have_accept4)
posix.system.accept4(listen_fd, &storage.any, &addr_len, posix.SOCK.CLOEXEC)
else
@@ -2068,7 +2068,7 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Serve
const fd: posix.fd_t = @intCast(rc);
errdefer posix.close(fd);
if (!have_accept4) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
.SUCCESS => break,
.INTR => continue,
@@ -2101,7 +2101,7 @@ fn netAcceptPosix(userdata: ?*anyopaque, listen_fd: net.Socket.Handle) net.Serve
}
fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
var i: usize = 0;
@@ -2116,7 +2116,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
assert(dest[0].len > 0);
if (native_os == .wasi and !builtin.link_libc) while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
var n: usize = undefined;
switch (std.os.wasi.fd_read(fd, dest.ptr, dest.len, &n)) {
.SUCCESS => return n,
@@ -2137,7 +2137,7 @@ fn netReadPosix(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
switch (posix.errno(rc)) {
.SUCCESS => return @intCast(rc),
@@ -2167,7 +2167,7 @@ fn netSend(
messages: []net.OutgoingMessage,
flags: net.SendFlags,
) struct { ?net.Socket.SendError, usize } {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
const posix_flags: u32 =
@as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
@@ -2180,17 +2180,17 @@ fn netSend(
var i: usize = 0;
while (messages.len - i != 0) {
if (have_sendmmsg) {
- i += netSendMany(pool, handle, messages[i..], posix_flags) catch |err| return .{ err, i };
+ i += netSendMany(t, handle, messages[i..], posix_flags) catch |err| return .{ err, i };
continue;
}
- netSendOne(pool, handle, &messages[i], posix_flags) catch |err| return .{ err, i };
+ netSendOne(t, handle, &messages[i], posix_flags) catch |err| return .{ err, i };
i += 1;
}
return .{ null, i };
}
fn netSendOne(
- pool: *Pool,
+ t: *Threaded,
handle: net.Socket.Handle,
message: *net.OutgoingMessage,
flags: u32,
@@ -2207,7 +2207,7 @@ fn netSendOne(
.flags = 0,
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = posix.system.sendmsg(handle, msg, flags);
if (is_windows) {
if (rc == windows.ws2_32.SOCKET_ERROR) {
@@ -2274,7 +2274,7 @@ fn netSendOne(
}
fn netSendMany(
- pool: *Pool,
+ t: *Threaded,
handle: net.Socket.Handle,
messages: []net.OutgoingMessage,
flags: u32,
@@ -2305,7 +2305,7 @@ fn netSendMany(
}
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
const rc = posix.system.sendmmsg(handle, clamped_msgs.ptr, @intCast(clamped_msgs.len), flags);
switch (posix.errno(rc)) {
.SUCCESS => {
@@ -2348,7 +2348,7 @@ fn netReceive(
flags: net.ReceiveFlags,
timeout: Io.Timeout,
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
// recvmmsg is useless, here's why:
// * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
@@ -2375,10 +2375,10 @@ fn netReceive(
var message_i: usize = 0;
var data_i: usize = 0;
- const deadline = timeout.toDeadline(pool.io()) catch |err| return .{ err, message_i };
+ const deadline = timeout.toDeadline(t.io()) catch |err| return .{ err, message_i };
recv: while (true) {
- pool.checkCancel() catch |err| return .{ err, message_i };
+ t.checkCancel() catch |err| return .{ err, message_i };
if (message_buffer.len - message_i == 0) return .{ null, message_i };
const message = &message_buffer[message_i];
@@ -2416,12 +2416,12 @@ fn netReceive(
continue;
},
.AGAIN => while (true) {
- pool.checkCancel() catch |err| return .{ err, message_i };
+ t.checkCancel() catch |err| return .{ err, message_i };
if (message_i != 0) return .{ null, message_i };
const max_poll_ms = std.math.maxInt(u31);
const timeout_ms: u31 = if (deadline) |d| t: {
- const duration = d.durationFromNow(pool.io()) catch |err| return .{ err, message_i };
+ const duration = d.durationFromNow(t.io()) catch |err| return .{ err, message_i };
if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
} else max_poll_ms;
@@ -2473,8 +2473,8 @@ fn netWritePosix(
data: []const []const u8,
splat: usize,
) net.Stream.Writer.Error!usize {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
var iovecs: [max_iovecs_len]posix.iovec_const = undefined;
var msg: posix.msghdr_const = .{
@@ -2527,8 +2527,8 @@ fn addBuf(v: []posix.iovec_const, i: *@FieldType(posix.msghdr_const, "iovlen"),
}
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- _ = pool;
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ _ = t;
switch (native_os) {
.windows => windows.closesocket(handle) catch recoverableOsBugDetected(),
else => posix.close(handle),
@@ -2539,10 +2539,10 @@ fn netInterfaceNameResolve(
userdata: ?*anyopaque,
name: *const net.Interface.Name,
) net.Interface.Name.ResolveError!net.Interface {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
if (native_os == .linux) {
- const sock_fd = openSocketPosix(pool, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
+ const sock_fd = openSocketPosix(t, posix.AF.UNIX, .{ .mode = .dgram }) catch |err| switch (err) {
error.ProcessFdQuotaExceeded => return error.SystemResources,
error.SystemFdQuotaExceeded => return error.SystemResources,
error.AddressFamilyUnsupported => return error.Unexpected,
@@ -2559,7 +2559,7 @@ fn netInterfaceNameResolve(
};
while (true) {
- try pool.checkCancel();
+ try t.checkCancel();
switch (posix.errno(posix.system.ioctl(sock_fd, posix.SIOCGIFINDEX, @intFromPtr(&ifr)))) {
.SUCCESS => return .{ .index = @bitCast(ifr.ifru.ivalue) },
.INTR => continue,
@@ -2576,14 +2576,14 @@ fn netInterfaceNameResolve(
}
if (native_os == .windows) {
- try pool.checkCancel();
+ try t.checkCancel();
const index = std.os.windows.ws2_32.if_nametoindex(&name.bytes);
if (index == 0) return error.InterfaceNotFound;
return .{ .index = index };
}
if (builtin.link_libc) {
- try pool.checkCancel();
+ try t.checkCancel();
const index = std.c.if_nametoindex(&name.bytes);
if (index == 0) return error.InterfaceNotFound;
return .{ .index = @bitCast(index) };
@@ -2593,8 +2593,8 @@ fn netInterfaceNameResolve(
}
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- try pool.checkCancel();
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ try t.checkCancel();
if (native_os == .linux) {
_ = interface;
@@ -2618,18 +2618,18 @@ fn netLookup(
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) void {
- const pool: *Pool = @ptrCast(@alignCast(userdata));
- const pool_io = pool.io();
- resolved.putOneUncancelable(pool_io, .{ .end = netLookupFallible(pool, host_name, resolved, options) });
+ const t: *Threaded = @ptrCast(@alignCast(userdata));
+ const t_io = t.io();
+ resolved.putOneUncancelable(t_io, .{ .end = netLookupFallible(t, host_name, resolved, options) });
}
fn netLookupFallible(
- pool: *Pool,
+ t: *Threaded,
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) !void {
- const pool_io = pool.io();
+ const t_io = t.io();
const name = host_name.bytes;
assert(name.len <= HostName.max_len);
@@ -2648,7 +2648,7 @@ fn netLookupFallible(
if (native_os == .linux) {
if (options.family != .ip4) {
if (IpAddress.parseIp6(name, options.port)) |addr| {
- try resolved.putAll(pool_io, &.{
+ try resolved.putAll(t_io, &.{
.{ .address = addr },
.{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
});
@@ -2658,7 +2658,7 @@ fn netLookupFallible(
if (options.family != .ip6) {
if (IpAddress.parseIp4(name, options.port)) |addr| {
- try resolved.putAll(pool_io, &.{
+ try resolved.putAll(t_io, &.{
.{ .address = addr },
.{ .canonical_name = copyCanon(options.canonical_name_buffer, name) },
});
@@ -2666,7 +2666,7 @@ fn netLookupFallible(
} else |_| {}
}
- lookupHosts(pool, host_name, resolved, options) catch |err| switch (err) {
+ lookupHosts(t, host_name, resolved, options) catch |err| switch (err) {
error.UnknownHostName => {},
else => |e| return e,
};
@@ -2697,11 +2697,11 @@ fn netLookupFallible(
canon_name_dest.* = canon_name.*;
results_buffer[results_index] = .{ .canonical_name = .{ .bytes = canon_name_dest } };
results_index += 1;
- try resolved.putAll(pool_io, results_buffer[0..results_index]);
+ try resolved.putAll(t_io, results_buffer[0..results_index]);
return;
}
- return lookupDnsSearch(pool, host_name, resolved, options);
+ return lookupDnsSearch(t, host_name, resolved, options);
}
if (native_os == .openbsd) {
@@ -2953,13 +2953,13 @@ fn pathToPosix(file_path: []const u8, buffer: *[posix.PATH_MAX]u8) Io.Dir.PathNa
}
fn lookupDnsSearch(
- pool: *Pool,
+ t: *Threaded,
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) HostName.LookupError!void {
- const pool_io = pool.io();
- const rc = HostName.ResolvConf.init(pool_io) catch return error.ResolvConfParseFailed;
+ const t_io = t.io();
+ const rc = HostName.ResolvConf.init(t_io) catch return error.ResolvConfParseFailed;
// Count dots, suppress search when >=ndots or name ends in
// a dot, which is an explicit request for global scope.
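
A self-contained restatement of that rule (a hypothetical helper, not part of this file):

    fn suppressSearch(name: []const u8, ndots: usize) bool {
        // Skip the search domains when the name already contains >= ndots
        // dots, or ends in '.', an explicit request for global scope.
        var dots: usize = 0;
        for (name) |c| dots += @intFromBool(c == '.');
        return dots >= ndots or (name.len > 0 and name[name.len - 1] == '.');
    }
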
@@ -2983,7 +2983,7 @@ fn lookupDnsSearch(
while (it.next()) |token| {
@memcpy(options.canonical_name_buffer[canon_name.len + 1 ..][0..token.len], token);
const lookup_canon_name = options.canonical_name_buffer[0 .. canon_name.len + 1 + token.len];
- if (lookupDns(pool, lookup_canon_name, &rc, resolved, options)) |result| {
+ if (lookupDns(t, lookup_canon_name, &rc, resolved, options)) |result| {
return result;
} else |err| switch (err) {
error.UnknownHostName => continue,
@@ -2992,17 +2992,17 @@ fn lookupDnsSearch(
}
const lookup_canon_name = options.canonical_name_buffer[0..canon_name.len];
- return lookupDns(pool, lookup_canon_name, &rc, resolved, options);
+ return lookupDns(t, lookup_canon_name, &rc, resolved, options);
}
fn lookupDns(
- pool: *Pool,
+ t: *Threaded,
lookup_canon_name: []const u8,
rc: *const HostName.ResolvConf,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) HostName.LookupError!void {
- const pool_io = pool.io();
+ const t_io = t.io();
const family_records: [2]struct { af: IpAddress.Family, rr: u8 } = .{
.{ .af = .ip6, .rr = std.posix.RR.A },
.{ .af = .ip4, .rr = std.posix.RR.AAAA },
@@ -3032,7 +3032,7 @@ fn lookupDns(
var socket = s: {
if (any_ip6) ip6: {
const ip6_addr: IpAddress = .{ .ip6 = .unspecified(0) };
- const socket = ip6_addr.bind(pool_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
+ const socket = ip6_addr.bind(t_io, .{ .ip6_only = true, .mode = .dgram }) catch |err| switch (err) {
error.AddressFamilyUnsupported => break :ip6,
else => |e| return e,
};
@@ -3040,10 +3040,10 @@ fn lookupDns(
}
any_ip6 = false;
const ip4_addr: IpAddress = .{ .ip4 = .unspecified(0) };
- const socket = try ip4_addr.bind(pool_io, .{ .mode = .dgram });
+ const socket = try ip4_addr.bind(t_io, .{ .mode = .dgram });
break :s socket;
};
- defer socket.close(pool_io);
+ defer socket.close(t_io);
const mapped_nameservers = if (any_ip6) ip4_mapped[0..rc.nameservers_len] else rc.nameservers();
const queries = queries_buffer[0..nq];
@@ -3054,13 +3054,13 @@ fn lookupDns(
// boot clock is chosen because time the computer is suspended should count
// against time spent waiting for external messages to arrive.
const clock: Io.Clock = .boot;
- var now_ts = try clock.now(pool_io);
+ var now_ts = try clock.now(t_io);
const final_ts = now_ts.addDuration(.fromSeconds(rc.timeout_seconds));
const attempt_duration: Io.Duration = .{
.nanoseconds = std.time.ns_per_s * @as(usize, rc.timeout_seconds) / rc.attempts,
};
- send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(pool_io)) {
+ send: while (now_ts.nanoseconds < final_ts.nanoseconds) : (now_ts = try clock.now(t_io)) {
const max_messages = queries_buffer.len * HostName.ResolvConf.max_nameservers;
{
var message_buffer: [max_messages]Io.net.OutgoingMessage = undefined;
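
The `attempt_duration` above splits the total resolv.conf budget evenly across attempts. With the common defaults (timeout 5, attempts 2; assumed values, not from this diff):

    const std = @import("std");

    test "per-attempt pacing (illustrative)" {
        const timeout_seconds: u64 = 5; // resolv.conf default (assumed)
        const attempts: u64 = 2; // resolv.conf default (assumed)
        const attempt_ns = std.time.ns_per_s * timeout_seconds / attempts;
        try std.testing.expectEqual(@as(u64, 2_500_000_000), attempt_ns);
    }
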
@@ -3076,7 +3076,7 @@ fn lookupDns(
message_i += 1;
}
}
- _ = netSend(pool, socket.handle, message_buffer[0..message_i], .{});
+ _ = netSend(t, socket.handle, message_buffer[0..message_i], .{});
}
const timeout: Io.Timeout = .{ .deadline = .{
@@ -3087,7 +3087,7 @@ fn lookupDns(
while (true) {
var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
const buf = answer_buffer[answer_buffer_i..];
- const recv_err, const recv_n = socket.receiveManyTimeout(pool_io, &message_buffer, buf, .{}, timeout);
+ const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout);
for (message_buffer[0..recv_n]) |*received_message| {
const reply = received_message.data;
// Ignore non-identifiable packets.
@@ -3124,7 +3124,7 @@ fn lookupDns(
.data_ptr = query.ptr,
.data_len = query.len,
};
- _ = netSend(pool, socket.handle, (&retry_message)[0..1], .{});
+ _ = netSend(t, socket.handle, (&retry_message)[0..1], .{});
continue;
},
else => continue,
@@ -3155,7 +3155,7 @@ fn lookupDns(
std.posix.RR.A => {
const data = record.packet[record.data_off..][0..record.data_len];
if (data.len != 4) return error.InvalidDnsARecord;
- try resolved.putOne(pool_io, .{ .address = .{ .ip4 = .{
+ try resolved.putOne(t_io, .{ .address = .{ .ip4 = .{
.bytes = data[0..4].*,
.port = options.port,
} } });
@@ -3164,7 +3164,7 @@ fn lookupDns(
std.posix.RR.AAAA => {
const data = record.packet[record.data_off..][0..record.data_len];
if (data.len != 16) return error.InvalidDnsAAAARecord;
- try resolved.putOne(pool_io, .{ .address = .{ .ip6 = .{
+ try resolved.putOne(t_io, .{ .address = .{ .ip6 = .{
.bytes = data[0..16].*,
.port = options.port,
} } });
@@ -3178,18 +3178,18 @@ fn lookupDns(
};
}
- try resolved.putOne(pool_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
+ try resolved.putOne(t_io, .{ .canonical_name = canonical_name orelse .{ .bytes = lookup_canon_name } });
if (addresses_len == 0) return error.NameServerFailure;
}
fn lookupHosts(
- pool: *Pool,
+ t: *Threaded,
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
) !void {
- const pool_io = pool.io();
- const file = Io.File.openAbsolute(pool_io, "/etc/hosts", .{}) catch |err| switch (err) {
+ const t_io = t.io();
+ const file = Io.File.openAbsolute(t_io, "/etc/hosts", .{}) catch |err| switch (err) {
error.FileNotFound,
error.NotDir,
error.AccessDenied,
@@ -3202,11 +3202,11 @@ fn lookupHosts(
return error.DetectingNetworkConfigurationFailed;
},
};
- defer file.close(pool_io);
+ defer file.close(t_io);
var line_buf: [512]u8 = undefined;
- var file_reader = file.reader(pool_io, &line_buf);
- return lookupHostsReader(pool, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) {
+ var file_reader = file.reader(t_io, &line_buf);
+ return lookupHostsReader(t, host_name, resolved, options, &file_reader.interface) catch |err| switch (err) {
error.ReadFailed => switch (file_reader.err.?) {
error.Canceled => |e| return e,
else => {
@@ -3220,13 +3220,13 @@ fn lookupHosts(
}
fn lookupHostsReader(
- pool: *Pool,
+ t: *Threaded,
host_name: HostName,
resolved: *Io.Queue(HostName.LookupResult),
options: HostName.LookupOptions,
reader: *Io.Reader,
) error{ ReadFailed, Canceled, UnknownHostName }!void {
- const pool_io = pool.io();
+ const t_io = t.io();
var addresses_len: usize = 0;
var canonical_name: ?HostName = null;
while (true) {
@@ -3268,19 +3268,19 @@ fn lookupHostsReader(
if (options.family != .ip6) {
if (IpAddress.parseIp4(ip_text, options.port)) |addr| {
- try resolved.putOne(pool_io, .{ .address = addr });
+ try resolved.putOne(t_io, .{ .address = addr });
addresses_len += 1;
} else |_| {}
}
if (options.family != .ip4) {
if (IpAddress.parseIp6(ip_text, options.port)) |addr| {
- try resolved.putOne(pool_io, .{ .address = addr });
+ try resolved.putOne(t_io, .{ .address = addr });
addresses_len += 1;
} else |_| {}
}
}
- if (canonical_name) |canon_name| try resolved.putOne(pool_io, .{ .canonical_name = canon_name });
+ if (canonical_name) |canon_name| try resolved.putOne(t_io, .{ .canonical_name = canon_name });
if (addresses_len == 0) return error.UnknownHostName;
}
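
For reference, the kind of input the hosts-file path above consumes: an address token followed by one or more names, the first of which serves as the canonical name (illustrative `/etc/hosts` content, not from this commit):

    127.0.0.1      localhost
    ::1            localhost ip6-localhost
    93.184.216.34  example.com www.example.com
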
BRANCH_TODO
@@ -1,7 +1,7 @@
-* Threaded: rename Pool to Threaded
* Threaded: finish linux impl (all tests passing)
* Threaded: finish macos impl
* Threaded: finish windows impl
+* Threaded: glibc impl of netLookup
* fix Group.wait not handling cancelation (need to move impl of ResetEvent to Threaded)
* implement cancelRequest for non-linux posix