Commit 7dba5ea9cf

Vexu <15308111+Vexu@users.noreply.github.com>
2019-11-24 13:09:49
update event.fs.watch
1 parent 20f5f56
Changed files (1)
lib
std
event
lib/std/event/fs.zig
@@ -9,6 +9,9 @@ const windows = os.windows;
 const Loop = event.Loop;
 const fd_t = os.fd_t;
 const File = std.fs.File;
+const Allocator = mem.Allocator;
+
+//! TODO mege this with `std.fs`
 
 const global_event_loop = Loop.instance orelse
     @compileError("std.event.fs currently only works with event-based I/O");
@@ -681,7 +684,7 @@ fn writeFileModeThread(allocator: *Allocator, path: []const u8, contents: []cons
 /// is closed.
 /// Caller owns returned memory.
 pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) ![]u8 {
-    var close_op = try CloseOperation.start();
+    var close_op = try CloseOperation.start(allocator);
     defer close_op.finish();
 
     const fd = try openRead(file_path);
@@ -694,7 +697,7 @@ pub fn readFile(allocator: *Allocator, file_path: []const u8, max_size: usize) !
         try list.ensureCapacity(list.len + mem.page_size);
         const buf = list.items[list.len..];
         const buf_array = [_][]u8{buf};
-        const amt = try preadv(fd, buf_array, list.len);
+        const amt = try preadv(allocator, fd, buf_array, list.len);
         list.len += amt;
         if (list.len > max_size) {
             return error.FileTooBig;
@@ -731,16 +734,18 @@ pub fn Watch(comptime V: type) type {
     return struct {
         channel: *event.Channel(Event.Error!Event),
         os_data: OsData,
+        allocator: *Allocator,
 
         const OsData = switch (builtin.os) {
             .macosx, .freebsd, .netbsd, .dragonfly => struct {
                 file_table: FileTable,
                 table_lock: event.Lock,
 
-                const FileTable = std.StringHashmap(*Put);
+                const FileTable = std.StringHashMap(*Put);
                 const Put = struct {
-                    putter: anyframe,
-                    value_ptr: *V,
+                    putter_frame: @Frame(kqPutEvents),
+                    cancelled: bool = false,
+                    value: V,
                 };
             },
 
@@ -753,24 +758,30 @@ pub fn Watch(comptime V: type) type {
         const WindowsOsData = struct {
             table_lock: event.Lock,
             dir_table: DirTable,
-            all_putters: std.atomic.Queue(anyframe),
+            all_putters: std.atomic.Queue(Put),
             ref_count: std.atomic.Int(usize),
 
+            const Put = struct {
+                putter: anyframe,
+                cancelled: bool = false,
+            };
+
             const DirTable = std.StringHashMap(*Dir);
             const FileTable = std.HashMap([]const u16, V, hashString, eqlString);
 
             const Dir = struct {
-                putter: anyframe,
+                putter_frame: @Frame(windowsDirReader),
                 file_table: FileTable,
                 table_lock: event.Lock,
             };
         };
 
         const LinuxOsData = struct {
-            putter: anyframe,
+            putter_frame: @Frame(linuxEventPutter),
             inotify_fd: i32,
             wd_table: WdTable,
             table_lock: event.Lock,
+            cancelled: bool = false,
 
             const WdTable = std.AutoHashMap(i32, Dir);
             const FileTable = std.StringHashMap(V);
@@ -781,8 +792,6 @@ pub fn Watch(comptime V: type) type {
             };
         };
 
-        const FileToHandle = std.StringHashMap(anyframe);
-
         const Self = @This();
 
         pub const Event = struct {
@@ -793,28 +802,44 @@ pub fn Watch(comptime V: type) type {
             pub const Error = WatchEventError;
         };
 
-        pub fn create(loop: *Loop, event_buf_count: usize) !*Self {
-            const channel = try event.Channel(Self.Event.Error!Self.Event).create(loop, event_buf_count);
-            errdefer channel.destroy();
+        pub fn init(allocator: *Allocator, event_buf_count: usize) !*Self {
+            const channel = try allocator.create(event.Channel(Event.Error!Event));
+            errdefer allocator.destroy(channel);
+            var buf = try allocator.alloc(Event.Error!Event, event_buf_count);
+            errdefer allocator.free(buf);
+            channel.init(buf);
+            errdefer channel.deinit();
+
+            const self = try allocator.create(Self);
+            errdefer allocator.destroy(self);
 
             switch (builtin.os) {
                 .linux => {
                     const inotify_fd = try os.inotify_init1(os.linux.IN_NONBLOCK | os.linux.IN_CLOEXEC);
                     errdefer os.close(inotify_fd);
 
-                    var result: *Self = undefined;
-//                     _ = try async<loop.allocator> linuxEventPutter(inotify_fd, channel, &result);
-                    return result;
+                    self.* = Self{
+                        .allocator = allocator,
+                        .channel = channel,
+                        .os_data = OsData{
+                            .putter_frame = undefined,
+                            .inotify_fd = inotify_fd,
+                            .wd_table = OsData.WdTable.init(allocator),
+                            .table_lock = event.Lock.init(),
+                        },
+                    };
+
+                    self.os_data.putter_frame = async self.linuxEventPutter();
+                    return self;
                 },
 
                 .windows => {
-                    const self = try loop.allocator.create(Self);
-                    errdefer loop.allocator.destroy(self);
                     self.* = Self{
+                        .allocator = allocator,
                         .channel = channel,
                         .os_data = OsData{
-                            .table_lock = event.Lock.init(loop),
-                            .dir_table = OsData.DirTable.init(loop.allocator),
+                            .table_lock = event.Lock.init(),
+                            .dir_table = OsData.DirTable.init(allocator),
                             .ref_count = std.atomic.Int(usize).init(1),
                             .all_putters = std.atomic.Queue(anyframe).init(),
                         },
@@ -823,14 +848,12 @@ pub fn Watch(comptime V: type) type {
                 },
 
                 .macosx, .freebsd, .netbsd, .dragonfly => {
-                    const self = try loop.allocator.create(Self);
-                    errdefer loop.allocator.destroy(self);
-
                     self.* = Self{
+                        .allocator = allocator,
                         .channel = channel,
                         .os_data = OsData{
-                            .table_lock = event.Lock.init(loop),
-                            .file_table = OsData.FileTable.init(loop.allocator),
+                            .table_lock = event.Lock.init(),
+                            .file_table = OsData.FileTable.init(allocator),
                         },
                     };
                     return self;
@@ -840,22 +863,31 @@ pub fn Watch(comptime V: type) type {
         }
 
         /// All addFile calls and removeFile calls must have completed.
-        pub fn destroy(self: *Self) void {
+        pub fn deinit(self: *Self) void {
             switch (builtin.os) {
                 .macosx, .freebsd, .netbsd, .dragonfly => {
                     // TODO we need to cancel the frames before destroying the lock
                     self.os_data.table_lock.deinit();
                     var it = self.os_data.file_table.iterator();
                     while (it.next()) |entry| {
-//                         cancel entry.value.putter;
-                        self.channel.loop.allocator.free(entry.key);
+                        entry.cancelled = true;
+                        await entry.value.putter;
+                        self.allocator.free(entry.key);
+                        self.allocator.free(entry.value);
                     }
-                    self.channel.destroy();
+                    self.channel.deinit();
+                    self.allocator.destroy(self.channel.buffer_nodes);
+                    self.allocator.destroy(self);
+                },
+                .linux => {
+                    self.os_data.cancelled = true;
+                    await self.os_data.putter_frame;
+                    self.allocator.destroy(self);
                 },
-//                 .linux => cancel self.os_data.putter,
                 .windows => {
                     while (self.os_data.all_putters.get()) |putter_node| {
-//                         cancel putter_node.data;
+                        putter_node.cancelled = true;
+                        await putter_node.frame;
                     }
                     self.deref();
                 },
@@ -869,60 +901,68 @@ pub fn Watch(comptime V: type) type {
 
         fn deref(self: *Self) void {
             if (self.os_data.ref_count.decr() == 1) {
-                const allocator = self.channel.loop.allocator;
                 self.os_data.table_lock.deinit();
                 var it = self.os_data.dir_table.iterator();
                 while (it.next()) |entry| {
-                    allocator.free(entry.key);
-                    allocator.destroy(entry.value);
+                    self.allocator.free(entry.key);
+                    self.allocator.destroy(entry.value);
                 }
                 self.os_data.dir_table.deinit();
-                self.channel.destroy();
-                allocator.destroy(self);
+                self.channel.deinit();
+                self.allocator.destroy(self.channel.buffer_nodes);
+                self.allocator.destroy(self);
             }
         }
 
-        pub async fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
+        pub fn addFile(self: *Self, file_path: []const u8, value: V) !?V {
             switch (builtin.os) {
-                .macosx, .freebsd, .netbsd, .dragonfly => return await (async addFileKEvent(self, file_path, value) catch unreachable),
-                .linux => return await (async addFileLinux(self, file_path, value) catch unreachable),
-                .windows => return await (async addFileWindows(self, file_path, value) catch unreachable),
+                .macosx, .freebsd, .netbsd, .dragonfly => return addFileKEvent(self, file_path, value),
+                .linux => return addFileLinux(self, file_path, value),
+                .windows => return addFileWindows(self, file_path, value),
                 else => @compileError("Unsupported OS"),
             }
         }
 
-        async fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
-            const resolved_path = try std.fs.path.resolve(self.channel.loop.allocator, [_][]const u8{file_path});
+        fn addFileKEvent(self: *Self, file_path: []const u8, value: V) !?V {
+            const resolved_path = try std.fs.path.resolve(self.allocator, [_][]const u8{file_path});
             var resolved_path_consumed = false;
-            defer if (!resolved_path_consumed) self.channel.loop.allocator.free(resolved_path);
+            defer if (!resolved_path_consumed) self.allocator.free(resolved_path);
 
-            var close_op = try CloseOperation.start(self.channel.loop);
+            var close_op = try CloseOperation.start(self.allocator);
             var close_op_consumed = false;
             defer if (!close_op_consumed) close_op.finish();
 
             const flags = if (comptime std.Target.current.isDarwin()) os.O_SYMLINK | os.O_EVTONLY else 0;
             const mode = 0;
-            const fd = try await (async openPosix(self.channel.loop, resolved_path, flags, mode) catch unreachable);
+            const fd = try openPosix(self.allocator, resolved_path, flags, mode);
             close_op.setHandle(fd);
 
-            var put_data: *OsData.Put = undefined;
-            const putter = try async self.kqPutEvents(close_op, value, &put_data);
+            var put = try self.allocator.create(OsData.Put);
+            errdefer self.allocator.destroy(put);
+            put.* = OsData.Put{
+                .value = value,
+                .putter_frame = undefined,
+            };
+            put.putter_frame = async self.kqPutEvents(close_op, put);
             close_op_consumed = true;
-//             errdefer cancel putter;
+            errdefer {
+                put.cancelled = true;
+                await put.putter_frame;
+            }
 
             const result = blk: {
-                const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+                const held = self.os_data.table_lock.acquire();
                 defer held.release();
 
                 const gop = try self.os_data.file_table.getOrPut(resolved_path);
                 if (gop.found_existing) {
-                    const prev_value = gop.kv.value.value_ptr.*;
-//                     cancel gop.kv.value.putter;
-                    gop.kv.value = put_data;
+                    const prev_value = gop.kv.value.value;
+                    await gop.kv.value.putter_frame;
+                    gop.kv.value = put;
                     break :blk prev_value;
                 } else {
                     resolved_path_consumed = true;
-                    gop.kv.value = put_data;
+                    gop.kv.value = put;
                     break :blk null;
                 }
             };
@@ -930,61 +970,53 @@ pub fn Watch(comptime V: type) type {
             return result;
         }
 
-        async fn kqPutEvents(self: *Self, close_op: *CloseOperation, value: V, out_put: **OsData.Put) void {
-            var value_copy = value;
-            var put = OsData.Put{
-                .putter = @frame(),
-                .value_ptr = &value_copy,
-            };
-            out_put.* = &put;
-            self.channel.loop.beginOneEvent();
+        fn kqPutEvents(self: *Self, close_op: *CloseOperation, put: *OsData.Put) void {
+            global_event_loop.beginOneEvent();
 
             defer {
                 close_op.finish();
-                self.channel.loop.finishOneEvent();
+                global_event_loop.finishOneEvent();
             }
 
-            while (true) {
-                if (await (async self.channel.loop.bsdWaitKev(
+            while (!put.cancelled) {
+                if (global_event_loop.bsdWaitKev(
                     @intCast(usize, close_op.getHandle()),
                     os.EVFILT_VNODE,
                     os.NOTE_WRITE | os.NOTE_DELETE,
-                ) catch unreachable)) |kev| {
+                )) |kev| {
                     // TODO handle EV_ERROR
                     if (kev.fflags & os.NOTE_DELETE != 0) {
-                        await (async self.channel.put(Self.Event{
+                        self.channel.put(Self.Event{
                             .id = Event.Id.Delete,
-                            .data = value_copy,
-                        }) catch unreachable);
+                            .data = put.value,
+                        });
                     } else if (kev.fflags & os.NOTE_WRITE != 0) {
-                        await (async self.channel.put(Self.Event{
+                        self.channel.put(Self.Event{
                             .id = Event.Id.CloseWrite,
-                            .data = value_copy,
-                        }) catch unreachable);
+                            .data = put.value,
+                        });
                     }
                 } else |err| switch (err) {
                     error.EventNotFound => unreachable,
                     error.ProcessNotFound => unreachable,
                     error.Overflow => unreachable,
                     error.AccessDenied, error.SystemResources => |casted_err| {
-                        await (async self.channel.put(casted_err) catch unreachable);
+                        self.channel.put(casted_err);
                     },
                 }
             }
         }
 
-        async fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
-            const value_copy = value;
-
+        fn addFileLinux(self: *Self, file_path: []const u8, value: V) !?V {
             const dirname = std.fs.path.dirname(file_path) orelse ".";
-            const dirname_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, dirname);
+            const dirname_with_null = try std.cstr.addNullByte(self.allocator, dirname);
             var dirname_with_null_consumed = false;
-            defer if (!dirname_with_null_consumed) self.channel.loop.allocator.free(dirname_with_null);
+            defer if (!dirname_with_null_consumed) self.channel.free(dirname_with_null);
 
             const basename = std.fs.path.basename(file_path);
-            const basename_with_null = try std.cstr.addNullByte(self.channel.loop.allocator, basename);
+            const basename_with_null = try std.cstr.addNullByte(self.allocator, basename);
             var basename_with_null_consumed = false;
-            defer if (!basename_with_null_consumed) self.channel.loop.allocator.free(basename_with_null);
+            defer if (!basename_with_null_consumed) self.allocator.free(basename_with_null);
 
             const wd = try os.inotify_add_watchC(
                 self.os_data.inotify_fd,
@@ -993,14 +1025,14 @@ pub fn Watch(comptime V: type) type {
             );
             // wd is either a newly created watch or an existing one.
 
-            const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+            const held = self.os_data.table_lock.acquire();
             defer held.release();
 
             const gop = try self.os_data.wd_table.getOrPut(wd);
             if (!gop.found_existing) {
                 gop.kv.value = OsData.Dir{
                     .dirname = dirname_with_null,
-                    .file_table = OsData.FileTable.init(self.channel.loop.allocator),
+                    .file_table = OsData.FileTable.init(self.allocator),
                 };
                 dirname_with_null_consumed = true;
             }
@@ -1009,31 +1041,29 @@ pub fn Watch(comptime V: type) type {
             const file_table_gop = try dir.file_table.getOrPut(basename_with_null);
             if (file_table_gop.found_existing) {
                 const prev_value = file_table_gop.kv.value;
-                file_table_gop.kv.value = value_copy;
+                file_table_gop.kv.value = value;
                 return prev_value;
             } else {
-                file_table_gop.kv.value = value_copy;
+                file_table_gop.kv.value = value;
                 basename_with_null_consumed = true;
                 return null;
             }
         }
 
-        async fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
-            const value_copy = value;
+        fn addFileWindows(self: *Self, file_path: []const u8, value: V) !?V {
             // TODO we might need to convert dirname and basename to canonical file paths ("short"?)
-
-            const dirname = try std.mem.dupe(self.channel.loop.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
+            const dirname = try std.mem.dupe(self.allocator, u8, std.fs.path.dirname(file_path) orelse ".");
             var dirname_consumed = false;
-            defer if (!dirname_consumed) self.channel.loop.allocator.free(dirname);
+            defer if (!dirname_consumed) self.allocator.free(dirname);
 
-            const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, dirname);
-            defer self.channel.loop.allocator.free(dirname_utf16le);
+            const dirname_utf16le = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, dirname);
+            defer self.allocator.free(dirname_utf16le);
 
             // TODO https://github.com/ziglang/zig/issues/265
             const basename = std.fs.path.basename(file_path);
-            const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.channel.loop.allocator, basename);
+            const basename_utf16le_null = try std.unicode.utf8ToUtf16LeWithNull(self.allocator, basename);
             var basename_utf16le_null_consumed = false;
-            defer if (!basename_utf16le_null_consumed) self.channel.loop.allocator.free(basename_utf16le_null);
+            defer if (!basename_utf16le_null_consumed) self.allocator.free(basename_utf16le_null);
             const basename_utf16le_no_null = basename_utf16le_null[0 .. basename_utf16le_null.len - 1];
 
             const dir_handle = try windows.CreateFileW(
@@ -1048,40 +1078,40 @@ pub fn Watch(comptime V: type) type {
             var dir_handle_consumed = false;
             defer if (!dir_handle_consumed) windows.CloseHandle(dir_handle);
 
-            const held = await (async self.os_data.table_lock.acquire() catch unreachable);
+            const held = self.os_data.table_lock.acquire();
             defer held.release();
 
             const gop = try self.os_data.dir_table.getOrPut(dirname);
             if (gop.found_existing) {
                 const dir = gop.kv.value;
-                const held_dir_lock = await (async dir.table_lock.acquire() catch unreachable);
+                const held_dir_lock = dir.table_lock.acquire();
                 defer held_dir_lock.release();
 
                 const file_gop = try dir.file_table.getOrPut(basename_utf16le_no_null);
                 if (file_gop.found_existing) {
                     const prev_value = file_gop.kv.value;
-                    file_gop.kv.value = value_copy;
+                    file_gop.kv.value = value;
                     return prev_value;
                 } else {
-                    file_gop.kv.value = value_copy;
+                    file_gop.kv.value = value;
                     basename_utf16le_null_consumed = true;
                     return null;
                 }
             } else {
                 errdefer _ = self.os_data.dir_table.remove(dirname);
-                const dir = try self.channel.loop.allocator.create(OsData.Dir);
-                errdefer self.channel.loop.allocator.destroy(dir);
+                const dir = try self.allocator.create(OsData.Dir);
+                errdefer self.allocator.destroy(dir);
 
                 dir.* = OsData.Dir{
-                    .file_table = OsData.FileTable.init(self.channel.loop.allocator),
-                    .table_lock = event.Lock.init(self.channel.loop),
-                    .putter = undefined,
+                    .file_table = OsData.FileTable.init(self.allocator),
+                    .table_lock = event.Lock.init(),
+                    .putter_frame = undefined,
                 };
                 gop.kv.value = dir;
-                assert((try dir.file_table.put(basename_utf16le_no_null, value_copy)) == null);
+                assert((try dir.file_table.put(basename_utf16le_no_null, value)) == null);
                 basename_utf16le_null_consumed = true;
 
-                dir.putter = try async self.windowsDirReader(dir_handle, dir);
+                dir.putter_frame = async self.windowsDirReader(dir_handle, dir);
                 dir_handle_consumed = true;
 
                 dirname_consumed = true;
@@ -1090,14 +1120,14 @@ pub fn Watch(comptime V: type) type {
             }
         }
 
-        async fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
+        fn windowsDirReader(self: *Self, dir_handle: windows.HANDLE, dir: *OsData.Dir) void {
             self.ref();
             defer self.deref();
 
             defer os.close(dir_handle);
 
             var putter_node = std.atomic.Queue(anyframe).Node{
-                .data = @frame(),
+                .data = .{ .putter = @frame() },
                 .prev = null,
                 .next = null,
             };
@@ -1122,19 +1152,19 @@ pub fn Watch(comptime V: type) type {
             // TODO handle this error not in the channel but in the setup
             _ = windows.CreateIoCompletionPort(
                 dir_handle,
-                self.channel.loop.os_data.io_port,
+                global_event_loop.os_data.io_port,
                 undefined,
                 undefined,
             ) catch |err| {
-                await (async self.channel.put(err) catch unreachable);
+                self.channel.put(err);
                 return;
             };
 
-            while (true) {
+            while (!putter_node.data.cancelled) {
                 {
                     // TODO only 1 beginOneEvent for the whole function
-                    self.channel.loop.beginOneEvent();
-                    errdefer self.channel.loop.finishOneEvent();
+                    global_event_loop.beginOneEvent();
+                    errdefer global_event_loop.finishOneEvent();
                     errdefer {
                         _ = windows.kernel32.CancelIoEx(dir_handle, &resume_node.base.overlapped);
                     }
@@ -1159,7 +1189,7 @@ pub fn Watch(comptime V: type) type {
                     const err = switch (windows.kernel32.GetLastError()) {
                         else => |err| windows.unexpectedError(err),
                     };
-                    await (async self.channel.put(err) catch unreachable);
+                    self.channel.put(err);
                 } else {
                     // can't use @bytesToSlice because of the special variable length name field
                     var ptr = event_buf[0..].ptr;
@@ -1175,7 +1205,7 @@ pub fn Watch(comptime V: type) type {
                         if (emit) |id| {
                             const basename_utf16le = ([*]u16)(&ev.FileName)[0 .. ev.FileNameLength / 2];
                             const user_value = blk: {
-                                const held = await (async dir.table_lock.acquire() catch unreachable);
+                                const held = dir.table_lock.acquire();
                                 defer held.release();
 
                                 if (dir.file_table.get(basename_utf16le)) |entry| {
@@ -1185,10 +1215,10 @@ pub fn Watch(comptime V: type) type {
                                 }
                             };
                             if (user_value) |v| {
-                                await (async self.channel.put(Event{
+                                self.channel.put(Event{
                                     .id = id,
                                     .data = v,
-                                }) catch unreachable);
+                                });
                             }
                         }
                         if (ev.NextEntryOffset == 0) break;
@@ -1197,45 +1227,35 @@ pub fn Watch(comptime V: type) type {
             }
         }
 
-        pub async fn removeFile(self: *Self, file_path: []const u8) ?V {
+        pub fn removeFile(self: *Self, file_path: []const u8) ?V {
             @panic("TODO");
         }
 
-        async fn linuxEventPutter(inotify_fd: i32, channel: *event.Channel(Event.Error!Event), out_watch: **Self) void {
-            const loop = channel.loop;
-
-            var watch = Self{
-                .channel = channel,
-                .os_data = OsData{
-                    .putter = @frame(),
-                    .inotify_fd = inotify_fd,
-                    .wd_table = OsData.WdTable.init(loop.allocator),
-                    .table_lock = event.Lock.init(loop),
-                },
-            };
-            out_watch.* = &watch;
-
-            loop.beginOneEvent();
+        fn linuxEventPutter(self: *Self) void {
+            global_event_loop.beginOneEvent();
 
             defer {
-                watch.os_data.table_lock.deinit();
-                var wd_it = watch.os_data.wd_table.iterator();
+                self.os_data.table_lock.deinit();
+                var wd_it = self.os_data.wd_table.iterator();
                 while (wd_it.next()) |wd_entry| {
                     var file_it = wd_entry.value.file_table.iterator();
                     while (file_it.next()) |file_entry| {
-                        loop.allocator.free(file_entry.key);
+                        self.allocator.free(file_entry.key);
                     }
-                    loop.allocator.free(wd_entry.value.dirname);
+                    self.allocator.free(wd_entry.value.dirname);
+                    wd_entry.value.file_table.deinit();
                 }
-                loop.finishOneEvent();
-                os.close(inotify_fd);
-                channel.destroy();
+                self.os_data.wd_table.deinit();
+                global_event_loop.finishOneEvent();
+                os.close(self.os_data.inotify_fd);
+                self.channel.deinit();
+                self.allocator.free(self.channel.buffer_nodes);
             }
 
             var event_buf: [4096]u8 align(@alignOf(os.linux.inotify_event)) = undefined;
 
-            while (true) {
-                const rc = os.linux.read(inotify_fd, &event_buf, event_buf.len);
+            while (!self.os_data.cancelled) {
+                const rc = os.linux.read(self.os_data.inotify_fd, &event_buf, event_buf.len);
                 const errno = os.linux.getErrno(rc);
                 switch (errno) {
                     0 => {
@@ -1247,12 +1267,13 @@ pub fn Watch(comptime V: type) type {
                             ev = @ptrCast(*os.linux.inotify_event, ptr);
                             if (ev.mask & os.linux.IN_CLOSE_WRITE == os.linux.IN_CLOSE_WRITE) {
                                 const basename_ptr = ptr + @sizeOf(os.linux.inotify_event);
-                                const basename_with_null = basename_ptr[0 .. std.mem.len(u8, basename_ptr) + 1];
+                                // `ev.len` counts all bytes in `ev.name` including terminating null byte.
+                                const basename_with_null = basename_ptr[0 .. ev.len];
                                 const user_value = blk: {
-                                    const held = await (async watch.os_data.table_lock.acquire() catch unreachable);
+                                    const held = self.os_data.table_lock.acquire();
                                     defer held.release();
 
-                                    const dir = &watch.os_data.wd_table.get(ev.wd).?.value;
+                                    const dir = &self.os_data.wd_table.get(ev.wd).?.value;
                                     if (dir.file_table.get(basename_with_null)) |entry| {
                                         break :blk entry.value;
                                     } else {
@@ -1260,10 +1281,10 @@ pub fn Watch(comptime V: type) type {
                                     }
                                 };
                                 if (user_value) |v| {
-                                    await (async channel.put(Event{
+                                    self.channel.put(Event{
                                         .id = WatchEventId.CloseWrite,
                                         .data = v,
-                                    }) catch unreachable);
+                                    });
                                 }
                             }
                         }
@@ -1272,20 +1293,7 @@ pub fn Watch(comptime V: type) type {
                     os.linux.EINVAL => unreachable,
                     os.linux.EFAULT => unreachable,
                     os.linux.EAGAIN => {
-                        (await (async loop.linuxWaitFd(
-                            inotify_fd,
-                            os.linux.EPOLLET | os.linux.EPOLLIN,
-                        ) catch unreachable)) catch |err| {
-                            const transformed_err = switch (err) {
-                                error.FileDescriptorAlreadyPresentInSet => unreachable,
-                                error.OperationCausesCircularLoop => unreachable,
-                                error.FileDescriptorNotRegistered => unreachable,
-                                error.FileDescriptorIncompatibleWithEpoll => unreachable,
-                                error.Unexpected => unreachable,
-                                else => |e| e,
-                            };
-                            await (async channel.put(transformed_err) catch unreachable);
-                        };
+                        global_event_loop.linuxWaitFd(self.os_data.inotify_fd, os.linux.EPOLLET | os.linux.EPOLLIN);
                     },
                     else => unreachable,
                 }
@@ -1296,34 +1304,22 @@ pub fn Watch(comptime V: type) type {
 
 const test_tmp_dir = "std_event_fs_test";
 
-// TODO this test is disabled until the async function rewrite is finished.
 test "write a file, watch it, write it again" {
-    return error.SkipZigTest;
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
+
     const allocator = std.heap.direct_allocator;
 
     // TODO move this into event loop too
     try os.makePath(allocator, test_tmp_dir);
     defer os.deleteTree(test_tmp_dir) catch {};
 
-    var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
-    defer loop.deinit();
-
-    var result: anyerror!void = error.ResultNeverWritten;
-//    const handle = try async<allocator> testFsWatchCantFail(&loop, &result);
-//    defer cancel handle;
-
-    loop.run();
-    return result;
-}
-
-fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
-    result.* = testFsWatch(loop);
+    return testFsWatch(&allocator);
 }
 
-fn testFsWatch(loop: *Loop) !void {
-    const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" });
-    defer loop.allocator.free(file_path);
+fn testFsWatch(allocator: *Allocator) !void {
+    const file_path = try std.fs.path.join(allocator, [_][]const u8{ test_tmp_dir, "file.txt" });
+    defer allocator.free(file_path);
 
     const contents =
         \\line 1
@@ -1332,27 +1328,27 @@ fn testFsWatch(loop: *Loop) !void {
     const line2_offset = 7;
 
     // first just write then read the file
-    try writeFile(loop, file_path, contents);
+    try writeFile(allocator, file_path, contents);
 
-    const read_contents = try readFile(loop, file_path, 1024 * 1024);
+    const read_contents = try readFile(allocator, file_path, 1024 * 1024);
     testing.expectEqualSlices(u8, contents, read_contents);
 
     // now watch the file
-    var watch = try Watch(void).create(loop, 0);
-    defer watch.destroy();
+    var watch = try Watch(void).init(allocator, 0);
+    defer watch.deinit();
 
     testing.expect((try watch.addFile(file_path, {})) == null);
 
-    const ev = async watch.channel.get();
+    const ev = watch.channel.get();
     var ev_consumed = false;
     defer if (!ev_consumed) await ev;
 
     // overwrite line 2
-    const fd = try await openReadWrite(loop, file_path, File.default_mode);
+    const fd = try await openReadWrite(file_path, File.default_mode);
     {
         defer os.close(fd);
 
-        try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
+        try pwritev(allocator, fd, []const []const u8{"lorem ipsum"}, line2_offset);
     }
 
     ev_consumed = true;
@@ -1360,7 +1356,7 @@ fn testFsWatch(loop: *Loop) !void {
         WatchEventId.CloseWrite => {},
         WatchEventId.Delete => @panic("wrong event"),
     }
-    const contents_updated = try readFile(loop, file_path, 1024 * 1024);
+    const contents_updated = try readFile(allocator, file_path, 1024 * 1024);
     testing.expectEqualSlices(u8,
         \\line 1
         \\lorem ipsum