Commit c2f585e435

Sébastien Marie <semarie@online.fr>
2021-09-04 09:16:26
openbsd: event loop: use EVFILT_TIMER instead of EVFILT_USER
OpenBSD doesn't implement EVFILT_USER filter for kqueue(2), so we couldn't use that for event loop. instead, use a EVFILT_TIMER filter with EV_ONESHOT (trigger only once) and delay 0sec (which trigger immediatly). it fits the usage of EVFILT_USER which is only used to "wakeup" the kevent(2) call from userland.
1 parent f1126e8
Changed files (1)
lib
std
event
lib/std/event/loop.zig
@@ -266,7 +266,7 @@ pub const Loop = struct {
                     self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                 }
             },
-            .macos, .freebsd, .netbsd, .dragonfly, .openbsd => {
+            .macos, .freebsd, .netbsd, .dragonfly => {
                 self.os_data.kqfd = try os.kqueue();
                 errdefer os.close(self.os_data.kqfd);
 
@@ -331,6 +331,69 @@ pub const Loop = struct {
                     self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
                 }
             },
+            .openbsd => {
+                self.os_data.kqfd = try os.kqueue();
+                errdefer os.close(self.os_data.kqfd);
+
+                const empty_kevs = &[0]os.Kevent{};
+
+                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+                        .data = ResumeNode.EventFd{
+                            .base = ResumeNode{
+                                .id = ResumeNode.Id.EventFd,
+                                .handle = undefined,
+                                .overlapped = ResumeNode.overlapped_init,
+                            },
+                            // this one is for sending events
+                            .kevent = os.Kevent{
+                                .ident = i,
+                                .filter = os.EVFILT_TIMER,
+                                .flags = os.EV_CLEAR | os.EV_ADD | os.EV_DISABLE | os.EV_ONESHOT,
+                                .fflags = 0,
+                                .data = 0,
+                                .udata = @ptrToInt(&eventfd_node.data.base),
+                            },
+                        },
+                        .next = undefined,
+                    };
+                    self.available_eventfd_resume_nodes.push(eventfd_node);
+                    const kevent_array = @as(*const [1]os.Kevent, &eventfd_node.data.kevent);
+                    _ = try os.kevent(self.os_data.kqfd, kevent_array, empty_kevs, null);
+                    eventfd_node.data.kevent.flags = os.EV_CLEAR | os.EV_ENABLE;
+                }
+
+                // Pre-add so that we cannot get error.SystemResources
+                // later when we try to activate it.
+                self.os_data.final_kevent = os.Kevent{
+                    .ident = extra_thread_count,
+                    .filter = os.EVFILT_TIMER,
+                    .flags = os.EV_ADD | os.EV_ONESHOT | os.EV_DISABLE,
+                    .fflags = 0,
+                    .data = 0,
+                    .udata = @ptrToInt(&self.final_resume_node),
+                };
+                const final_kev_arr = @as(*const [1]os.Kevent, &self.os_data.final_kevent);
+                _ = try os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null);
+                self.os_data.final_kevent.flags = os.EV_ENABLE;
+
+                if (builtin.single_threaded) {
+                    assert(extra_thread_count == 0);
+                    return;
+                }
+
+                var extra_thread_index: usize = 0;
+                errdefer {
+                    _ = os.kevent(self.os_data.kqfd, final_kev_arr, empty_kevs, null) catch unreachable;
+                    while (extra_thread_index != 0) {
+                        extra_thread_index -= 1;
+                        self.extra_threads[extra_thread_index].join();
+                    }
+                }
+                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+                    self.extra_threads[extra_thread_index] = try Thread.spawn(.{}, workerRun, .{self});
+                }
+            },
             .windows => {
                 self.os_data.io_port = try windows.CreateIoCompletionPort(
                     windows.INVALID_HANDLE_VALUE,