Commit 50d70d5f49

Andrew Kelley <superjoe30@gmail.com>
2018-07-08 08:43:30
tests passing with kqueue on macos
1 parent c15a6fa
Changed files (4)
std/c/darwin.zig
@@ -6,6 +6,13 @@ pub extern "c" fn __getdirentries64(fd: c_int, buf_ptr: [*]u8, buf_len: usize, b
 pub extern "c" fn mach_absolute_time() u64;
 pub extern "c" fn mach_timebase_info(tinfo: ?*mach_timebase_info_data) void;
 
+pub extern "c" fn kqueue() c_int;
+pub extern "c" fn kevent(kq: c_int, changelist: [*]const Kevent, nchanges: c_int,
+    eventlist: [*]Kevent, nevents: c_int, timeout: ?*const timespec) c_int;
+
+pub extern "c" fn kevent64(kq: c_int, changelist: [*]const kevent64_s, nchanges: c_int,
+    eventlist: [*]kevent64_s, nevents: c_int, flags: c_uint, timeout: ?*const timespec) c_int;
+
 pub use @import("../os/darwin_errno.zig");
 
 pub const _errno = __error;
@@ -86,3 +93,52 @@ pub const pthread_attr_t = extern struct {
     __sig: c_long,
     __opaque: [56]u8,
 };
+
+/// Renamed from `kevent` to `Kevent` to avoid conflict with function name.
+pub const Kevent = extern struct {
+    ident: usize,
+    filter: i16,
+    flags: u16,
+    fflags: u32,
+    data: isize,
+    udata: usize,
+};
+
+// sys/types.h on macos uses #pragma pack(4) so these checks are
+// to make sure the struct is laid out the same. These values were
+// produced from C code using the offsetof macro.
+const std = @import("../index.zig");
+const assert = std.debug.assert;
+
+comptime {
+    assert(@offsetOf(Kevent, "ident") == 0);
+    assert(@offsetOf(Kevent, "filter") == 8);
+    assert(@offsetOf(Kevent, "flags") == 10);
+    assert(@offsetOf(Kevent, "fflags") == 12);
+    assert(@offsetOf(Kevent, "data") == 16);
+    assert(@offsetOf(Kevent, "udata") == 24);
+}
+
+pub const kevent64_s = extern struct {
+    ident: u64,
+    filter: i16,
+    flags: u16,
+    fflags: u32,
+    data: i64,
+    udata: u64,
+    ext: [2]u64,
+};
+
+// sys/types.h on macos uses #pragma pack() so these checks are
+// to make sure the struct is laid out the same. These values were
+// produced from C code using the offsetof macro.
+comptime {
+    assert(@offsetOf(kevent64_s, "ident") == 0);
+    assert(@offsetOf(kevent64_s, "filter") == 8);
+    assert(@offsetOf(kevent64_s, "flags") == 10);
+    assert(@offsetOf(kevent64_s, "fflags") == 12);
+    assert(@offsetOf(kevent64_s, "data") == 16);
+    assert(@offsetOf(kevent64_s, "udata") == 24);
+    assert(@offsetOf(kevent64_s, "ext") == 32);
+}
+
std/os/darwin.zig
@@ -264,6 +264,119 @@ pub const SIGUSR1 = 30;
 /// user defined signal 2
 pub const SIGUSR2 = 31;
 
+pub const KEVENT_FLAG_NONE = 0x000; /// no flag value
+pub const KEVENT_FLAG_IMMEDIATE = 0x001; /// immediate timeout
+pub const KEVENT_FLAG_ERROR_EVENTS = 0x002; /// output events only include change
+
+pub const EV_ADD = 0x0001; /// add event to kq (implies enable)
+pub const EV_DELETE = 0x0002; /// delete event from kq
+pub const EV_ENABLE = 0x0004; /// enable event
+pub const EV_DISABLE = 0x0008; /// disable event (not reported)
+
+pub const EV_ONESHOT = 0x0010; /// only report one occurrence
+pub const EV_CLEAR = 0x0020; /// clear event state after reporting
+
+/// force immediate event output
+/// ... with or without EV_ERROR
+/// ... use KEVENT_FLAG_ERROR_EVENTS
+///     on syscalls supporting flags
+pub const EV_RECEIPT = 0x0040;
+
+pub const EV_DISPATCH = 0x0080; /// disable event after reporting
+pub const EV_UDATA_SPECIFIC = 0x0100; /// unique kevent per udata value
+
+/// ... in combination with EV_DELETE
+/// will defer delete until udata-specific
+/// event enabled. EINPROGRESS will be
+/// returned to indicate the deferral
+pub const EV_DISPATCH2 = EV_DISPATCH | EV_UDATA_SPECIFIC;
+
+/// report that source has vanished 
+/// ... only valid with EV_DISPATCH2
+pub const EV_VANISHED = 0x0200;
+
+pub const EV_SYSFLAGS = 0xF000; /// reserved by system
+pub const EV_FLAG0 = 0x1000; /// filter-specific flag
+pub const EV_FLAG1 = 0x2000; /// filter-specific flag
+pub const EV_EOF = 0x8000; /// EOF detected
+pub const EV_ERROR = 0x4000; /// error, data contains errno
+
+pub const EV_POLL = EV_FLAG0;
+pub const EV_OOBAND = EV_FLAG1;
+
+pub const EVFILT_READ = -1;
+pub const EVFILT_WRITE = -2;
+pub const EVFILT_AIO = -3; /// attached to aio requests
+pub const EVFILT_VNODE = -4; /// attached to vnodes
+pub const EVFILT_PROC = -5; /// attached to struct proc
+pub const EVFILT_SIGNAL = -6; /// attached to struct proc
+pub const EVFILT_TIMER = -7; /// timers
+pub const EVFILT_MACHPORT = -8; /// Mach portsets
+pub const EVFILT_FS = -9; /// Filesystem events
+pub const EVFILT_USER = -10; /// User events
+pub const EVFILT_VM = -12; /// Virtual memory events
+
+pub const EVFILT_EXCEPT = -15; /// Exception events
+
+pub const EVFILT_SYSCOUNT = 17;
+
+/// On input, NOTE_TRIGGER causes the event to be triggered for output.
+pub const NOTE_TRIGGER = 0x01000000;
+
+pub const NOTE_FFNOP      = 0x00000000; /// ignore input fflags
+pub const NOTE_FFAND      = 0x40000000; /// and fflags
+pub const NOTE_FFOR       = 0x80000000; /// or fflags
+pub const NOTE_FFCOPY     = 0xc0000000; /// copy fflags
+pub const NOTE_FFCTRLMASK = 0xc0000000; /// mask for operations
+pub const NOTE_FFLAGSMASK = 0x00ffffff;
+
+pub const NOTE_LOWAT = 0x00000001; /// low water mark
+
+pub const NOTE_OOB = 0x00000002; /// OOB data
+
+pub const NOTE_DELETE = 0x00000001;      /// vnode was removed
+pub const NOTE_WRITE  = 0x00000002;      /// data contents changed
+pub const NOTE_EXTEND = 0x00000004;      /// size increased
+pub const NOTE_ATTRIB = 0x00000008;      /// attributes changed
+pub const NOTE_LINK   = 0x00000010;      /// link count changed
+pub const NOTE_RENAME = 0x00000020;      /// vnode was renamed
+pub const NOTE_REVOKE = 0x00000040;      /// vnode access was revoked
+pub const NOTE_NONE   = 0x00000080;      /// No specific vnode event: to test for EVFILT_READ activation
+pub const NOTE_FUNLOCK    = 0x00000100;      /// vnode was unlocked by flock(2)
+
+pub const NOTE_EXIT       = 0x80000000; /// process exited
+pub const NOTE_FORK       = 0x40000000; /// process forked
+pub const NOTE_EXEC       = 0x20000000; /// process exec'd
+pub const NOTE_SIGNAL     = 0x08000000; /// shared with EVFILT_SIGNAL
+pub const NOTE_EXITSTATUS     = 0x04000000; /// exit status to be returned, valid for child process only
+pub const NOTE_EXIT_DETAIL    = 0x02000000; /// provide details on reasons for exit
+
+pub const NOTE_PDATAMASK  = 0x000fffff; /// mask for signal & exit status
+pub const NOTE_PCTRLMASK  = (~NOTE_PDATAMASK);
+
+pub const NOTE_EXIT_DETAIL_MASK       = 0x00070000;
+pub const NOTE_EXIT_DECRYPTFAIL       = 0x00010000;
+pub const NOTE_EXIT_MEMORY        = 0x00020000;
+pub const NOTE_EXIT_CSERROR       = 0x00040000;
+
+
+pub const NOTE_VM_PRESSURE            = 0x80000000;              /// will react on memory pressure
+pub const NOTE_VM_PRESSURE_TERMINATE      = 0x40000000;             /// will quit on memory pressure, possibly after cleaning up dirty state
+pub const NOTE_VM_PRESSURE_SUDDEN_TERMINATE   = 0x20000000;     /// will quit immediately on memory pressure
+pub const NOTE_VM_ERROR               = 0x10000000;              /// there was an error
+
+pub const NOTE_SECONDS    = 0x00000001; /// data is seconds
+pub const NOTE_USECONDS   = 0x00000002; /// data is microseconds
+pub const NOTE_NSECONDS   = 0x00000004; /// data is nanoseconds
+pub const NOTE_ABSOLUTE   = 0x00000008; /// absolute timeout
+
+pub const NOTE_LEEWAY = 0x00000010; /// ext[1] holds leeway for power aware timers
+pub const NOTE_CRITICAL   = 0x00000020; /// system does minimal timer coalescing
+pub const NOTE_BACKGROUND = 0x00000040; /// system does maximum timer coalescing
+pub const NOTE_MACH_CONTINUOUS_TIME   = 0x00000080;
+pub const NOTE_MACHTIME =   0x00000100;             /// data is mach absolute time units
+
+
 fn wstatus(x: i32) i32 {
     return x & 0o177;
 }
@@ -385,6 +498,20 @@ pub fn getdirentries64(fd: i32, buf_ptr: [*]u8, buf_len: usize, basep: *i64) usi
     return errnoWrap(@bitCast(isize, c.__getdirentries64(fd, buf_ptr, buf_len, basep)));
 }
 
