Commit 3f34f5e433
Changed files (6)
lib/compiler/build_runner.zig
lib/std/Build/Step/Run.zig
lib/std/Build/Cache.zig
lib/std/Build/Fuzz.zig
lib/std/Build/Step.zig
lib/std/Build/WebServer.zig
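
The change repeated across all six files is the migration from std.Thread.Mutex/std.Thread.Condition to Io.Mutex/Io.Condition, whose operations take an explicit `io` handle and whose lock/wait can fail with error.Canceled. Below is a minimal sketch of that usage pattern, assuming the `std.Io` interface value and the method shapes (`.init`, `lock`, `lockUncancelable`, `unlock`, `wait`, `signal`) exactly as they appear in the hunks; the `Shared` struct and both functions are hypothetical illustrations, not code from this commit.

    const std = @import("std");
    const Io = std.Io;

    const Shared = struct {
        mutex: Io.Mutex = .init,
        cond: Io.Condition = .{},
        ready: bool = false,
    };

    // Cancelable path: lock() and wait() may fail with error.Canceled, which the
    // caller either propagates (as in Cache.zig) or treats as "stop working" (as in Fuzz.zig).
    fn waitUntilReady(io: Io, shared: *Shared) Io.Cancelable!void {
        try shared.mutex.lock(io);
        defer shared.mutex.unlock(io);
        while (!shared.ready) try shared.cond.wait(io, &shared.mutex);
    }

    // Uncancelable path: keeps the old "lock cannot fail" shape, used where the
    // surrounding function returns void (e.g. workerMakeOneStep below).
    fn markReady(io: Io, shared: *Shared) void {
        shared.mutex.lockUncancelable(io);
        defer shared.mutex.unlock(io);
        shared.ready = true;
        shared.cond.signal(io);
    }

Void-returning call sites in the hunks use lockUncancelable(io) or `lock(io) catch return`, while fallible paths `try` the lock and forward error.Canceled to their callers.
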
lib/compiler/build_runner.zig
@@ -494,7 +494,7 @@ pub fn main() !void {
.max_rss = max_rss,
.max_rss_is_default = false,
- .max_rss_mutex = .{},
+ .max_rss_mutex = .init,
.skip_oom_steps = skip_oom_steps,
.unit_test_timeout_ns = test_timeout_ns,
@@ -583,7 +583,7 @@ pub fn main() !void {
if (run.web_server) |*ws| {
assert(!watch); // fatal error after CLI parsing
- while (true) switch (ws.wait()) {
+ while (true) switch (try ws.wait()) {
.rebuild => {
for (run.step_stack.keys()) |step| {
step.state = .precheck_done;
@@ -652,7 +652,7 @@ const Run = struct {
gpa: Allocator,
max_rss: u64,
max_rss_is_default: bool,
- max_rss_mutex: std.Thread.Mutex,
+ max_rss_mutex: Io.Mutex,
skip_oom_steps: bool,
unit_test_timeout_ns: ?u64,
watch: bool,
@@ -1305,6 +1305,8 @@ fn workerMakeOneStep(
prog_node: std.Progress.Node,
run: *Run,
) void {
+ const io = b.graph.io;
+
// First, check the conditions for running this step. If they are not met,
// then we return without doing the step, relying on another worker to
// queue this step up again when dependencies are met.
@@ -1326,8 +1328,8 @@ fn workerMakeOneStep(
}
if (s.max_rss != 0) {
- run.max_rss_mutex.lock();
- defer run.max_rss_mutex.unlock();
+ run.max_rss_mutex.lockUncancelable(io);
+ defer run.max_rss_mutex.unlock(io);
// Avoid running steps twice.
if (s.state != .precheck_done) {
@@ -1378,8 +1380,6 @@ fn workerMakeOneStep(
printErrorMessages(run.gpa, s, .{}, bw, ttyconf, run.error_style, run.multiline_errors) catch {};
}
- const io = b.graph.io;
-
handle_result: {
if (make_result) |_| {
@atomicStore(Step.State, &s.state, .success, .seq_cst);
@@ -1406,8 +1406,8 @@ fn workerMakeOneStep(
// If this is a step that claims resources, we must now queue up other
// steps that are waiting for resources.
if (s.max_rss != 0) {
- run.max_rss_mutex.lock();
- defer run.max_rss_mutex.unlock();
+ run.max_rss_mutex.lockUncancelable(io);
+ defer run.max_rss_mutex.unlock(io);
// Give the memory back to the scheduler.
run.claimed_rss -= s.max_rss;
lib/std/Build/Step/Run.zig
@@ -1830,6 +1830,7 @@ fn pollZigTest(
} {
const gpa = run.step.owner.allocator;
const arena = run.step.owner.allocator;
+ const io = run.step.owner.graph.io;
var sub_prog_node: ?std.Progress.Node = null;
defer if (sub_prog_node) |n| n.end();
@@ -2035,8 +2036,8 @@ fn pollZigTest(
{
const fuzz = fuzz_context.?.fuzz;
- fuzz.queue_mutex.lock();
- defer fuzz.queue_mutex.unlock();
+ fuzz.queue_mutex.lockUncancelable(io);
+ defer fuzz.queue_mutex.unlock(io);
try fuzz.msg_queue.append(fuzz.gpa, .{ .coverage = .{
.id = coverage_id.?,
.cumulative = .{
@@ -2046,20 +2047,20 @@ fn pollZigTest(
},
.run = run,
} });
- fuzz.queue_cond.signal();
+ fuzz.queue_cond.signal(io);
}
},
.fuzz_start_addr => {
const fuzz = fuzz_context.?.fuzz;
const addr = body_r.takeInt(u64, .little) catch unreachable;
{
- fuzz.queue_mutex.lock();
- defer fuzz.queue_mutex.unlock();
+ fuzz.queue_mutex.lockUncancelable(io);
+ defer fuzz.queue_mutex.unlock(io);
try fuzz.msg_queue.append(fuzz.gpa, .{ .entry_point = .{
.addr = addr,
.coverage_id = coverage_id.?,
} });
- fuzz.queue_cond.signal();
+ fuzz.queue_cond.signal(io);
}
},
else => {}, // ignore other messages
lib/std/Build/Cache.zig
@@ -22,7 +22,7 @@ manifest_dir: fs.Dir,
hash: HashHelper = .{},
/// This value is accessed from multiple threads, protected by mutex.
recent_problematic_timestamp: Io.Timestamp = .zero,
-mutex: std.Thread.Mutex = .{},
+mutex: Io.Mutex = .init,
/// A set of strings such as the zig library directory or project source root, which
/// are stripped from the file paths before putting into the cache. They
@@ -474,6 +474,7 @@ pub const Manifest = struct {
/// A cache manifest file exists however it could not be parsed.
InvalidFormat,
OutOfMemory,
+ Canceled,
};
/// Check the cache to see if the input exists in it. If it exists, returns `true`.
@@ -559,12 +560,14 @@ pub const Manifest = struct {
self.diagnostic = .{ .manifest_create = error.FileNotFound };
return error.CacheCheckFailed;
},
+ error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .manifest_create = e };
return error.CacheCheckFailed;
},
}
},
+ error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .manifest_create = e };
return error.CacheCheckFailed;
@@ -762,6 +765,7 @@ pub const Manifest = struct {
// Every digest before this one has been populated successfully.
return .{ .miss = .{ .file_digests_populated = idx } };
},
+ error.Canceled => return error.Canceled,
else => |e| {
self.diagnostic = .{ .file_open = .{
.file_index = idx,
@@ -790,7 +794,7 @@ pub const Manifest = struct {
.inode = actual_stat.inode,
};
- if (self.isProblematicTimestamp(cache_hash_file.stat.mtime)) {
+ if (try self.isProblematicTimestamp(cache_hash_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
cache_hash_file.stat.mtime = .zero;
cache_hash_file.stat.inode = 0;
@@ -848,7 +852,9 @@ pub const Manifest = struct {
}
}
- fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) bool {
+ fn isProblematicTimestamp(man: *Manifest, timestamp: Io.Timestamp) error{Canceled}!bool {
+ const io = man.cache.io;
+
// If the file_time is prior to the most recent problematic timestamp
// then we don't need to access the filesystem.
if (timestamp.nanoseconds < man.recent_problematic_timestamp.nanoseconds)
@@ -856,8 +862,8 @@ pub const Manifest = struct {
// Next we will check the globally shared Cache timestamp, which is accessed
// from multiple threads.
- man.cache.mutex.lock();
- defer man.cache.mutex.unlock();
+ try man.cache.mutex.lock(io);
+ defer man.cache.mutex.unlock(io);
// Save the global one to our local one to avoid locking next time.
man.recent_problematic_timestamp = man.cache.recent_problematic_timestamp;
@@ -871,11 +877,18 @@ pub const Manifest = struct {
var file = man.cache.manifest_dir.createFile("timestamp", .{
.read = true,
.truncate = true,
- }) catch return true;
+ }) catch |err| switch (err) {
+ error.Canceled => return error.Canceled,
+ else => return true,
+ };
defer file.close();
// Save locally and also save globally (we still hold the global lock).
- man.recent_problematic_timestamp = (file.stat() catch return true).mtime;
+ const stat = file.stat() catch |err| switch (err) {
+ error.Canceled => return error.Canceled,
+ else => return true,
+ };
+ man.recent_problematic_timestamp = stat.mtime;
man.cache.recent_problematic_timestamp = man.recent_problematic_timestamp;
}
@@ -902,7 +915,7 @@ pub const Manifest = struct {
.inode = actual_stat.inode,
};
- if (self.isProblematicTimestamp(ch_file.stat.mtime)) {
+ if (try self.isProblematicTimestamp(ch_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
ch_file.stat.mtime = .zero;
ch_file.stat.inode = 0;
@@ -1038,7 +1051,7 @@ pub const Manifest = struct {
.contents = null,
};
- if (self.isProblematicTimestamp(new_file.stat.mtime)) {
+ if (try self.isProblematicTimestamp(new_file.stat.mtime)) {
// The actual file has an unreliable timestamp, force it to be hashed
new_file.stat.mtime = .zero;
new_file.stat.inode = 0;
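
The Cache.zig hunks above also widen several error sets with Canceled and forward it explicitly instead of folding it into the old fallback behavior; isProblematicTimestamp, for example, now returns error{Canceled}!bool rather than answering `true` when a cancelable filesystem call fails. A self-contained sketch of that forwarding pattern follows; doIoOp and checkedOp are hypothetical stand-ins, not functions from this commit.

    const std = @import("std");

    // Hypothetical stand-in for a cancelable filesystem call such as file.stat().
    fn doIoOp() error{ Canceled, AccessDenied }!u64 {
        return 42;
    }

    // Cancellation propagates to the caller; every other failure keeps the old
    // fallback value, mirroring how isProblematicTimestamp treats stat/create errors.
    fn checkedOp() error{Canceled}!u64 {
        return doIoOp() catch |err| switch (err) {
            error.Canceled => return error.Canceled,
            else => 0,
        };
    }

    test checkedOp {
        try std.testing.expectEqual(@as(u64, 42), try checkedOp());
    }
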
lib/std/Build/Fuzz.zig
@@ -27,11 +27,11 @@ root_prog_node: std.Progress.Node,
prog_node: std.Progress.Node,
/// Protects `coverage_files`.
-coverage_mutex: std.Thread.Mutex,
+coverage_mutex: Io.Mutex,
coverage_files: std.AutoArrayHashMapUnmanaged(u64, CoverageMap),
-queue_mutex: std.Thread.Mutex,
-queue_cond: std.Thread.Condition,
+queue_mutex: Io.Mutex,
+queue_cond: Io.Condition,
msg_queue: std.ArrayList(Msg),
pub const Mode = union(enum) {
@@ -122,8 +122,8 @@ pub fn init(
.root_prog_node = root_prog_node,
.prog_node = .none,
.coverage_files = .empty,
- .coverage_mutex = .{},
- .queue_mutex = .{},
+ .coverage_mutex = .init,
+ .queue_mutex = .init,
.queue_cond = .{},
.msg_queue = .empty,
};
@@ -157,9 +157,7 @@ pub fn deinit(fuzz: *Fuzz) void {
fn rebuildTestsWorkerRun(run: *Step.Run, gpa: Allocator, ttyconf: tty.Config, parent_prog_node: std.Progress.Node) void {
rebuildTestsWorkerRunFallible(run, gpa, ttyconf, parent_prog_node) catch |err| {
const compile = run.producer.?;
- log.err("step '{s}': failed to rebuild in fuzz mode: {s}", .{
- compile.step.name, @errorName(err),
- });
+ log.err("step '{s}': failed to rebuild in fuzz mode: {t}", .{ compile.step.name, err });
};
}
@@ -208,9 +206,7 @@ fn fuzzWorkerRun(
return;
},
else => {
- log.err("step '{s}': failed to rerun '{s}' in fuzz mode: {s}", .{
- run.step.name, test_name, @errorName(err),
- });
+ log.err("step '{s}': failed to rerun '{s}' in fuzz mode: {t}", .{ run.step.name, test_name, err });
return;
},
};
@@ -269,8 +265,10 @@ pub fn sendUpdate(
socket: *std.http.Server.WebSocket,
prev: *Previous,
) !void {
- fuzz.coverage_mutex.lock();
- defer fuzz.coverage_mutex.unlock();
+ const io = fuzz.io;
+
+ try fuzz.coverage_mutex.lock(io);
+ defer fuzz.coverage_mutex.unlock(io);
const coverage_maps = fuzz.coverage_files.values();
if (coverage_maps.len == 0) return;
@@ -331,30 +329,41 @@ pub fn sendUpdate(
}
fn coverageRun(fuzz: *Fuzz) void {
- fuzz.queue_mutex.lock();
- defer fuzz.queue_mutex.unlock();
+ coverageRunCancelable(fuzz) catch |err| switch (err) {
+ error.Canceled => return,
+ };
+}
+
+fn coverageRunCancelable(fuzz: *Fuzz) Io.Cancelable!void {
+ const io = fuzz.io;
+
+ try fuzz.queue_mutex.lock(io);
+ defer fuzz.queue_mutex.unlock(io);
while (true) {
- fuzz.queue_cond.wait(&fuzz.queue_mutex);
+ try fuzz.queue_cond.wait(io, &fuzz.queue_mutex);
for (fuzz.msg_queue.items) |msg| switch (msg) {
.coverage => |coverage| prepareTables(fuzz, coverage.run, coverage.id) catch |err| switch (err) {
error.AlreadyReported => continue,
- else => |e| log.err("failed to prepare code coverage tables: {s}", .{@errorName(e)}),
+ error.Canceled => return,
+ else => |e| log.err("failed to prepare code coverage tables: {t}", .{e}),
},
.entry_point => |entry_point| addEntryPoint(fuzz, entry_point.coverage_id, entry_point.addr) catch |err| switch (err) {
error.AlreadyReported => continue,
- else => |e| log.err("failed to prepare code coverage tables: {s}", .{@errorName(e)}),
+ error.Canceled => return,
+ else => |e| log.err("failed to prepare code coverage tables: {t}", .{e}),
},
};
fuzz.msg_queue.clearRetainingCapacity();
}
}
-fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutOfMemory, AlreadyReported }!void {
+fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutOfMemory, AlreadyReported, Canceled }!void {
assert(fuzz.mode == .forever);
const ws = fuzz.mode.forever.ws;
+ const io = fuzz.io;
- fuzz.coverage_mutex.lock();
- defer fuzz.coverage_mutex.unlock();
+ try fuzz.coverage_mutex.lock(io);
+ defer fuzz.coverage_mutex.unlock(io);
const gop = try fuzz.coverage_files.getOrPut(fuzz.gpa, coverage_id);
if (gop.found_existing) {
@@ -385,8 +394,8 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO
target.ofmt,
target.cpu.arch,
) catch |err| {
- log.err("step '{s}': failed to load debug information for '{f}': {s}", .{
- run_step.step.name, rebuilt_exe_path, @errorName(err),
+ log.err("step '{s}': failed to load debug information for '{f}': {t}", .{
+ run_step.step.name, rebuilt_exe_path, err,
});
return error.AlreadyReported;
};
@@ -397,15 +406,15 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO
.sub_path = "v/" ++ std.fmt.hex(coverage_id),
};
var coverage_file = coverage_file_path.root_dir.handle.openFile(coverage_file_path.sub_path, .{}) catch |err| {
- log.err("step '{s}': failed to load coverage file '{f}': {s}", .{
- run_step.step.name, coverage_file_path, @errorName(err),
+ log.err("step '{s}': failed to load coverage file '{f}': {t}", .{
+ run_step.step.name, coverage_file_path, err,
});
return error.AlreadyReported;
};
defer coverage_file.close();
const file_size = coverage_file.getEndPos() catch |err| {
- log.err("unable to check len of coverage file '{f}': {s}", .{ coverage_file_path, @errorName(err) });
+ log.err("unable to check len of coverage file '{f}': {t}", .{ coverage_file_path, err });
return error.AlreadyReported;
};
@@ -417,7 +426,7 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO
coverage_file.handle,
0,
) catch |err| {
- log.err("failed to map coverage file '{f}': {s}", .{ coverage_file_path, @errorName(err) });
+ log.err("failed to map coverage file '{f}': {t}", .{ coverage_file_path, err });
return error.AlreadyReported;
};
gop.value_ptr.mapped_memory = mapped_memory;
@@ -443,7 +452,7 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO
}{ .addrs = sorted_pcs.items(.pc) });
debug_info.resolveAddresses(fuzz.gpa, sorted_pcs.items(.pc), sorted_pcs.items(.sl)) catch |err| {
- log.err("failed to resolve addresses to source locations: {s}", .{@errorName(err)});
+ log.err("failed to resolve addresses to source locations: {t}", .{err});
return error.AlreadyReported;
};
@@ -453,9 +462,11 @@ fn prepareTables(fuzz: *Fuzz, run_step: *Step.Run, coverage_id: u64) error{ OutO
ws.notifyUpdate();
}
-fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReported, OutOfMemory }!void {
- fuzz.coverage_mutex.lock();
- defer fuzz.coverage_mutex.unlock();
+fn addEntryPoint(fuzz: *Fuzz, coverage_id: u64, addr: u64) error{ AlreadyReported, OutOfMemory, Canceled }!void {
+ const io = fuzz.io;
+
+ try fuzz.coverage_mutex.lock(io);
+ defer fuzz.coverage_mutex.unlock(io);
const coverage_map = fuzz.coverage_files.getPtr(coverage_id).?;
const header: *const abi.SeenPcsHeader = @ptrCast(coverage_map.mapped_memory[0..@sizeOf(abi.SeenPcsHeader)]);
@@ -518,8 +529,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
.sub_path = "v/" ++ std.fmt.hex(cov.id),
};
var coverage_file = coverage_file_path.root_dir.handle.openFile(coverage_file_path.sub_path, .{}) catch |err| {
- fatal("step '{s}': failed to load coverage file '{f}': {s}", .{
- cov.run.step.name, coverage_file_path, @errorName(err),
+ fatal("step '{s}': failed to load coverage file '{f}': {t}", .{
+ cov.run.step.name, coverage_file_path, err,
});
};
defer coverage_file.close();
@@ -530,8 +541,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
var header: fuzz_abi.SeenPcsHeader = undefined;
r.interface.readSliceAll(std.mem.asBytes(&header)) catch |err| {
- fatal("step '{s}': failed to read from coverage file '{f}': {s}", .{
- cov.run.step.name, coverage_file_path, @errorName(err),
+ fatal("step '{s}': failed to read from coverage file '{f}': {t}", .{
+ cov.run.step.name, coverage_file_path, err,
});
};
@@ -545,8 +556,8 @@ pub fn waitAndPrintReport(fuzz: *Fuzz) void {
const chunk_count = fuzz_abi.SeenPcsHeader.seenElemsLen(header.pcs_len);
for (0..chunk_count) |_| {
const seen = r.interface.takeInt(usize, .little) catch |err| {
- fatal("step '{s}': failed to read from coverage file '{f}': {s}", .{
- cov.run.step.name, coverage_file_path, @errorName(err),
+ fatal("step '{s}': failed to read from coverage file '{f}': {t}", .{
+ cov.run.step.name, coverage_file_path, err,
});
};
seen_count += @popCount(seen);
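
A second recurring change in Fuzz.zig (and in Step.zig below) replaces the "{s}" plus @errorName(err) pairing with "{t}" plus the error value itself. A small sketch of the equivalence, assuming {t} prints error and enum tag names the way these call sites rely on:

    const std = @import("std");

    test "{t} formats an error by name" {
        const err = error.FileNotFound;
        // Old call-site style: pass the name string explicitly.
        var buf_old: [32]u8 = undefined;
        const old = try std.fmt.bufPrint(&buf_old, "{s}", .{@errorName(err)});
        try std.testing.expectEqualStrings("FileNotFound", old);
        // New call-site style: {t} prints the error name directly.
        var buf_new: [32]u8 = undefined;
        const new = try std.fmt.bufPrint(&buf_new, "{t}", .{err});
        try std.testing.expectEqualStrings("FileNotFound", new);
    }
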
lib/std/Build/Step.zig
@@ -362,7 +362,7 @@ pub fn captureChildProcess(
.allocator = arena,
.argv = argv,
.progress_node = progress_node,
- }) catch |err| return s.fail("failed to run {s}: {s}", .{ argv[0], @errorName(err) });
+ }) catch |err| return s.fail("failed to run {s}: {t}", .{ argv[0], err });
if (result.stderr.len > 0) {
try s.result_error_msgs.append(arena, result.stderr);
@@ -412,7 +412,7 @@ pub fn evalZigProcess(
error.BrokenPipe => {
// Process restart required.
const term = zp.child.wait() catch |e| {
- return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(e) });
+ return s.fail("unable to wait for {s}: {t}", .{ argv[0], e });
};
_ = term;
s.clearZigProcess(gpa);
@@ -428,7 +428,7 @@ pub fn evalZigProcess(
if (s.result_error_msgs.items.len > 0 and result == null) {
// Crash detected.
const term = zp.child.wait() catch |e| {
- return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(e) });
+ return s.fail("unable to wait for {s}: {t}", .{ argv[0], e });
};
s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0;
s.clearZigProcess(gpa);
@@ -453,9 +453,7 @@ pub fn evalZigProcess(
child.request_resource_usage_statistics = true;
child.progress_node = prog_node;
- child.spawn() catch |err| return s.fail("failed to spawn zig compiler {s}: {s}", .{
- argv[0], @errorName(err),
- });
+ child.spawn() catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err });
const zp = try gpa.create(ZigProcess);
zp.* = .{
@@ -480,7 +478,7 @@ pub fn evalZigProcess(
zp.child.stdin = null;
const term = zp.child.wait() catch |err| {
- return s.fail("unable to wait for {s}: {s}", .{ argv[0], @errorName(err) });
+ return s.fail("unable to wait for {s}: {t}", .{ argv[0], err });
};
s.result_peak_rss = zp.child.resource_usage_statistics.getMaxRss() orelse 0;
@@ -513,8 +511,8 @@ pub fn installFile(s: *Step, src_lazy_path: Build.LazyPath, dest_path: []const u
const src_path = src_lazy_path.getPath3(b, s);
try handleVerbose(b, null, &.{ "install", "-C", b.fmt("{f}", .{src_path}), dest_path });
return Io.Dir.updateFile(src_path.root_dir.handle.adaptToNewApi(), io, src_path.sub_path, .cwd(), dest_path, .{}) catch |err| {
- return s.fail("unable to update file from '{f}' to '{s}': {s}", .{
- src_path, dest_path, @errorName(err),
+ return s.fail("unable to update file from '{f}' to '{s}': {t}", .{
+ src_path, dest_path, err,
});
};
}
@@ -524,9 +522,7 @@ pub fn installDir(s: *Step, dest_path: []const u8) !std.fs.Dir.MakePathStatus {
const b = s.owner;
try handleVerbose(b, null, &.{ "install", "-d", dest_path });
return std.fs.cwd().makePathStatus(dest_path) catch |err| {
- return s.fail("unable to create dir '{s}': {s}", .{
- dest_path, @errorName(err),
- });
+ return s.fail("unable to create dir '{s}': {t}", .{ dest_path, err });
};
}
@@ -825,22 +821,27 @@ pub fn cacheHitAndWatch(s: *Step, man: *Build.Cache.Manifest) !bool {
return is_hit;
}
-fn failWithCacheError(s: *Step, man: *const Build.Cache.Manifest, err: Build.Cache.Manifest.HitError) error{ OutOfMemory, MakeFailed } {
+fn failWithCacheError(
+ s: *Step,
+ man: *const Build.Cache.Manifest,
+ err: Build.Cache.Manifest.HitError,
+) error{ OutOfMemory, Canceled, MakeFailed } {
switch (err) {
error.CacheCheckFailed => switch (man.diagnostic) {
.none => unreachable,
- .manifest_create, .manifest_read, .manifest_lock => |e| return s.fail("failed to check cache: {s} {s}", .{
- @tagName(man.diagnostic), @errorName(e),
+ .manifest_create, .manifest_read, .manifest_lock => |e| return s.fail("failed to check cache: {t} {t}", .{
+ man.diagnostic, e,
}),
.file_open, .file_stat, .file_read, .file_hash => |op| {
const pp = man.files.keys()[op.file_index].prefixed_path;
const prefix = man.cache.prefixes()[pp.prefix].path orelse "";
- return s.fail("failed to check cache: '{s}{c}{s}' {s} {s}", .{
- prefix, std.fs.path.sep, pp.sub_path, @tagName(man.diagnostic), @errorName(op.err),
+ return s.fail("failed to check cache: '{s}{c}{s}' {t} {t}", .{
+ prefix, std.fs.path.sep, pp.sub_path, man.diagnostic, op.err,
});
},
},
error.OutOfMemory => return error.OutOfMemory,
+ error.Canceled => return error.Canceled,
error.InvalidFormat => return s.fail("failed to check cache: invalid manifest file format", .{}),
}
}
@@ -850,7 +851,7 @@ fn failWithCacheError(s: *Step, man: *const Build.Cache.Manifest, err: Build.Cac
pub fn writeManifest(s: *Step, man: *Build.Cache.Manifest) !void {
if (s.test_results.isSuccess()) {
man.writeManifest() catch |err| {
- try s.addError("unable to write cache manifest: {s}", .{@errorName(err)});
+ try s.addError("unable to write cache manifest: {t}", .{err});
};
}
}
lib/std/Build/WebServer.zig
@@ -19,7 +19,7 @@ step_names_trailing: []u8,
step_status_bits: []u8,
fuzz: ?Fuzz,
-time_report_mutex: std.Thread.Mutex,
+time_report_mutex: Io.Mutex,
time_report_msgs: [][]u8,
time_report_update_times: []i64,
@@ -33,9 +33,9 @@ build_status: std.atomic.Value(abi.BuildStatus),
/// an unreasonable number of packets.
update_id: std.atomic.Value(u32),
-runner_request_mutex: std.Thread.Mutex,
-runner_request_ready_cond: std.Thread.Condition,
-runner_request_empty_cond: std.Thread.Condition,
+runner_request_mutex: Io.Mutex,
+runner_request_ready_cond: Io.Condition,
+runner_request_empty_cond: Io.Condition,
runner_request: ?RunnerRequest,
/// If a client is not explicitly notified of changes with `notifyUpdate`, it will be sent updates
@@ -114,14 +114,14 @@ pub fn init(opts: Options) WebServer {
.step_status_bits = step_status_bits,
.fuzz = null,
- .time_report_mutex = .{},
+ .time_report_mutex = .init,
.time_report_msgs = time_report_msgs,
.time_report_update_times = time_report_update_times,
.build_status = .init(.idle),
.update_id = .init(0),
- .runner_request_mutex = .{},
+ .runner_request_mutex = .init,
.runner_request_ready_cond = .{},
.runner_request_empty_cond = .{},
.runner_request = null,
@@ -296,6 +296,8 @@ fn accept(ws: *WebServer, stream: net.Stream) void {
}
fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn {
+ const io = ws.graph.io;
+
var prev_build_status = ws.build_status.load(.monotonic);
const prev_step_status_bits = try ws.gpa.alloc(u8, ws.step_status_bits.len);
@@ -331,8 +333,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn {
}
{
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ try ws.time_report_mutex.lock(io);
+ defer ws.time_report_mutex.unlock(io);
for (ws.time_report_msgs, ws.time_report_update_times) |msg, update_time| {
if (update_time <= prev_time) continue;
// We want to send `msg`, but shouldn't block `ws.time_report_mutex` while we do, so
@@ -340,8 +342,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn {
const owned_msg = try ws.gpa.dupe(u8, msg);
defer ws.gpa.free(owned_msg);
// Temporarily unlock, then re-lock after the message is sent.
- ws.time_report_mutex.unlock();
- defer ws.time_report_mutex.lock();
+ ws.time_report_mutex.unlock(io);
+ defer ws.time_report_mutex.lockUncancelable(io);
try sock.writeMessage(owned_msg, .binary);
}
}
@@ -382,6 +384,8 @@ fn serveWebSocket(ws: *WebServer, sock: *http.Server.WebSocket) !noreturn {
}
}
fn recvWebSocketMessages(ws: *WebServer, sock: *http.Server.WebSocket) void {
+ const io = ws.graph.io;
+
while (true) {
const msg = sock.readSmallMessage() catch return;
if (msg.opcode != .binary) continue;
@@ -390,14 +394,16 @@ fn recvWebSocketMessages(ws: *WebServer, sock: *http.Server.WebSocket) void {
switch (tag) {
_ => continue,
.rebuild => while (true) {
- ws.runner_request_mutex.lock();
- defer ws.runner_request_mutex.unlock();
+ ws.runner_request_mutex.lock(io) catch |err| switch (err) {
+ error.Canceled => return,
+ };
+ defer ws.runner_request_mutex.unlock(io);
if (ws.runner_request == null) {
ws.runner_request = .rebuild;
- ws.runner_request_ready_cond.signal();
+ ws.runner_request_ready_cond.signal(io);
break;
}
- ws.runner_request_empty_cond.wait(&ws.runner_request_mutex);
+ ws.runner_request_empty_cond.wait(io, &ws.runner_request_mutex) catch return;
},
}
}
@@ -691,14 +697,15 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct {
trailing: []const u8,
}) void {
const gpa = ws.gpa;
+ const io = ws.graph.io;
const step_idx: u32 = for (ws.all_steps, 0..) |s, i| {
if (s == &opts.compile.step) break @intCast(i);
} else unreachable;
const old_buf = old: {
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
const old = ws.time_report_msgs[step_idx];
ws.time_report_msgs[step_idx] = &.{};
break :old old;
@@ -720,8 +727,8 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct {
@memcpy(buf[@sizeOf(abi.time_report.CompileResult)..], opts.trailing);
{
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
assert(ws.time_report_msgs[step_idx].len == 0);
ws.time_report_msgs[step_idx] = buf;
ws.time_report_update_times[step_idx] = ws.now();
@@ -731,14 +738,15 @@ pub fn updateTimeReportCompile(ws: *WebServer, opts: struct {
pub fn updateTimeReportGeneric(ws: *WebServer, step: *Build.Step, ns_total: u64) void {
const gpa = ws.gpa;
+ const io = ws.graph.io;
const step_idx: u32 = for (ws.all_steps, 0..) |s, i| {
if (s == step) break @intCast(i);
} else unreachable;
const old_buf = old: {
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
const old = ws.time_report_msgs[step_idx];
ws.time_report_msgs[step_idx] = &.{};
break :old old;
@@ -750,8 +758,8 @@ pub fn updateTimeReportGeneric(ws: *WebServer, step: *Build.Step, ns_total: u64)
.ns_total = ns_total,
};
{
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
assert(ws.time_report_msgs[step_idx].len == 0);
ws.time_report_msgs[step_idx] = buf;
ws.time_report_update_times[step_idx] = ws.now();
@@ -766,6 +774,7 @@ pub fn updateTimeReportRunTest(
ns_per_test: []const u64,
) void {
const gpa = ws.gpa;
+ const io = ws.graph.io;
const step_idx: u32 = for (ws.all_steps, 0..) |s, i| {
if (s == &run.step) break @intCast(i);
@@ -782,8 +791,8 @@ pub fn updateTimeReportRunTest(
break :len @sizeOf(abi.time_report.RunTestResult) + names_len + 8 * tests_len;
};
const old_buf = old: {
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
const old = ws.time_report_msgs[step_idx];
ws.time_report_msgs[step_idx] = &.{};
break :old old;
@@ -808,8 +817,8 @@ pub fn updateTimeReportRunTest(
assert(offset == buf.len);
{
- ws.time_report_mutex.lock();
- defer ws.time_report_mutex.unlock();
+ ws.time_report_mutex.lock(io) catch return;
+ defer ws.time_report_mutex.unlock(io);
assert(ws.time_report_msgs[step_idx].len == 0);
ws.time_report_msgs[step_idx] = buf;
ws.time_report_update_times[step_idx] = ws.now();
@@ -821,8 +830,9 @@ const RunnerRequest = union(enum) {
rebuild,
};
pub fn getRunnerRequest(ws: *WebServer) ?RunnerRequest {
- ws.runner_request_mutex.lock();
- defer ws.runner_request_mutex.unlock();
+ const io = ws.graph.io;
+ ws.runner_request_mutex.lock(io) catch return null;
+ defer ws.runner_request_mutex.unlock(io);
if (ws.runner_request) |req| {
ws.runner_request = null;
- ws.runner_request_empty_cond.signal();
+ ws.runner_request_empty_cond.signal(io);
@@ -830,16 +840,17 @@ pub fn getRunnerRequest(ws: *WebServer) ?RunnerRequest {
}
return null;
}
-pub fn wait(ws: *WebServer) RunnerRequest {
- ws.runner_request_mutex.lock();
- defer ws.runner_request_mutex.unlock();
+pub fn wait(ws: *WebServer) Io.Cancelable!RunnerRequest {
+ const io = ws.graph.io;
+ try ws.runner_request_mutex.lock(io);
+ defer ws.runner_request_mutex.unlock(io);
while (true) {
if (ws.runner_request) |req| {
ws.runner_request = null;
- ws.runner_request_empty_cond.signal();
+ ws.runner_request_empty_cond.signal(io);
return req;
}
- ws.runner_request_ready_cond.wait(&ws.runner_request_mutex);
+ try ws.runner_request_ready_cond.wait(io, &ws.runner_request_mutex);
}
}