Commit a9ab528e34

Andrew Kelley <superjoe30@gmail.com>
2018-07-17 21:17:06
std.event.Loop.onNextTick dispatches work to waiting threads
1 parent ecf8da0
Changed files (2)
std
atomic
event
std/atomic/queue.zig
@@ -51,6 +51,20 @@ pub fn Queue(comptime T: type) type {
             return head;
         }
 
+        pub fn unget(self: *Self, node: *Node) void {
+            while (@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) != 0) {}
+            defer assert(@atomicRmw(u8, &self.lock, builtin.AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst) == 1);
+
+            const opt_head = self.head;
+            self.head = node;
+            if (opt_head) |head| {
+                head.next = node;
+            } else {
+                assert(self.tail == null);
+                self.tail = node;
+            }
+        }
+
         pub fn isEmpty(self: *Self) bool {
             return @atomicLoad(?*Node, &self.head, builtin.AtomicOrder.SeqCst) != null;
         }
std/event/loop.zig
@@ -12,7 +12,6 @@ pub const Loop = struct {
     next_tick_queue: std.atomic.Queue(promise),
     os_data: OsData,
     final_resume_node: ResumeNode,
-    dispatch_lock: u8, // TODO make this a bool
     pending_event_count: usize,
     extra_threads: []*std.os.Thread,
 
@@ -74,11 +73,10 @@ pub const Loop = struct {
     /// max(thread_count - 1, 0)
     fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
         self.* = Loop{
-            .pending_event_count = 0,
+            .pending_event_count = 1,
             .allocator = allocator,
             .os_data = undefined,
             .next_tick_queue = std.atomic.Queue(promise).init(),
-            .dispatch_lock = 1, // start locked so threads go directly into epoll wait
             .extra_threads = undefined,
             .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
             .eventfd_resume_nodes = undefined,
@@ -306,7 +304,7 @@ pub const Loop = struct {
     pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
         _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
         errdefer {
-            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+            self.finishOneEvent();
         }
         try self.modFd(
             fd,
@@ -326,7 +324,7 @@ pub const Loop = struct {
 
     pub fn removeFd(self: *Loop, fd: i32) void {
         self.removeFdNoCounter(fd);
-        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+        self.finishOneEvent();
     }
 
     fn removeFdNoCounter(self: *Loop, fd: i32) void {
@@ -345,14 +343,70 @@ pub const Loop = struct {
         }
     }
 
+    fn dispatch(self: *Loop) void {
+        while (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+            const next_tick_node = self.next_tick_queue.get() orelse {
+                self.available_eventfd_resume_nodes.push(resume_stack_node);
+                return;
+            };
+            const eventfd_node = &resume_stack_node.data;
+            eventfd_node.base.handle = next_tick_node.data;
+            switch (builtin.os) {
+                builtin.Os.macosx => {
+                    const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+                    const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+                    _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
+                        self.next_tick_queue.unget(next_tick_node);
+                        self.available_eventfd_resume_nodes.push(resume_stack_node);
+                        return;
+                    };
+                },
+                builtin.Os.linux => {
+                    // the pending count is already accounted for
+                    const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT |
+                        std.os.linux.EPOLLET;
+                    self.modFd(
+                        eventfd_node.eventfd,
+                        eventfd_node.epoll_op,
+                        epoll_events,
+                        &eventfd_node.base,
+                    ) catch {
+                        self.next_tick_queue.unget(next_tick_node);
+                        self.available_eventfd_resume_nodes.push(resume_stack_node);
+                        return;
+                    };
+                },
+                builtin.Os.windows => {
+                    // this value is never dereferenced but we need it to be non-null so that
+                    // the consumer code can decide whether to read the completion key.
+                    // it has to do this for normal I/O, so we match that behavior here.
+                    const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                    std.os.windowsPostQueuedCompletionStatus(
+                        self.os_data.io_port,
+                        undefined,
+                        eventfd_node.completion_key,
+                        overlapped,
+                    ) catch {
+                        self.next_tick_queue.unget(next_tick_node);
+                        self.available_eventfd_resume_nodes.push(resume_stack_node);
+                        return;
+                    };
+                },
+                else => @compileError("unsupported OS"),
+            }
+        }
+    }
+
     /// Bring your own linked list node. This means it can't fail.
     pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
         _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
         self.next_tick_queue.put(node);
+        self.dispatch();
     }
 
     pub fn run(self: *Loop) void {
-        _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+        self.finishOneEvent(); // the reference we start with
+
         self.workerRun();
         for (self.extra_threads) |extra_thread| {
             extra_thread.wait();
@@ -396,106 +450,45 @@ pub const Loop = struct {
         }
     }
 
-    fn workerRun(self: *Loop) void {
-        start_over: while (true) {
-            if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
-                while (self.next_tick_queue.get()) |next_tick_node| {
-                    const handle = next_tick_node.data;
-                    if (self.next_tick_queue.isEmpty()) {
-                        // last node, just resume it
-                        _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                        resume handle;
-                        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                        continue :start_over;
-                    }
-
-                    // non-last node, stick it in the epoll/kqueue set so that
-                    // other threads can get to it
-                    if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
-                        const eventfd_node = &resume_stack_node.data;
-                        eventfd_node.base.handle = handle;
-                        switch (builtin.os) {
-                            builtin.Os.macosx => {
-                                const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
-                                const eventlist = ([*]posix.Kevent)(undefined)[0..0];
-                                _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
-                                    // fine, we didn't need it anyway
-                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
-                                    resume handle;
-                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                                    continue :start_over;
-                                };
-                            },
-                            builtin.Os.linux => {
-                                // the pending count is already accounted for
-                                const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
-                                self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
-                                    // fine, we didn't need it anyway
-                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
-                                    resume handle;
-                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                                    continue :start_over;
-                                };
-                            },
-                            builtin.Os.windows => {
-                                // this value is never dereferenced but we need it to be non-null so that
-                                // the consumer code can decide whether to read the completion key.
-                                // it has to do this for normal I/O, so we match that behavior here.
-                                const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
-                                std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
-                                    // fine, we didn't need it anyway
-                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
-                                    resume handle;
-                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                                    continue :start_over;
-                                };
-                            },
-                            else => @compileError("unsupported OS"),
+    fn finishOneEvent(self: *Loop) void {
+        if (@atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst) == 1) {
+            // cause all the threads to stop
+            switch (builtin.os) {
+                builtin.Os.linux => {
+                    // writing 8 bytes to an eventfd cannot fail
+                    std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+                    return;
+                },
+                builtin.Os.macosx => {
+                    const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+                    const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+                    // cannot fail because we already added it and this just enables it
+                    _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+                    return;
+                },
+                builtin.Os.windows => {
+                    var i: usize = 0;
+                    while (i < self.os_data.extra_thread_count) : (i += 1) {
+                        while (true) {
+                            const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                            std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+                            break;
                         }
-                    } else {
-                        // threads are too busy, can't add another eventfd to wake one up
-                        _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-                        resume handle;
-                        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-                        continue :start_over;
                     }
-                }
-
-                const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
-                if (pending_event_count == 0) {
-                    // cause all the threads to stop
-                    switch (builtin.os) {
-                        builtin.Os.linux => {
-                            // writing 8 bytes to an eventfd cannot fail
-                            std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
-                            return;
-                        },
-                        builtin.Os.macosx => {
-                            const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
-                            const eventlist = ([*]posix.Kevent)(undefined)[0..0];
-                            // cannot fail because we already added it and this just enables it
-                            _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
-                            return;
-                        },
-                        builtin.Os.windows => {
-                            var i: usize = 0;
-                            while (i < self.os_data.extra_thread_count) : (i += 1) {
-                                while (true) {
-                                    const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
-                                    std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
-                                    break;
-                                }
-                            }
-                            return;
-                        },
-                        else => @compileError("unsupported OS"),
-                    }
-                }
+                    return;
+                },
+                else => @compileError("unsupported OS"),
+            }
+        }
+    }
 
-                _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+    fn workerRun(self: *Loop) void {
+        while (true) {
+            while (true) {
+                const next_tick_node = self.next_tick_queue.get() orelse break;
+                self.dispatch();
+                resume next_tick_node.data;
+                self.finishOneEvent();
             }
 
             switch (builtin.os) {
@@ -519,7 +512,7 @@ pub const Loop = struct {
                         }
                         resume handle;
                         if (resume_node_id == ResumeNode.Id.EventFd) {
-                            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                            self.finishOneEvent();
                         }
                     }
                 },
@@ -541,7 +534,7 @@ pub const Loop = struct {
                         }
                         resume handle;
                         if (resume_node_id == ResumeNode.Id.EventFd) {
-                            _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                            self.finishOneEvent();
                         }
                     }
                 },
@@ -570,7 +563,7 @@ pub const Loop = struct {
                     }
                     resume handle;
                     if (resume_node_id == ResumeNode.Id.EventFd) {
-                        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                        self.finishOneEvent();
                     }
                 },
                 else => @compileError("unsupported OS"),