//! An implementation of file-system watching based on the `FSEventStream` API in macOS.
//! While macOS supports kqueue, it does not allow detecting changes to files without
//! placing watches on each individual file, meaning FD limits are reached incredibly
//! quickly. The File System Events API works differently: it implements *recursive*
//! directory watches, managed by a system service. Rather than being in libc, the API is
//! exposed by the CoreServices framework. To avoid a compile dependency on the framework
//! bundle, we dynamically load CoreServices with `std.DynLib`.
//!
//! While the logic in this file *is* specialized to `std.Build.Watch`, efforts have been
//! made to keep that specialization to a minimum. Other use cases could be served with
//! relatively minimal modifications to the `watch_paths` field and its usages (in
//! particular the `setPaths` function). We avoid using the global GCD dispatch queue in
//! favour of creating our own and synchronizing with an explicit semaphore, meaning this
//! logic is thread-safe and does not affect process-global state.
//!
//! In theory, this API is quite good at avoiding filesystem race conditions. In practice,
//! the logic that would avoid them is currently disabled, because the build system kind
//! of relies on them at the time of writing to avoid redundant work -- see the comment at
//! the top of `wait` for details.
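//!
//! As a rough sketch of the intended lifecycle (the exact caller-side wiring lives in
//! `std.Build.Watch` and may differ; `gpa` and `steps` are assumed to come from the caller):
//!
//!     var fse = try FsEvents.init();
//!     defer fse.deinit(gpa);
//!     try fse.setPaths(gpa, steps);
//!     const result = try fse.wait(gpa, null); // block until a watched input changes
//!     if (result == .dirty) {
//!         // The affected steps have been reset; re-run the build pipeline.
//!     }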

const enable_debug_logs = false;

core_services: std.DynLib,
resolved_symbols: ResolvedSymbols,

paths_arena: std.heap.ArenaAllocator.State,
/// The roots of the recursive watches. FSEvents has relatively small limits on the number
/// of watched paths, so this slice must not be too long. The paths themselves are allocated
/// into `paths_arena`, but this slice is allocated into the GPA.
watch_roots: [][:0]const u8,
/// All of the paths being watched. Value is the set of steps which depend on the file/directory.
/// Keys and values are in `paths_arena`, but this map is allocated into the GPA.
watch_paths: std.StringArrayHashMapUnmanaged([]const *std.Build.Step),

/// The semaphore we use to block the thread calling `wait` until the callback determines a relevant
/// event has occurred. This is retained across `wait` calls for simplicity and efficiency.
waiting_semaphore: dispatch_semaphore_t,
/// This dispatch queue is created by us and executes serially. It exists exclusively to trigger the
/// callbacks of the FSEventStream we create. This is not in use outside of `wait`, but is retained
/// across `wait` calls for simplicity and efficiency.
dispatch_queue: dispatch_queue_t,
/// In theory, this field avoids race conditions. In practice, it is essentially unused at the time
/// of writing. See the comment at the start of `wait` for details.
since_event: FSEventStreamEventId,

/// All of the symbols we pull from the `dlopen`ed CoreServices framework. If any of these symbols
/// is not present, `init` will close the framework and return an error.
const ResolvedSymbols = struct {
    FSEventStreamCreate: *const fn (
        allocator: CFAllocatorRef,
        callback: FSEventStreamCallback,
        ctx: ?*const FSEventStreamContext,
        paths_to_watch: CFArrayRef,
        since_when: FSEventStreamEventId,
        latency: CFTimeInterval,
        flags: FSEventStreamCreateFlags,
    ) callconv(.c) FSEventStreamRef,
    FSEventStreamSetDispatchQueue: *const fn (stream: FSEventStreamRef, queue: dispatch_queue_t) callconv(.c) void,
    FSEventStreamStart: *const fn (stream: FSEventStreamRef) callconv(.c) bool,
    FSEventStreamStop: *const fn (stream: FSEventStreamRef) callconv(.c) void,
    FSEventStreamInvalidate: *const fn (stream: FSEventStreamRef) callconv(.c) void,
    FSEventStreamRelease: *const fn (stream: FSEventStreamRef) callconv(.c) void,
    FSEventStreamGetLatestEventId: *const fn (stream: ConstFSEventStreamRef) callconv(.c) FSEventStreamEventId,
    FSEventsGetCurrentEventId: *const fn () callconv(.c) FSEventStreamEventId,
    CFRelease: *const fn (cf: *const anyopaque) callconv(.c) void,
    CFArrayCreate: *const fn (
        allocator: CFAllocatorRef,
        values: [*]const usize,
        num_values: CFIndex,
        call_backs: ?*const CFArrayCallBacks,
    ) callconv(.c) CFArrayRef,
    CFStringCreateWithCString: *const fn (
        alloc: CFAllocatorRef,
        c_str: [*:0]const u8,
        encoding: CFStringEncoding,
    ) callconv(.c) CFStringRef,
    CFAllocatorCreate: *const fn (allocator: CFAllocatorRef, context: *const CFAllocatorContext) callconv(.c) CFAllocatorRef,
    kCFAllocatorUseContext: *const CFAllocatorRef,
};

pub fn init() error{ OpenFrameworkFailed, MissingCoreServicesSymbol }!FsEvents {
    var core_services = std.DynLib.open("/System/Library/Frameworks/CoreServices.framework/CoreServices") catch
        return error.OpenFrameworkFailed;
    errdefer core_services.close();

    var resolved_symbols: ResolvedSymbols = undefined;
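    // Each field name of `ResolvedSymbols` doubles as the exported symbol name, so resolving
    // everything is just a loop over the struct fields; one missing symbol aborts `init`.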
    inline for (@typeInfo(ResolvedSymbols).@"struct".fields) |f| {
        @field(resolved_symbols, f.name) = core_services.lookup(f.type, f.name) orelse return error.MissingCoreServicesSymbol;
    }

    return .{
        .core_services = core_services,
        .resolved_symbols = resolved_symbols,
        .paths_arena = .{},
        .watch_roots = &.{},
        .watch_paths = .empty,
        .waiting_semaphore = dispatch_semaphore_create(0),
        .dispatch_queue = dispatch_queue_create("zig-watch", .SERIAL),
        // Not `.since_now`: capturing the current event ID lets us init `FsEvents` *before*
        // doing work, and still notice any changes which happen during that work.
        .since_event = resolved_symbols.FSEventsGetCurrentEventId(),
    };
}

pub fn deinit(fse: *FsEvents, gpa: Allocator) void {
    dispatch_release(fse.waiting_semaphore);
    dispatch_release(fse.dispatch_queue);
    fse.core_services.close();

    gpa.free(fse.watch_roots);
    fse.watch_paths.deinit(gpa);
    {
        var paths_arena = fse.paths_arena.promote(gpa);
        paths_arena.deinit();
    }
}

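/// Replaces the current watch set with the inputs of `steps`: rebuilds the per-file
/// `watch_paths` map and computes the minimal list of recursive `watch_roots` handed to FSEvents.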
pub fn setPaths(fse: *FsEvents, gpa: Allocator, steps: []const *std.Build.Step) !void {
    var paths_arena_instance = fse.paths_arena.promote(gpa);
    defer fse.paths_arena = paths_arena_instance.state;
    const paths_arena = paths_arena_instance.allocator();

    const cwd_path = try std.process.getCwdAlloc(gpa);
    defer gpa.free(cwd_path);

    var need_dirs: std.StringArrayHashMapUnmanaged(void) = .empty;
    defer need_dirs.deinit(gpa);

    fse.watch_paths.clearRetainingCapacity();

    // We take `step` by pointer for a slight memory optimization in a moment.
    for (steps) |*step| {
        for (step.*.inputs.table.keys(), step.*.inputs.table.values()) |path, *files| {
            const resolved_dir = try std.fs.path.resolvePosix(paths_arena, &.{ cwd_path, path.root_dir.path orelse ".", path.sub_path });
            try need_dirs.put(gpa, resolved_dir, {});
            for (files.items) |file_name| {
                const watch_path = if (std.mem.eql(u8, file_name, "."))
                    resolved_dir
                else
                    try std.fs.path.join(paths_arena, &.{ resolved_dir, file_name });
                const gop = try fse.watch_paths.getOrPut(gpa, watch_path);
                if (gop.found_existing) {
                    const old_steps = gop.value_ptr.*;
                    const new_steps = try paths_arena.alloc(*std.Build.Step, old_steps.len + 1);
                    @memcpy(new_steps[0..old_steps.len], old_steps);
                    new_steps[old_steps.len] = step.*;
                    gop.value_ptr.* = new_steps;
                } else {
                    // This is why we captured `step` by pointer! We can avoid allocating a slice of one
                    // step in the arena in the common case where a file is referenced by only one step.
                    gop.value_ptr.* = step[0..1];
                }
            }
        }
    }

    {
        // There's no point watching directories nested inside other watched directories (e.g. both
        // "/foo" and "/foo/bar"). To eliminate these, we re-add directories in sorted order -- so
        // every ancestor comes before its descendants -- skipping any whose prefix is already present.
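        // For example, { "/repo/src/core", "/repo/src", "/repo/deps" } reduces to
        // { "/repo/src", "/repo/deps" } (paths purely illustrative).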
        const old_dirs = try gpa.dupe([]const u8, need_dirs.keys());
        defer gpa.free(old_dirs);
        std.mem.sort([]const u8, old_dirs, {}, struct {
            fn lessThan(ctx: void, a: []const u8, b: []const u8) bool {
                _ = ctx;
                return std.mem.lessThan(u8, a, b);
            }
        }.lessThan);
        need_dirs.clearRetainingCapacity();
        for (old_dirs) |dir_path| {
            var it: std.fs.path.ComponentIterator(.posix, u8) = .init(dir_path);
            while (it.next()) |component| {
                if (need_dirs.contains(component.path)) {
                    // this path is '/foo/bar/qux', but '/foo' or '/foo/bar' was already added
                    break;
                }
            } else {
                need_dirs.putAssumeCapacityNoClobber(dir_path, {});
            }
        }
    }

    // `need_dirs` is now a set of directories to watch with no redundancy. In practice, this is very
    // likely to have reduced it to a quite small set (e.g. it'll typically coalesce a full `src/`
    // directory into one entry). However, the FSEventStream API has a fairly low undocumented limit
    // on total watches (supposedly 4096), so we should handle the case where we exceed it. To be
    // safe, because this API can be a little unpredictable, we'll cap ourselves a little *below*
    // that known limit.
    if (need_dirs.count() > 2048) {
        // Fallback: watch the whole filesystem. This is excessive, but... it *works* :P
        if (enable_debug_logs) watch_log.debug("too many dirs; recursively watching root", .{});
        fse.watch_roots = try gpa.realloc(fse.watch_roots, 1);
        fse.watch_roots[0] = "/";
    } else {
        fse.watch_roots = try gpa.realloc(fse.watch_roots, need_dirs.count());
        for (fse.watch_roots, need_dirs.keys()) |*out, in| {
            out.* = try paths_arena.dupeZ(u8, in);
        }
    }
    if (enable_debug_logs) {
        watch_log.debug("watching {d} paths using {d} recursive watches:", .{ fse.watch_paths.count(), fse.watch_roots.len });
        for (fse.watch_roots) |dir_path| {
            watch_log.debug("- '{s}'", .{dir_path});
        }
    }
}

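/// Blocks until at least one step is dirtied by a relevant filesystem event (returning `.dirty`),
/// or until `timeout_ns` elapses (returning `.timeout`). A fresh `FSEventStream` is created for
/// the duration of each call and torn down before returning.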
pub fn wait(fse: *FsEvents, gpa: Allocator, timeout_ns: ?u64) error{ OutOfMemory, StartFailed }!std.Build.Watch.WaitResult {
    if (fse.watch_roots.len == 0) @panic("nothing to watch");

    const rs = fse.resolved_symbols;

    // At the time of writing, using `since_event` in the obvious way causes redundant rebuilds
    // to occur, because one step modifies a file which is an input to another step. The solution
    // to this problem will probably be either:
    //
    // a) Don't include the output of one step as a watch input of another; only mark external
    //    files as watch inputs. Or...
    //
    // b) Note the current event ID when a step begins, and disregard events preceding that ID
    //    when considering whether to dirty that step in `eventCallback`.
    //
    // For now, to avoid the redundant rebuilds, we bypass this `since_event` mechanism. This does
    // introduce race conditions, but the other `std.Build.Watch` implementations suffer from those
    // too at the time of writing, so this is kind of expected.
    fse.since_event = .since_now;

    const cf_allocator = rs.CFAllocatorCreate(rs.kCFAllocatorUseContext.*, &.{
        .version = 0,
        .info = @constCast(&gpa),
        .retain = null,
        .release = null,
        .copy_description = null,
        .allocate = &cf_alloc_callbacks.allocate,
        .reallocate = &cf_alloc_callbacks.reallocate,
        .deallocate = &cf_alloc_callbacks.deallocate,
        .preferred_size = null,
    }) orelse return error.OutOfMemory;
    defer rs.CFRelease(cf_allocator);

    const cf_paths = try gpa.alloc(?CFStringRef, fse.watch_roots.len);
    @memset(cf_paths, null);
    defer {
        for (cf_paths) |o| if (o) |p| rs.CFRelease(p);
        gpa.free(cf_paths);
    }
    for (fse.watch_roots, cf_paths) |raw_path, *cf_path| {
        cf_path.* = rs.CFStringCreateWithCString(cf_allocator, raw_path, .utf8);
    }
    const cf_paths_array = rs.CFArrayCreate(cf_allocator, @ptrCast(cf_paths), @intCast(cf_paths.len), null);
    defer rs.CFRelease(cf_paths_array);

    const callback_ctx: EventCallbackCtx = .{
        .fse = fse,
        .gpa = gpa,
    };
    const event_stream = rs.FSEventStreamCreate(
        null,
        &eventCallback,
        &.{
            .version = 0,
            .info = @constCast(&callback_ctx),
            .retain = null,
            .release = null,
            .copy_description = null,
        },
        cf_paths_array,
        fse.since_event,
        0.05, // 0.05s latency; higher values increase efficiency by coalescing more events
        .{ .watch_root = true, .file_events = true },
    );
    defer rs.FSEventStreamRelease(event_stream);
    rs.FSEventStreamSetDispatchQueue(event_stream, fse.dispatch_queue);
    defer rs.FSEventStreamInvalidate(event_stream);
    if (!rs.FSEventStreamStart(event_stream)) return error.StartFailed;
    defer rs.FSEventStreamStop(event_stream);
    const result = dispatch_semaphore_wait(fse.waiting_semaphore, timeout: {
        const ns = timeout_ns orelse break :timeout .forever;
        break :timeout dispatch_time(.now, @intCast(ns));
    });
    return switch (result) {
        0 => .dirty,
        else => .timeout,
    };
}

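/// Adapters which expose the Zig `Allocator` (smuggled through the CF allocator's `info`
/// pointer) via CoreFoundation's C allocator callbacks. `Allocator.free` and `realloc` need the
/// length of the old allocation, which CF does not provide, so every allocation is prefixed
/// with a `usize` header recording its size, and callers are handed the memory just past it.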
const cf_alloc_callbacks = struct {
    const log = std.log.scoped(.cf_alloc);
    fn allocate(size: CFIndex, hint: CFOptionFlags, info: ?*const anyopaque) callconv(.c) ?*const anyopaque {
        if (enable_debug_logs) log.debug("allocate {d}", .{size});
        _ = hint;
        const gpa: *const Allocator = @ptrCast(@alignCast(info));
        const mem = gpa.alignedAlloc(u8, .of(usize), @intCast(size + @sizeOf(usize))) catch return null;
        const metadata: *usize = @ptrCast(mem);
        metadata.* = @intCast(size);
        return mem[@sizeOf(usize)..].ptr;
    }
    fn reallocate(ptr: ?*anyopaque, new_size: CFIndex, hint: CFOptionFlags, info: ?*const anyopaque) callconv(.c) ?*const anyopaque {
        if (enable_debug_logs) log.debug("reallocate @{*} {d}", .{ ptr, new_size });
        _ = hint;
        if (ptr == null or new_size == 0) return null; // not a bug: documentation explicitly states that realloc on NULL should return NULL
        const gpa: *const Allocator = @ptrCast(@alignCast(info));
        const old_base: [*]align(@alignOf(usize)) u8 = @alignCast(@as([*]u8, @ptrCast(ptr)) - @sizeOf(usize));
        const old_size = @as(*const usize, @ptrCast(old_base)).*;
        const old_mem = old_base[0 .. old_size + @sizeOf(usize)];
        const new_mem = gpa.realloc(old_mem, @intCast(new_size + @sizeOf(usize))) catch return null;
        const metadata: *usize = @ptrCast(new_mem);
        metadata.* = @intCast(new_size);
        return new_mem[@sizeOf(usize)..].ptr;
    }
    fn deallocate(ptr: *anyopaque, info: ?*const anyopaque) callconv(.c) void {
        if (enable_debug_logs) log.debug("deallocate @{*}", .{ptr});
        const gpa: *const Allocator = @ptrCast(@alignCast(info));
        const old_base: [*]align(@alignOf(usize)) u8 = @alignCast(@as([*]u8, @ptrCast(ptr)) - @sizeOf(usize));
        const old_size = @as(*const usize, @ptrCast(old_base)).*;
        const old_mem = old_base[0 .. old_size + @sizeOf(usize)];
        gpa.free(old_mem);
    }
};

const EventCallbackCtx = struct {
    fse: *FsEvents,
    gpa: Allocator,
};

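/// The `FSEventStreamCallback` invoked on `dispatch_queue` for each batch of (possibly
/// coalesced) events. Dirties the steps associated with any affected watch path, and signals
/// `waiting_semaphore` if anything was dirtied so that `wait` can return `.dirty`.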
fn eventCallback(
    stream: ConstFSEventStreamRef,
    client_callback_info: ?*anyopaque,
    num_events: usize,
    events_paths_ptr: *anyopaque,
    events_flags_ptr: [*]const FSEventStreamEventFlags,
    events_ids_ptr: [*]const FSEventStreamEventId,
) callconv(.c) void {
    const ctx: *const EventCallbackCtx = @ptrCast(@alignCast(client_callback_info));
    const fse = ctx.fse;
    const gpa = ctx.gpa;
    const rs = fse.resolved_symbols;
    const events_paths_ptr_casted: [*]const [*:0]const u8 = @ptrCast(@alignCast(events_paths_ptr));
    const events_paths = events_paths_ptr_casted[0..num_events];
    const events_ids = events_ids_ptr[0..num_events];
    const events_flags = events_flags_ptr[0..num_events];
    var any_dirty = false;
    for (events_paths, events_ids, events_flags) |event_path_nts, event_id, event_flags| {
        _ = event_id;
        if (event_flags.history_done) continue; // sentinel
        const event_path = std.mem.span(event_path_nts);
        switch (event_flags.must_scan_sub_dirs) {
            false => {
                if (fse.watch_paths.get(event_path)) |steps| {
                    assert(steps.len > 0);
                    for (steps) |s| dirtyStep(s, gpa, &any_dirty);
                }
                if (std.fs.path.dirname(event_path)) |event_dirname| {
                    // Modifying '/foo/bar' triggers the watch on '/foo'.
                    if (fse.watch_paths.get(event_dirname)) |steps| {
                        assert(steps.len > 0);
                        for (steps) |s| dirtyStep(s, gpa, &any_dirty);
                    }
                }
            },
            true => {
                // This is unlikely, but can occasionally happen when bottlenecked: events have been
                // coalesced into one. We want to see if any of these events are actually relevant
                // to us. The only way we can reasonably do that in this rare edge case is to iterate
                // the watch paths and see if any of them is under this directory. That's acceptable
                // because the alternative would be kicking off a rebuild which would be clearing
                // those paths anyway.
                const changed_path = std.fs.path.dirname(event_path) orelse event_path;
                for (fse.watch_paths.keys(), fse.watch_paths.values()) |watching_path, steps| {
                    if (dirStartsWith(watching_path, changed_path)) {
                        for (steps) |s| dirtyStep(s, gpa, &any_dirty);
                    }
                }
            },
        }
    }
    if (any_dirty) {
        fse.since_event = rs.FSEventStreamGetLatestEventId(stream);
        _ = dispatch_semaphore_signal(fse.waiting_semaphore);
    }
}
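/// If `s` is not still sitting in `.precheck_done`, recursively resets it so the next build
/// iteration re-runs it, and notes via `any_dirty` that at least one step became dirty.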
fn dirtyStep(s: *std.Build.Step, gpa: Allocator, any_dirty: *bool) void {
    if (s.state == .precheck_done) return;
    s.recursiveReset(gpa);
    any_dirty.* = true;
}
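/// Returns whether `path` refers to `prefix` itself or to something inside the directory `prefix`.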
fn dirStartsWith(path: []const u8, prefix: []const u8) bool {
    if (std.mem.eql(u8, path, prefix)) return true;
    if (!std.mem.startsWith(u8, path, prefix)) return false;
    if (path[prefix.len] != '/') return false; // `path` is `/foo/barx`, `prefix` is `/foo/bar`
    return true; // `path` is `/foo/bar/...`, `prefix` is `/foo/bar`
}

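// Grand Central Dispatch (libdispatch) declarations. These live in libSystem, which is linked
// by default on macOS, so unlike CoreServices they can be declared `extern` rather than loaded
// dynamically.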
const dispatch_time_t = enum(u64) {
    now = 0,
    forever = std.math.maxInt(u64),
    _,
};
extern fn dispatch_time(base: dispatch_time_t, delta_ns: i64) dispatch_time_t;

const dispatch_semaphore_t = *opaque {};
extern fn dispatch_semaphore_create(value: isize) dispatch_semaphore_t;
extern fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) isize;
extern fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) isize;

const dispatch_queue_t = *opaque {};
const dispatch_queue_attr_t = ?*opaque {
    const SERIAL: dispatch_queue_attr_t = null;
};
extern fn dispatch_queue_create(label: [*:0]const u8, attr: dispatch_queue_attr_t) dispatch_queue_t;
extern fn dispatch_release(object: *anyopaque) void;

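// CoreFoundation types, redeclared here (rather than imported from framework headers) because
// CoreServices is only ever loaded dynamically.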
const CFAllocatorRef = ?*const opaque {};
const CFArrayRef = *const opaque {};
const CFStringRef = *const opaque {};
const CFTimeInterval = f64;
const CFIndex = isize; // `signed long` in CFBase.h, i.e. pointer-sized on macOS
const CFOptionFlags = enum(usize) { _ }; // likewise `unsigned long`
const CFAllocatorRetainCallBack = *const fn (info: ?*const anyopaque) callconv(.c) *const anyopaque;
const CFAllocatorReleaseCallBack = *const fn (info: ?*const anyopaque) callconv(.c) void;
const CFAllocatorCopyDescriptionCallBack = *const fn (info: ?*const anyopaque) callconv(.c) CFStringRef;
const CFAllocatorAllocateCallBack = *const fn (alloc_size: CFIndex, hint: CFOptionFlags, info: ?*const anyopaque) callconv(.c) ?*const anyopaque;
const CFAllocatorReallocateCallBack = *const fn (ptr: ?*anyopaque, new_size: CFIndex, hint: CFOptionFlags, info: ?*const anyopaque) callconv(.c) ?*const anyopaque;
const CFAllocatorDeallocateCallBack = *const fn (ptr: *anyopaque, info: ?*const anyopaque) callconv(.c) void;
const CFAllocatorPreferredSizeCallBack = *const fn (size: CFIndex, hint: CFOptionFlags, info: ?*const anyopaque) callconv(.c) CFIndex;
const CFAllocatorContext = extern struct {
    version: CFIndex,
    info: ?*anyopaque,
    retain: ?CFAllocatorRetainCallBack,
    release: ?CFAllocatorReleaseCallBack,
    copy_description: ?CFAllocatorCopyDescriptionCallBack,
    allocate: CFAllocatorAllocateCallBack,
    reallocate: ?CFAllocatorReallocateCallBack,
    deallocate: ?CFAllocatorDeallocateCallBack,
    preferred_size: ?CFAllocatorPreferredSizeCallBack,
};
const CFArrayCallBacks = opaque {};
const CFStringEncoding = enum(u32) {
    invalid_id = std.math.maxInt(u32),
    mac_roman = 0,
    windows_latin_1 = 0x500,
    iso_latin_1 = 0x201,
    next_step_latin = 0xB01,
    ascii = 0x600,
    unicode = 0x100,
    utf8 = 0x8000100,
    non_lossy_ascii = 0xBFF,
};

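// FSEventStream types, mirroring the C declarations exposed by CoreServices.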
const FSEventStreamRef = *opaque {};
const ConstFSEventStreamRef = *const @typeInfo(FSEventStreamRef).pointer.child;
const FSEventStreamCallback = *const fn (
    stream: ConstFSEventStreamRef,
    client_callback_info: ?*anyopaque,
    num_events: usize,
    event_paths: *anyopaque,
    event_flags: [*]const FSEventStreamEventFlags,
    event_ids: [*]const FSEventStreamEventId,
) callconv(.c) void;
const FSEventStreamContext = extern struct {
    version: CFIndex,
    info: ?*anyopaque,
    retain: ?CFAllocatorRetainCallBack,
    release: ?CFAllocatorReleaseCallBack,
    copy_description: ?CFAllocatorCopyDescriptionCallBack,
};
const FSEventStreamEventId = enum(u64) {
    since_now = std.math.maxInt(u64),
    _,
};
const FSEventStreamCreateFlags = packed struct(u32) {
    use_cf_types: bool = false,
    no_defer: bool = false,
    watch_root: bool = false,
    ignore_self: bool = false,
    file_events: bool = false,
    _: u27 = 0,
};
const FSEventStreamEventFlags = packed struct(u32) {
    must_scan_sub_dirs: bool,
    user_dropped: bool,
    kernel_dropped: bool,
    event_ids_wrapped: bool,
    history_done: bool,
    root_changed: bool,
    mount: bool,
    unmount: bool,
    _: u24 = 0,
};

const std = @import("std");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const watch_log = std.log.scoped(.watch);
const FsEvents = @This();