+pub fn kqueue() usize {
+    return errnoWrap(c.kqueue());
+}
+
+pub fn kevent(kq: i32, changelist: []const Kevent, eventlist: []Kevent, timeout: ?*const timespec) usize {
+    return errnoWrap(c.kevent(kq, changelist.ptr, @intCast(c_int, changelist.len), eventlist.ptr, @intCast(c_int, eventlist.len), timeout,));
+}
+
+pub fn kevent64(kq: i32, changelist: []const kevent64_s, eventlist: []kevent64_s, flags: u32,
+    timeout: ?*const timespec) usize
+{
+    return errnoWrap(c.kevent64(kq, changelist.ptr, changelist.len, eventlist.ptr, eventlist.len, flags, timeout));
+}
+
 pub fn mkdir(path: [*]const u8, mode: u32) usize {
     return errnoWrap(c.mkdir(path, mode));
 }
@@ -474,6 +601,10 @@ pub const dirent = c.dirent;
 pub const sa_family_t = c.sa_family_t;
 pub const sockaddr = c.sockaddr;
 
+/// Renamed from `kevent` to `Kevent` to avoid conflict with the syscall.
+pub const Kevent = c.Kevent;
+pub const kevent64_s = c.kevent64_s;
+
 /// Renamed from `sigaction` to `Sigaction` to avoid conflict with the syscall.
 pub const Sigaction = struct {
     handler: extern fn (i32) void,
std/os/index.zig
@@ -2787,3 +2787,59 @@ pub fn cpuCount(fallback_allocator: *mem.Allocator) CpuCountError!usize {
         }
     }
 }
