Commit 20f5f56986
Changed files (1)
lib
std
event
lib/std/event/fs.zig
@@ -720,602 +720,602 @@ fn hashString(s: []const u16) u32 {
return @truncate(u32, std.hash.Wyhash.hash(0, @sliceToBytes(s)));
}
-//pub const WatchEventError = error{
-// UserResourceLimitReached,
-// SystemResources,
-// AccessDenied,
-// Unexpected, // TODO remove this possibility
-//};
-//
-//pub fn Watch(comptime V: type) type {
-// return struct {
-// channel: *event.Channel(Event.Error!Event),
-// os_data: OsData,
-//
-// const OsData = switch (builtin.os) {
-// .macosx, .freebsd, .netbsd, .dragonfly => struct {
-// file_table: FileTable,
-// table_lock: event.Lock,
-//
-// const FileTable = std.StringHashmap(*Put);
-// const Put = struct {
-// putter: anyframe,
-// value_ptr: *V,
-// };
-// },
-//
-// .linux => LinuxOsData,
-// .windows => WindowsOsData,
-//
-// else => @compileError("Unsupported OS"),
-// };
-//
-// const WindowsOsData = struct {
-// table_lock: event.Lock,
-// dir_table: DirTable,
-// all_putters: std.atomic.Queue(anyframe),
-// ref_count: std.atomic.Int(usize),
-//
-// const DirTable = std.StringHashMap(*Dir);
-// const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
-//
-// const Dir = struct {
-// putter: anyframe,
-// file_table: FileTable,
-// table_lock: event.Lock,
-// };
-// };
-//
-// const LinuxOsData = struct {
-// putter: anyframe,
-// inotify_fd: i32,
-// wd_table: WdTable,
-// table_lock: event.Lock,
-//
-// const WdTable = std.AutoHashMap(i32, Dir);
-// const FileTable = std.StringHashMap(V);
-//
-// const Dir = struct {
-// dirname: []const u8,
-// file_table: FileTable,
-// };
-// };
-//
-// const FileToHandle = std.StringHashMap(anyframe);
-//
-// const Self = @This();
-//
-// pub const Event = struct {
-// id: Id,
-// data: V,
-//
-// pub const Id = WatchEventId;
-// pub const Error = WatchEventError;
-// };
-//
-// pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
-// const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
-// errdefer channel.destroy();
-//
-// switch (builtin.os) {
-// .linux => {
-// const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
-// errdefer os.close(inotify_fd);
-//
-// var result: *Self = undefined;
-// _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result);
-// return result;
-// },
-//
-// .windows => {
-// const self = try loop.allocator.create(Self);
-// errdefer loop.allocator.destroy(self);
-// self.* = Self{
-// .channel = channel,
-// .os_data = OsData{
-// .table_lock = event.Lock.init(loop),
-// .dir_table = OsData.DirTable.init(loop.allocator),
-// .ref_count = std.atomic.Int(usize).init(1),
-// .all_putters = std.atomic.Queue(anyframe).init(),
-// },
-// };
-// return self;
-// },
-//
-// .macosx, .freebsd, .netbsd, .dragonfly => {
-// const self = try loop.allocator.create(Self);
-// errdefer loop.allocator.destroy(self);
-//
-// self.* = Self{
-// .channel = channel,
-// .os_data = OsData{
-// .table_lock = event.Lock.init(loop),
-// .file_table = OsData.FileTable.init(loop.allocator),
-// },
-// };
-// return self;
-// },
-// else => @compileError("Unsupported OS"),
-// }
-// }
-//
-// /// All addFile calls and removeFile calls must have completed.
-// pub fn destroy(self: *Self) void {
-// switch (builtin.os) {
-// .macosx, .freebsd, .netbsd, .dragonfly => {
-// // TODO we need to cancel the frames before destroying the lock
-// self.os_data.table_lock.deinit();
-// var it = self.os_data.file_table.iterator();
-// while (it.next()) |entry| {
-// cancel entry.value.putter;
-// self.channel.loop.allocator.free(entry.key);
-// }
-// self.channel.destroy();
-// },
-// .linux => cancel self.os_data.putter,
-// .windows => {
-// while (self.os_data.all_putters.get()) |putter_node| {
-// cancel putter_node.data;
-// }
-// self.deref();
-// },
-// else => @compileError("Unsupported OS"),
-// }
-// }
-//
-// fn ref(self: *Self) void {
-// _ = self.os_data.ref_count.incr();
-// }
-//
-// fn deref(self: *Self) void {
-// if (self.os_data.ref_count.decr() == 1) {
-// const allocator = self.channel.loop.allocator;
-// self.os_data.table_lock.deinit();
-// var it = self.os_data.dir_table.iterator();
-// while (it.next()) |entry| {
-// allocator.free(entry.key);
-// allocator.destroy(entry.value);
-// }
-// self.os_data.dir_table.deinit();
-// self.channel.destroy();
-// allocator.destroy(self);
-// }
-// }
-//
-// pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
-// switch (builtin.os) {
-// .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable),
-// .linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
-// .windows => return await (async addFileWindows(self, file_path, value) catch unreachable),
-// else => @compileError("Unsupported OS"),
-// }
-// }
-//
-// async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
-// const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path});
-// var resolved_path_consumed = false;
-// defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);
-//
-// var close_op = try CloseOperation.start(self.channel.loop);
-// var close_op_consumed = false;
-// defer if (!close_op_consumed) close_op.finish();
-//
-// const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0;
-// const mode = 0;
-// const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
-// close_op.setHandle(fd);
-//
-// var put_data: *OsData.Put = undefined;
-// const putter = try async self.kqPutEvents(close_op, value, &put_data);
-// close_op_consumed = true;
-// errdefer cancel putter;
-//
-// const result = blk: {
-// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
-// defer held.release();
-//
-// const gop = try self.os_data.file_table.getOrPut(resolved_path);
-// if (gop.found_existing) {
-// const prev_value = gop.kv.value.value_ptr.*;
-// cancel gop.kv.value.putter;
-// gop.kv.value = put_data;
-// break :blk prev_value;
-// } else {
-// resolved_path_consumed = true;
-// gop.kv.value = put_data;
-// break :blk null;
-// }
-// };
-//
-// return result;
-// }
-//
-// async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
-// var value_copy = value;
-// var put = OsData.Put{
-// .putter = @frame(),
-// .value_ptr = &value_copy,
-// };
-// out_put.* = &put;
-// self.channel.loop.beginOneEvent();
-//
-// defer {
-// close_op.finish();
-// self.channel.loop.finishOneEvent();
-// }
-//
-// while (true) {
-// if (await (async self.channel.loop.bsdWaitKev(
-// @intCast(usize, close_op.getHandle()),
-// os.EVFILT_VNODE,
-// os.NOTE_WRITE | os.NOTE_DELETE,
-// ) catch unreachable)) |kev| {
-// // TODO handle EV_ERROR
-// if (kev.fflags & os.NOTE_DELETE != 0) {
-// await (async self.channel.put(Self.Event{
-// .id = Event.Id.Delete,
-// .data = value_copy,
-// }) catch unreachable);
-// } else if (kev.fflags & os.NOTE_WRITE != 0) {
-// await (async self.channel.put(Self.Event{
-// .id = Event.Id.CloseWrite,
-// .data = value_copy,
-// }) catch unreachable);
-// }
-// } else |err| switch (err) {
-// error.EventNotFound => unreachable,
-// error.ProcessNotFound => unreachable,
-// error.Overflow => unreachable,
-// error.AccessDenied, error.SystemResources => |casted_err| {
-// await (async self.channel.put(casted_err) catch unreachable);
-// },
-// }
-// }
-// }
-//
-// async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
-// const value_copy = value;
-//
-// const dirname = std.fs.path.dirname(file_path) orelse ".";
-// const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
-// var dirname_with_null_consumed = false;
-// defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);
-//
-// const basename = std.fs.path.basename(file_path);
-// const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
-// var basename_with_null_consumed = false;
-// defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);
-//
-// const wd = try os.inotify_add_watchC(
-// self.os_data.inotify_fd,
-// dirname_with_null.ptr,
-// os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
-// );
-// // wd is either a newly created watch or an existing one.
-//
-// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
-// defer held.release();
-//
-// const gop = try self.os_data.wd_table.getOrPut(wd);
-// if (!gop.found_existing) {
-// gop.kv.value = OsData.Dir{
-// .dirname = dirname_with_null,
-// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
-// };
-// dirname_with_null_consumed = true;
-// }
-// const dir = &gop.kv.value;
-//
-// const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
-// if (file_table_gop.found_existing) {
-// const prev_value = file_table_gop.kv.value;
-// file_table_gop.kv.value = value_copy;
-// return prev_value;
-// } else {
-// file_table_gop.kv.value = value_copy;
-// basename_with_null_consumed = true;
-// return null;
-// }
-// }
-//
-// async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
-// const value_copy = value;
-// // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
-//
-// const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
-// var dirname_consumed = false;
-// defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname);
-//
-// const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname);
-// defer self.channel.loop.allocator.free(dirname_utf16le);
-//
-// // TODO https://github.com/ziglang/zig/issues/265
-// const basename = std.fs.path.basename(file_path);
-// const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename);
-// var basename_utf16le_null_consumed = false;
-// defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null);
-// const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
-//
-// const dir_handle = try windows.CreateFileW(
-// dirname_utf16le.ptr,
-// windows.FILE_LIST_DIRECTORY,
-// windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE,
-// null,
-// windows.OPEN_EXISTING,
-// windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED,
-// null,
-// );
-// var dir_handle_consumed = false;
-// defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
-//
-// const held = await (async self.os_data.table_lock.acquire() catch unreachable);
-// defer held.release();
-//
-// const gop = try self.os_data.dir_table.getOrPut(dirname);
-// if (gop.found_existing) {
-// const dir = gop.kv.value;
-// const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable);
-// defer held_dir_lock.release();
-//
-// const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
-// if (file_gop.found_existing) {
-// const prev_value = file_gop.kv.value;
-// file_gop.kv.value = value_copy;
-// return prev_value;
-// } else {
-// file_gop.kv.value = value_copy;
-// basename_utf16le_null_consumed = true;
-// return null;
-// }
-// } else {
-// errdefer _ = self.os_data.dir_table.remove(dirname);
-// const dir = try self.channel.loop.allocator.create(OsData.Dir);
-// errdefer self.channel.loop.allocator.destroy(dir);
-//
-// dir.* = OsData.Dir{
-// .file_table = OsData.FileTable.init(self.channel.loop.allocator),
-// .table_lock = event.Lock.init(self.channel.loop),
-// .putter = undefined,
-// };
-// gop.kv.value = dir;
-// assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null);
-// basename_utf16le_null_consumed = true;
-//
-// dir.putter = try async self.windowsDirReader(dir_handle, dir);
-// dir_handle_consumed = true;
-//
-// dirname_consumed = true;
-//
-// return null;
-// }
-// }
-//
-// async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
-// self.ref();
-// defer self.deref();
-//
-// defer os.close(dir_handle);
-//
-// var putter_node = std.atomic.Queue(anyframe).Node{
-// .data = @frame(),
-// .prev = null,
-// .next = null,
-// };
-// self.os_data.all_putters.put(&putter_node);
-// defer _ = self.os_data.all_putters.remove(&putter_node);
-//
-// var resume_node = Loop.ResumeNode.Basic{
-// .base = Loop.ResumeNode{
-// .id = Loop.ResumeNode.Id.Basic,
-// .handle = @frame(),
-// .overlapped = windows.OVERLAPPED{
-// .Internal = 0,
-// .InternalHigh = 0,
-// .Offset = 0,
-// .OffsetHigh = 0,
-// .hEvent = null,
-// },
-// },
-// };
-// var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
-//
-// // TODO handle this error not in the channel but in the setup
-// _ = windows.CreateIoCompletionPort(
-// dir_handle,
-// self.channel.loop.os_data.io_port,
-// undefined,
-// undefined,
-// ) catch |err| {
-// await (async self.channel.put(err) catch unreachable);
-// return;
-// };
-//
-// while (true) {
-// {
-// // TODO only 1 beginOneEvent for the whole function
-// self.channel.loop.beginOneEvent();
-// errdefer self.channel.loop.finishOneEvent();
-// errdefer {
-// _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
-// }
-// suspend {
-// _ = windows.kernel32.ReadDirectoryChangesW(
-// dir_handle,
-// &event_buf,
-// @intCast(windows.DWORD, event_buf.len),
-// windows.FALSE, // watch subtree
-// windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
-// windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
-// windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
-// windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
-// null, // number of bytes transferred (unused for async)
-// &resume_node.base.overlapped,
-// null, // completion routine - unused because we use IOCP
-// );
-// }
-// }
-// var bytes_transferred: windows.DWORD = undefined;
-// if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
-// const err = switch (windows.kernel32.GetLastError()) {
-// else => |err| windows.unexpectedError(err),
-// };
-// await (async self.channel.put(err) catch unreachable);
-// } else {
-// // can't use @bytesToSlice because of the special variable length name field
-// var ptr = event_buf[0..].ptr;
-// const end_ptr = ptr + bytes_transferred;
-// var ev: *windows.FILE_NOTIFY_INFORMATION = undefined;
-// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) {
-// ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr);
-// const emit = switch (ev.Action) {
-// windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
-// windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite,
-// else => null,
-// };
-// if (emit) |id| {
-// const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
-// const user_value = blk: {
-// const held = await (async dir.table_lock.acquire() catch unreachable);
-// defer held.release();
-//
-// if (dir.file_table.get(basename_utf16le)) |entry| {
-// break :blk entry.value;
-// } else {
-// break :blk null;
-// }
-// };
-// if (user_value) |v| {
-// await (async self.channel.put(Event{
-// .id = id,
-// .data = v,
-// }) catch unreachable);
-// }
-// }
-// if (ev.NextEntryOffset == 0) break;
-// }
-// }
-// }
-// }
-//
-// pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
-// @panic("TODO");
-// }
-//
-// async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
-// const loop = channel.loop;
-//
-// var watch = Self{
-// .channel = channel,
-// .os_data = OsData{
-// .putter = @frame(),
-// .inotify_fd = inotify_fd,
-// .wd_table = OsData.WdTable.init(loop.allocator),
-// .table_lock = event.Lock.init(loop),
-// },
-// };
-// out_watch.* = &watch;
-//
-// loop.beginOneEvent();
-//
-// defer {
-// watch.os_data.table_lock.deinit();
-// var wd_it = watch.os_data.wd_table.iterator();
-// while (wd_it.next()) |wd_entry| {
-// var file_it = wd_entry.value.file_table.iterator();
-// while (file_it.next()) |file_entry| {
-// loop.allocator.free(file_entry.key);
-// }
-// loop.allocator.free(wd_entry.value.dirname);
-// }
-// loop.finishOneEvent();
-// os.close(inotify_fd);
-// channel.destroy();
-// }
-//
-// var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
-//
-// while (true) {
-// const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
-// const errno = os.linux.getErrno(rc);
-// switch (errno) {
-// 0 => {
-// // can't use @bytesToSlice because of the special variable length name field
-// var ptr = event_buf[0..].ptr;
-// const end_ptr = ptr + event_buf.len;
-// var ev: *os.linux.inotify_event = undefined;
-// while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
-// ev = @ptrCast(*os.linux.inotify_event, ptr);
-// if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
-// const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
-// const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1];
-// const user_value = blk: {
-// const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
-// defer held.release();
-//
-// const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
-// if (dir.file_table.get(basename_with_null)) |entry| {
-// break :blk entry.value;
-// } else {
-// break :blk null;
-// }
-// };
-// if (user_value) |v| {
-// await (async channel.put(Event{
-// .id = WatchEventId.CloseWrite,
-// .data = v,
-// }) catch unreachable);
-// }
-// }
-// }
-// },
-// os.linux.EINTR => continue,
-// os.linux.EINVAL => unreachable,
-// os.linux.EFAULT => unreachable,
-// os.linux.EAGAIN => {
-// (await (async loop.linuxWaitFd(
-// inotify_fd,
-// os.linux.EPOLLET | os.linux.EPOLLIN,
-// ) catch unreachable)) catch |err| {
-// const transformed_err = switch (err) {
-// error.FileDescriptorAlreadyPresentInSet => unreachable,
-// error.OperationCausesCircularLoop => unreachable,
-// error.FileDescriptorNotRegistered => unreachable,
-// error.FileDescriptorIncompatibleWithEpoll => unreachable,
-// error.Unexpected => unreachable,
-// else => |e| e,
-// };
-// await (async channel.put(transformed_err) catch unreachable);
-// };
-// },
-// else => unreachable,
-// }
-// }
-// }
-// };
-//}
+pub const WatchEventError = error{
+ UserResourceLimitReached,
+ SystemResources,
+ AccessDenied,
+ Unexpected, // TODO remove this possibility
+};
+
+pub fn Watch(comptime V: type) type {
+ return struct {
+ channel: *event.Channel(Event.Error!Event),
+ os_data: OsData,
+
+ const OsData = switch (builtin.os) {
+ .macosx, .freebsd, .netbsd, .dragonfly => struct {
+ file_table: FileTable,
+ table_lock: event.Lock,
+
+ const FileTable = std.StringHashmap(*Put);
+ const Put = struct {
+ putter: anyframe,
+ value_ptr: *V,
+ };
+ },
+
+ .linux => LinuxOsData,
+ .windows => WindowsOsData,
+
+ else => @compileError("Unsupported OS"),
+ };
+
+ const WindowsOsData = struct {
+ table_lock: event.Lock,
+ dir_table: DirTable,
+ all_putters: std.atomic.Queue(anyframe),
+ ref_count: std.atomic.Int(usize),
+
+ const DirTable = std.StringHashMap(*Dir);
+ const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
+
+ const Dir = struct {
+ putter: anyframe,
+ file_table: FileTable,
+ table_lock: event.Lock,
+ };
+ };
+
+ const LinuxOsData = struct {
+ putter: anyframe,
+ inotify_fd: i32,
+ wd_table: WdTable,
+ table_lock: event.Lock,
+
+ const WdTable = std.AutoHashMap(i32, Dir);
+ const FileTable = std.StringHashMap(V);
+
+ const Dir = struct {
+ dirname: []const u8,
+ file_table: FileTable,
+ };
+ };
+
+ const FileToHandle = std.StringHashMap(anyframe);
+
+ const Self = @This();
+
+ pub const Event = struct {
+ id: Id,
+ data: V,
+
+ pub const Id = WatchEventId;
+ pub const Error = WatchEventError;
+ };
+
+ pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
+ const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
+ errdefer channel.destroy();
+
+ switch (builtin.os) {
+ .linux => {
+ const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
+ errdefer os.close(inotify_fd);
+
+ var result: *Self = undefined;
+// _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result);
+ return result;
+ },
+
+ .windows => {
+ const self = try loop.allocator.create(Self);
+ errdefer loop.allocator.destroy(self);
+ self.* = Self{
+ .channel = channel,
+ .os_data = OsData{
+ .table_lock = event.Lock.init(loop),
+ .dir_table = OsData.DirTable.init(loop.allocator),
+ .ref_count = std.atomic.Int(usize).init(1),
+ .all_putters = std.atomic.Queue(anyframe).init(),
+ },
+ };
+ return self;
+ },
+
+ .macosx, .freebsd, .netbsd, .dragonfly => {
+ const self = try loop.allocator.create(Self);
+ errdefer loop.allocator.destroy(self);
+
+ self.* = Self{
+ .channel = channel,
+ .os_data = OsData{
+ .table_lock = event.Lock.init(loop),
+ .file_table = OsData.FileTable.init(loop.allocator),
+ },
+ };
+ return self;
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ /// All addFile calls and removeFile calls must have completed.
+ pub fn destroy(self: *Self) void {
+ switch (builtin.os) {
+ .macosx, .freebsd, .netbsd, .dragonfly => {
+ // TODO we need to cancel the frames before destroying the lock
+ self.os_data.table_lock.deinit();
+ var it = self.os_data.file_table.iterator();
+ while (it.next()) |entry| {
+// cancel entry.value.putter;
+ self.channel.loop.allocator.free(entry.key);
+ }
+ self.channel.destroy();
+ },
+// .linux => cancel self.os_data.putter,
+ .windows => {
+ while (self.os_data.all_putters.get()) |putter_node| {
+// cancel putter_node.data;
+ }
+ self.deref();
+ },
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ fn ref(self: *Self) void {
+ _ = self.os_data.ref_count.incr();
+ }
+
+ fn deref(self: *Self) void {
+ if (self.os_data.ref_count.decr() == 1) {
+ const allocator = self.channel.loop.allocator;
+ self.os_data.table_lock.deinit();
+ var it = self.os_data.dir_table.iterator();
+ while (it.next()) |entry| {
+ allocator.free(entry.key);
+ allocator.destroy(entry.value);
+ }
+ self.os_data.dir_table.deinit();
+ self.channel.destroy();
+ allocator.destroy(self);
+ }
+ }
+
+ pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
+ switch (builtin.os) {
+ .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable),
+ .linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
+ .windows => return await (async addFileWindows(self, file_path, value) catch unreachable),
+ else => @compileError("Unsupported OS"),
+ }
+ }
+
+ async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
+ const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path});
+ var resolved_path_consumed = false;
+ defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);
+
+ var close_op = try CloseOperation.start(self.channel.loop);
+ var close_op_consumed = false;
+ defer if (!close_op_consumed) close_op.finish();
+
+ const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0;
+ const mode = 0;
+ const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
+ close_op.setHandle(fd);
+
+ var put_data: *OsData.Put = undefined;
+ const putter = try async self.kqPutEvents(close_op, value, &put_data);
+ close_op_consumed = true;
+// errdefer cancel putter;
+
+ const result = blk: {
+ const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+ defer held.release();
+
+ const gop = try self.os_data.file_table.getOrPut(resolved_path);
+ if (gop.found_existing) {
+ const prev_value = gop.kv.value.value_ptr.*;
+// cancel gop.kv.value.putter;
+ gop.kv.value = put_data;
+ break :blk prev_value;
+ } else {
+ resolved_path_consumed = true;
+ gop.kv.value = put_data;
+ break :blk null;
+ }
+ };
+
+ return result;
+ }
+
+ async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
+ var value_copy = value;
+ var put = OsData.Put{
+ .putter = @frame(),
+ .value_ptr = &value_copy,
+ };
+ out_put.* = &put;
+ self.channel.loop.beginOneEvent();
+
+ defer {
+ close_op.finish();
+ self.channel.loop.finishOneEvent();
+ }
+
+ while (true) {
+ if (await (async self.channel.loop.bsdWaitKev(
+ @intCast(usize, close_op.getHandle()),
+ os.EVFILT_VNODE,
+ os.NOTE_WRITE | os.NOTE_DELETE,
+ ) catch unreachable)) |kev| {
+ // TODO handle EV_ERROR
+ if (kev.fflags & os.NOTE_DELETE != 0) {
+ await (async self.channel.put(Self.Event{
+ .id = Event.Id.Delete,
+ .data = value_copy,
+ }) catch unreachable);
+ } else if (kev.fflags & os.NOTE_WRITE != 0) {
+ await (async self.channel.put(Self.Event{
+ .id = Event.Id.CloseWrite,
+ .data = value_copy,
+ }) catch unreachable);
+ }
+ } else |err| switch (err) {
+ error.EventNotFound => unreachable,
+ error.ProcessNotFound => unreachable,
+ error.Overflow => unreachable,
+ error.AccessDenied, error.SystemResources => |casted_err| {
+ await (async self.channel.put(casted_err) catch unreachable);
+ },
+ }
+ }
+ }
+
+ async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
+ const value_copy = value;
+
+ const dirname = std.fs.path.dirname(file_path) orelse ".";
+ const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
+ var dirname_with_null_consumed = false;
+ defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);
+
+ const basename = std.fs.path.basename(file_path);
+ const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
+ var basename_with_null_consumed = false;
+ defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);
+
+ const wd = try os.inotify_add_watchC(
+ self.os_data.inotify_fd,
+ dirname_with_null.ptr,
+ os.linux.IN_CLOSE_WRITE | os.linux.IN_ONLYDIR | os.linux.IN_EXCL_UNLINK,
+ );
+ // wd is either a newly created watch or an existing one.
+
+ const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+ defer held.release();
+
+ const gop = try self.os_data.wd_table.getOrPut(wd);
+ if (!gop.found_existing) {
+ gop.kv.value = OsData.Dir{
+ .dirname = dirname_with_null,
+ .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+ };
+ dirname_with_null_consumed = true;
+ }
+ const dir = &gop.kv.value;
+
+ const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
+ if (file_table_gop.found_existing) {
+ const prev_value = file_table_gop.kv.value;
+ file_table_gop.kv.value = value_copy;
+ return prev_value;
+ } else {
+ file_table_gop.kv.value = value_copy;
+ basename_with_null_consumed = true;
+ return null;
+ }
+ }
+
+ async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
+ const value_copy = value;
+ // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
+
+ const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
+ var dirname_consumed = false;
+ defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname);
+
+ const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname);
+ defer self.channel.loop.allocator.free(dirname_utf16le);
+
+ // TODO https://github.com/ziglang/zig/issues/265
+ const basename = std.fs.path.basename(file_path);
+ const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename);
+ var basename_utf16le_null_consumed = false;
+ defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null);
+ const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
+
+ const dir_handle = try windows.CreateFileW(
+ dirname_utf16le.ptr,
+ windows.FILE_LIST_DIRECTORY,
+ windows.FILE_SHARE_READ | windows.FILE_SHARE_DELETE | windows.FILE_SHARE_WRITE,
+ null,
+ windows.OPEN_EXISTING,
+ windows.FILE_FLAG_BACKUP_SEMANTICS | windows.FILE_FLAG_OVERLAPPED,
+ null,
+ );
+ var dir_handle_consumed = false;
+ defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
+
+ const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+ defer held.release();
+
+ const gop = try self.os_data.dir_table.getOrPut(dirname);
+ if (gop.found_existing) {
+ const dir = gop.kv.value;
+ const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable);
+ defer held_dir_lock.release();
+
+ const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
+ if (file_gop.found_existing) {
+ const prev_value = file_gop.kv.value;
+ file_gop.kv.value = value_copy;
+ return prev_value;
+ } else {
+ file_gop.kv.value = value_copy;
+ basename_utf16le_null_consumed = true;
+ return null;
+ }
+ } else {
+ errdefer _ = self.os_data.dir_table.remove(dirname);
+ const dir = try self.channel.loop.allocator.create(OsData.Dir);
+ errdefer self.channel.loop.allocator.destroy(dir);
+
+ dir.* = OsData.Dir{
+ .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+ .table_lock = event.Lock.init(self.channel.loop),
+ .putter = undefined,
+ };
+ gop.kv.value = dir;
+ assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null);
+ basename_utf16le_null_consumed = true;
+
+ dir.putter = try async self.windowsDirReader(dir_handle, dir);
+ dir_handle_consumed = true;
+
+ dirname_consumed = true;
+
+ return null;
+ }
+ }
+
+ async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
+ self.ref();
+ defer self.deref();
+
+ defer os.close(dir_handle);
+
+ var putter_node = std.atomic.Queue(anyframe).Node{
+ .data = @frame(),
+ .prev = null,
+ .next = null,
+ };
+ self.os_data.all_putters.put(&putter_node);
+ defer _ = self.os_data.all_putters.remove(&putter_node);
+
+ var resume_node = Loop.ResumeNode.Basic{
+ .base = Loop.ResumeNode{
+ .id = Loop.ResumeNode.Id.Basic,
+ .handle = @frame(),
+ .overlapped = windows.OVERLAPPED{
+ .Internal = 0,
+ .InternalHigh = 0,
+ .Offset = 0,
+ .OffsetHigh = 0,
+ .hEvent = null,
+ },
+ },
+ };
+ var event_buf: [4096]u8 align(@alignOf(windows.FILE_NOTIFY_INFORMATION)) = undefined;
+
+ // TODO handle this error not in the channel but in the setup
+ _ = windows.CreateIoCompletionPort(
+ dir_handle,
+ self.channel.loop.os_data.io_port,
+ undefined,
+ undefined,
+ ) catch |err| {
+ await (async self.channel.put(err) catch unreachable);
+ return;
+ };
+
+ while (true) {
+ {
+ // TODO only 1 beginOneEvent for the whole function
+ self.channel.loop.beginOneEvent();
+ errdefer self.channel.loop.finishOneEvent();
+ errdefer {
+ _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
+ }
+ suspend {
+ _ = windows.kernel32.ReadDirectoryChangesW(
+ dir_handle,
+ &event_buf,
+ @intCast(windows.DWORD, event_buf.len),
+ windows.FALSE, // watch subtree
+ windows.FILE_NOTIFY_CHANGE_FILE_NAME | windows.FILE_NOTIFY_CHANGE_DIR_NAME |
+ windows.FILE_NOTIFY_CHANGE_ATTRIBUTES | windows.FILE_NOTIFY_CHANGE_SIZE |
+ windows.FILE_NOTIFY_CHANGE_LAST_WRITE | windows.FILE_NOTIFY_CHANGE_LAST_ACCESS |
+ windows.FILE_NOTIFY_CHANGE_CREATION | windows.FILE_NOTIFY_CHANGE_SECURITY,
+ null, // number of bytes transferred (unused for async)
+ &resume_node.base.overlapped,
+ null, // completion routine - unused because we use IOCP
+ );
+ }
+ }
+ var bytes_transferred: windows.DWORD = undefined;
+ if (windows.kernel32.GetOverlappedResult(dir_handle, &resume_node.base.overlapped, &bytes_transferred, windows.FALSE) == 0) {
+ const err = switch (windows.kernel32.GetLastError()) {
+ else => |err| windows.unexpectedError(err),
+ };
+ await (async self.channel.put(err) catch unreachable);
+ } else {
+ // can't use @bytesToSlice because of the special variable length name field
+ var ptr = event_buf[0..].ptr;
+ const end_ptr = ptr + bytes_transferred;
+ var ev: *windows.FILE_NOTIFY_INFORMATION = undefined;
+ while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += ev.NextEntryOffset) {
+ ev = @ptrCast(*windows.FILE_NOTIFY_INFORMATION, ptr);
+ const emit = switch (ev.Action) {
+ windows.FILE_ACTION_REMOVED => WatchEventId.Delete,
+ windows.FILE_ACTION_MODIFIED => WatchEventId.CloseWrite,
+ else => null,
+ };
+ if (emit) |id| {
+ const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
+ const user_value = blk: {
+ const held = await (async dir.table_lock.acquire() catch unreachable);
+ defer held.release();
+
+ if (dir.file_table.get(basename_utf16le)) |entry| {
+ break :blk entry.value;
+ } else {
+ break :blk null;
+ }
+ };
+ if (user_value) |v| {
+ await (async self.channel.put(Event{
+ .id = id,
+ .data = v,
+ }) catch unreachable);
+ }
+ }
+ if (ev.NextEntryOffset == 0) break;
+ }
+ }
+ }
+ }
+
+ pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
+ @panic("TODO");
+ }
+
+ async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
+ const loop = channel.loop;
+
+ var watch = Self{
+ .channel = channel,
+ .os_data = OsData{
+ .putter = @frame(),
+ .inotify_fd = inotify_fd,
+ .wd_table = OsData.WdTable.init(loop.allocator),
+ .table_lock = event.Lock.init(loop),
+ },
+ };
+ out_watch.* = &watch;
+
+ loop.beginOneEvent();
+
+ defer {
+ watch.os_data.table_lock.deinit();
+ var wd_it = watch.os_data.wd_table.iterator();
+ while (wd_it.next()) |wd_entry| {
+ var file_it = wd_entry.value.file_table.iterator();
+ while (file_it.next()) |file_entry| {
+ loop.allocator.free(file_entry.key);
+ }
+ loop.allocator.free(wd_entry.value.dirname);
+ }
+ loop.finishOneEvent();
+ os.close(inotify_fd);
+ channel.destroy();
+ }
+
+ var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
+
+ while (true) {
+ const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
+ const errno = os.linux.getErrno(rc);
+ switch (errno) {
+ 0 => {
+ // can't use @bytesToSlice because of the special variable length name field
+ var ptr = event_buf[0..].ptr;
+ const end_ptr = ptr + event_buf.len;
+ var ev: *os.linux.inotify_event = undefined;
+ while (@ptrToInt(ptr) < @ptrToInt(end_ptr)) : (ptr += @sizeOf(os.linux.inotify_event) + ev.len) {
+ ev = @ptrCast(*os.linux.inotify_event, ptr);
+ if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
+ const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
+ const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1];
+ const user_value = blk: {
+ const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
+ defer held.release();
+
+ const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
+ if (dir.file_table.get(basename_with_null)) |entry| {
+ break :blk entry.value;
+ } else {
+ break :blk null;
+ }
+ };
+ if (user_value) |v| {
+ await (async channel.put(Event{
+ .id = WatchEventId.CloseWrite,
+ .data = v,
+ }) catch unreachable);
+ }
+ }
+ }
+ },
+ os.linux.EINTR => continue,
+ os.linux.EINVAL => unreachable,
+ os.linux.EFAULT => unreachable,
+ os.linux.EAGAIN => {
+ (await (async loop.linuxWaitFd(
+ inotify_fd,
+ os.linux.EPOLLET | os.linux.EPOLLIN,
+ ) catch unreachable)) catch |err| {
+ const transformed_err = switch (err) {
+ error.FileDescriptorAlreadyPresentInSet => unreachable,
+ error.OperationCausesCircularLoop => unreachable,
+ error.FileDescriptorNotRegistered => unreachable,
+ error.FileDescriptorIncompatibleWithEpoll => unreachable,
+ error.Unexpected => unreachable,
+ else => |e| e,
+ };
+ await (async channel.put(transformed_err) catch unreachable);
+ };
+ },
+ else => unreachable,
+ }
+ }
+ }
+ };
+}
const test_tmp_dir = "std_event_fs_test";
// TODO this test is disabled until the async function rewrite is finished.
-//test "write a file, watch it, write it again" {
-// return error.SkipZigTest;
-// const allocator = std.heap.direct_allocator;
-//
-// // TODO move this into event loop too
-// try os.makePath(allocator, test_tmp_dir);
-// defer os.deleteTree(test_tmp_dir) catch {};
-//
-// var loop: Loop = undefined;
-// try loop.initMultiThreaded(allocator);
-// defer loop.deinit();
-//
-// var result: anyerror!void = error.ResultNeverWritten;
+test "write a file, watch it, write it again" {
+ return error.SkipZigTest;
+ const allocator = std.heap.direct_allocator;
+
+ // TODO move this into event loop too
+ try os.makePath(allocator, test_tmp_dir);
+ defer os.deleteTree(test_tmp_dir) catch {};
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var result: anyerror!void = error.ResultNeverWritten;
// const handle = try async<allocator> testFsWatchCantFail(&loop, &result);
// defer cancel handle;
-//
-// loop.run();
-// return result;
-//}
+
+ loop.run();
+ return result;
+}
fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
result.* = testFsWatch(loop);