+
+pub const BsdKQueueError = error {
+    /// The per-process limit on the number of open file descriptors has been reached.
+    ProcessFdQuotaExceeded,
+
+    /// The system-wide limit on the total number of open files has been reached.
+    SystemFdQuotaExceeded,
+
+    Unexpected,
+};
+
+pub fn bsdKQueue() BsdKQueueError!i32 {
+    const rc = posix.kqueue();
+    const err = posix.getErrno(rc);
+    switch (err) {
+        0 => return @intCast(i32, rc),
+        posix.EMFILE => return BsdKQueueError.ProcessFdQuotaExceeded,
+        posix.ENFILE => return BsdKQueueError.SystemFdQuotaExceeded,
+        else => return unexpectedErrorPosix(err),
+    }
+}
+
+pub const BsdKEventError = error {
+    /// The process does not have permission to register a filter.
+    AccessDenied,
+
+    /// The event could not be found to be modified or deleted.
+    EventNotFound,
+
+    /// No memory was available to register the event.
+    SystemResources,
+
+    /// The specified process to attach to does not exist.
+    ProcessNotFound,
+};
+
+pub fn bsdKEvent(kq: i32, changelist: []const posix.Kevent, eventlist: []posix.Kevent,
+    timeout: ?*const posix.timespec) BsdKEventError!usize
+{
+    while (true) {
+        const rc = posix.kevent(kq, changelist, eventlist, timeout);
+        const err = posix.getErrno(rc);
+        switch (err) {
+            0 => return rc,
+            posix.EACCES => return BsdKEventError.AccessDenied,
+            posix.EFAULT => unreachable,
+            posix.EBADF => unreachable,
+            posix.EINTR => continue,
+            posix.EINVAL => unreachable,
+            posix.ENOENT => return BsdKEventError.EventNotFound,
+            posix.ENOMEM => return BsdKEventError.SystemResources,
+            posix.ESRCH => return BsdKEventError.ProcessNotFound,
+            else => unreachable,
+        }
+    }
+}
std/event.zig
@@ -118,6 +118,11 @@ pub const Loop = struct {
     extra_threads: []*std.os.Thread,
     final_resume_node: ResumeNode,
 
+    // pre-allocated eventfds. all permanently active.
+    // this is how we send promises to be resumed on other threads.
+    available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
+    eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
+
     pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
 
     pub const ResumeNode = struct {
@@ -130,10 +135,17 @@ pub const Loop = struct {
             EventFd,
         };
 
-        pub const EventFd = struct {
-            base: ResumeNode,
-            epoll_op: u32,
-            eventfd: i32,
+        pub const EventFd = switch (builtin.os) {
+            builtin.Os.macosx => struct {
+                base: ResumeNode,
+                kevent: posix.Kevent,
+            },
+            builtin.Os.linux => struct {
+                base: ResumeNode,
+                epoll_op: u32,
+                eventfd: i32,
+            },
+            else => @compileError("unsupported OS"),
         };
     };
 
@@ -168,36 +180,41 @@ pub const Loop = struct {
                 .id = ResumeNode.Id.Stop,
                 .handle = undefined,
             },
+            .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+            .eventfd_resume_nodes = undefined,
         };
-        try self.initOsData(thread_count);
+        const extra_thread_count = thread_count - 1;
+        self.eventfd_resume_nodes = try self.allocator.alloc(
+            std.atomic.Stack(ResumeNode.EventFd).Node,
+            extra_thread_count,
+        );
+        errdefer self.allocator.free(self.eventfd_resume_nodes);
+
+        self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
+        errdefer self.allocator.free(self.extra_threads);
+
+        try self.initOsData(extra_thread_count);
         errdefer self.deinitOsData();
     }
 
     /// must call stop before deinit
     pub fn deinit(self: *Loop) void {
         self.deinitOsData();
+        self.allocator.free(self.extra_threads);
     }
 
     const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
-        std.os.SpawnThreadError || std.os.LinuxEpollCtlError;
+        std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError;
 
     const wakeup_bytes = []u8{0x1} ** 8;
 
-    fn initOsData(self: *Loop, thread_count: usize) InitOsDataError!void {
+    fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
         switch (builtin.os) {
             builtin.Os.linux => {
-                const extra_thread_count = thread_count - 1;
-                self.os_data.available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init();
-                self.os_data.eventfd_resume_nodes = try self.allocator.alloc(
-                    std.atomic.Stack(ResumeNode.EventFd).Node,
-                    extra_thread_count,
-                );
-                errdefer self.allocator.free(self.os_data.eventfd_resume_nodes);
-
                 errdefer {
-                    while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+                    while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
                 }
-                for (self.os_data.eventfd_resume_nodes) |*eventfd_node| {
+                for (self.eventfd_resume_nodes) |*eventfd_node| {
                     eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
                         .data = ResumeNode.EventFd{
                             .base = ResumeNode{
@@ -209,7 +226,7 @@ pub const Loop = struct {
                         },
                         .next = undefined,
                     };
-                    self.os_data.available_eventfd_resume_nodes.push(eventfd_node);
+                    self.available_eventfd_resume_nodes.push(eventfd_node);
                 }
 
                 self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
@@ -228,15 +245,84 @@ pub const Loop = struct {
                     self.os_data.final_eventfd,
                     &self.os_data.final_eventfd_event,
                 );
-                self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
-                errdefer self.allocator.free(self.extra_threads);
 
                 var extra_thread_index: usize = 0;
                 errdefer {
+                    // writing 8 bytes to an eventfd cannot fail
+                    std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+                    while (extra_thread_index != 0) {
+                        extra_thread_index -= 1;
+                        self.extra_threads[extra_thread_index].wait();
+                    }
+                }
+                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+                    self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+                }
+            },
+            builtin.Os.macosx => {
+                self.os_data.kqfd = try std.os.bsdKQueue();
+                errdefer std.os.close(self.os_data.kqfd);
+
+                self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
+                errdefer self.allocator.free(self.os_data.kevents);
+
+                const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+
+                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+                        .data = ResumeNode.EventFd{
+                            .base = ResumeNode{
+                                .id = ResumeNode.Id.EventFd,
+                                .handle = undefined,
+                            },
+                            // this one is for sending events
+                            .kevent = posix.Kevent {
+                                .ident = i,
+                                .filter = posix.EVFILT_USER,
+                                .flags = posix.EV_CLEAR|posix.EV_ADD|posix.EV_DISABLE,
+                                .fflags = 0,
+                                .data = 0,
+                                .udata = @ptrToInt(&eventfd_node.data.base),
+                            },
+                        },
+                        .next = undefined,
+                    };
+                    self.available_eventfd_resume_nodes.push(eventfd_node);
+                    const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
+                    _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+                    eventfd_node.data.kevent.flags = posix.EV_CLEAR|posix.EV_ENABLE;
+                    eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
+                    // this one is for waiting for events
+                    self.os_data.kevents[i] = posix.Kevent {
+                        .ident = i,
+                        .filter = posix.EVFILT_USER,
+                        .flags = 0,
+                        .fflags = 0,
+                        .data = 0,
+                        .udata = @ptrToInt(&eventfd_node.data.base),
+                    };
+                }
+
+                // Pre-add so that we cannot get error.SystemResources
+                // later when we try to activate it.
+                self.os_data.final_kevent = posix.Kevent{
+                    .ident = extra_thread_count,
+                    .filter = posix.EVFILT_USER,
+                    .flags = posix.EV_ADD | posix.EV_DISABLE,
+                    .fflags = 0,
+                    .data = 0,
+                    .udata = @ptrToInt(&self.final_resume_node),
+                };
+                const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+                _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+                self.os_data.final_kevent.flags = posix.EV_ENABLE;
+                self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
+
+                var extra_thread_index: usize = 0;
+                errdefer {
+                    _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
                     while (extra_thread_index != 0) {
                         extra_thread_index -= 1;
-                        // writing 8 bytes to an eventfd cannot fail
-                        std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
                         self.extra_threads[extra_thread_index].wait();
                     }
                 }
@@ -252,10 +338,12 @@ pub const Loop = struct {
         switch (builtin.os) {
             builtin.Os.linux => {
                 std.os.close(self.os_data.final_eventfd);
-                while (self.os_data.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+                while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
                 std.os.close(self.os_data.epollfd);
-                self.allocator.free(self.os_data.eventfd_resume_nodes);
-                self.allocator.free(self.extra_threads);
+                self.allocator.free(self.eventfd_resume_nodes);
+            },
+            builtin.Os.macosx => {
+                self.allocator.free(self.os_data.kevents);
             },
             else => {},
         }
@@ -332,21 +420,38 @@ pub const Loop = struct {
                         continue :start_over;
                     }
 
-                    // non-last node, stick it in the epoll set so that
+                    // non-last node, stick it in the epoll/kqueue set so that
                     // other threads can get to it
-                    if (self.os_data.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+                    if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
                         const eventfd_node = &resume_stack_node.data;
                         eventfd_node.base.handle = handle;
-                        // the pending count is already accounted for
-                        const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
-                        self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch |_| {
-                            // fine, we didn't need it anyway
-                            _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                            self.os_data.available_eventfd_resume_nodes.push(resume_stack_node);
-                            resume handle;
-                            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                            continue :start_over;
-                        };
+                        switch (builtin.os) {
+                            builtin.Os.macosx => {
+                                const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+                                const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+                                _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch |_| {
+                                    // fine, we didn't need it anyway
+                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
+                                    resume handle;
+                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                                    continue :start_over;
+                                };
+                            },
+                            builtin.Os.linux => {
+                                // the pending count is already accounted for
+                                const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
+                                self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch |_| {
+                                    // fine, we didn't need it anyway
+                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
+                                    resume handle;
+                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                                    continue :start_over;
+                                };
+                            },
+                            else => @compileError("unsupported OS"),
+                        }
                     } else {
                         // threads are too busy, can't add another eventfd to wake one up
                         _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
@@ -359,35 +464,74 @@ pub const Loop = struct {
                 const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
                 if (pending_event_count == 0) {
                     // cause all the threads to stop
-                    // writing 8 bytes to an eventfd cannot fail
-                    std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
-                    return;
+                    switch (builtin.os) {
+                        builtin.Os.linux => {
+                            // writing 8 bytes to an eventfd cannot fail
+                            std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+                            return;
+                        },
+                        builtin.Os.macosx => {
+                            const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+                            const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+                            // cannot fail because we already added it and this just enables it
+                            _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+                            return;
+                        },
+                        else => @compileError("unsupported OS"),
+                    }
                 }
 
                 _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
             }
 
-            // only process 1 event so we don't steal from other threads
-            var events: [1]std.os.linux.epoll_event = undefined;
-            const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
-            for (events[0..count]) |ev| {
-                const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
-                const handle = resume_node.handle;
-                const resume_node_id = resume_node.id;
-                switch (resume_node_id) {
-                    ResumeNode.Id.Basic => {},
-                    ResumeNode.Id.Stop => return,
-                    ResumeNode.Id.EventFd => {
-                        const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
-                        event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
-                        const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
-                        self.os_data.available_eventfd_resume_nodes.push(stack_node);
-                    },
-                }
-                resume handle;
-                if (resume_node_id == ResumeNode.Id.EventFd) {
-                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                }
+            switch (builtin.os) {
+                builtin.Os.linux => {
+                    // only process 1 event so we don't steal from other threads
+                    var events: [1]std.os.linux.epoll_event = undefined;
+                    const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
+                    for (events[0..count]) |ev| {
+                        const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
+                        const handle = resume_node.handle;
+                        const resume_node_id = resume_node.id;
+                        switch (resume_node_id) {
+                            ResumeNode.Id.Basic => {},
+                            ResumeNode.Id.Stop => return,
+                            ResumeNode.Id.EventFd => {
+                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+                                event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
+                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+                                self.available_eventfd_resume_nodes.push(stack_node);
+                            },
+                        }
+                        resume handle;
+                        if (resume_node_id == ResumeNode.Id.EventFd) {
+                            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        }
+                    }
+                },
+                builtin.Os.macosx => {
+                    var eventlist: [1]posix.Kevent = undefined;
+                    const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
+                    for (eventlist[0..count]) |ev| {
+                        const resume_node = @intToPtr(*ResumeNode, ev.udata);
+                        const handle = resume_node.handle;
+                        const resume_node_id = resume_node.id;
+                        switch (resume_node_id) {
+                            ResumeNode.Id.Basic => {},
+                            ResumeNode.Id.Stop => return,
+                            ResumeNode.Id.EventFd => {
+                                const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+                                const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+                                self.available_eventfd_resume_nodes.push(stack_node);
+                            },
+                        }
+                        resume handle;
+                        if (resume_node_id == ResumeNode.Id.EventFd) {
+                            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        }
+                    }
+                },
+                else => @compileError("unsupported OS"),
             }
         }
     }
@@ -395,12 +539,13 @@ pub const Loop = struct {
     const OsData = switch (builtin.os) {
         builtin.Os.linux => struct {
             epollfd: i32,
-            // pre-allocated eventfds. all permanently active.
-            // this is how we send promises to be resumed on other threads.
-            available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
-            eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
             final_eventfd: i32,
-            final_eventfd_event: posix.epoll_event,
+            final_eventfd_event: std.os.linux.epoll_event,
+        },
+        builtin.Os.macosx => struct {
+            kqfd: i32,
+            final_kevent: posix.Kevent,
+            kevents: []posix.Kevent,
         },
         else => struct {},
     };