Commit aa53f6d0b5

kprotty <45520026+kprotty@users.noreply.github.com>
2020-10-11 21:18:19
integrate std.time.sleep with the event loop
1 parent a42c0f8
Changed files (2)
lib
lib/std/event/loop.zig
@@ -35,6 +35,9 @@ pub const Loop = struct {
     /// This is only used by `Loop` for the thread pool and associated resources.
     arena: std.heap.ArenaAllocator,
 
+    /// State which manages frames that are sleeping on timers
+    delay_queue: DelayQueue,
+
     /// Pre-allocated eventfds. All permanently active.
     /// This is how `Loop` sends promises to be resumed on other threads.
     available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
@@ -162,6 +165,7 @@ pub const Loop = struct {
             .fs_queue = std.atomic.Queue(Request).init(),
             .fs_thread = undefined,
             .fs_thread_wakeup = std.ResetEvent.init(),
+            .delay_queue = undefined,
         };
         errdefer self.fs_thread_wakeup.deinit();
         errdefer self.arena.deinit();
@@ -186,6 +190,9 @@ pub const Loop = struct {
             self.posixFsRequest(&self.fs_end_request);
             self.fs_thread.wait();
         };
+
+        if (!std.builtin.single_threaded)
+            try self.delay_queue.init();
     }
 
     pub fn deinit(self: *Loop) void {
@@ -645,6 +652,10 @@ pub const Loop = struct {
         for (self.extra_threads) |extra_thread| {
             extra_thread.wait();
         }
+
+        @atomicStore(bool, &self.delay_queue.is_running, false, .SeqCst);
+        self.delay_queue.event.set();
+        self.delay_queue.thread.wait();
     }
 
     /// Runs the provided function asynchronously. The function's frame is allocated
@@ -748,6 +759,125 @@ pub const Loop = struct {
         }
     }
 
+    pub fn sleep(self: *Loop, nanoseconds: u64) void {
+        if (std.builtin.single_threaded)
+            @compileError("TODO: integrate timers with epoll/kevent/iocp for single-threaded");
+
+        suspend {
+            const now = self.delay_queue.timer.read();
+
+            var entry: DelayQueue.Waiters.Entry = undefined;
+            entry.init(@frame(), now + nanoseconds);
+            self.delay_queue.waiters.insert(&entry);
+
+            // Speculatively wake up the timer thread when we add a new entry.
+            // If the timer thread is sleeping on a longer entry, we need to 
+            // interrupt it so that our entry can be expired in time.
+            self.delay_queue.event.set();
+        }
+    }
+
+    const DelayQueue = struct {
+        timer: std.time.Timer,
+        waiters: Waiters,
+        thread: *std.Thread,
+        event: std.AutoResetEvent,
+        is_running: bool,
+        
+        /// Initialize the delay queue by spawning the timer thread
+        /// and starting any timer resources.
+        fn init(self: *DelayQueue) !void {
+            self.* = DelayQueue{
+                .timer = try std.time.Timer.start(),
+                .waiters = DelayQueue.Waiters{
+                    .entries = std.atomic.Queue(anyframe).init(),
+                },
+                .thread = try std.Thread.spawn(&self.delay_queue, DelayQueue.run),
+                .event = std.AutoResetEvent{},
+                .is_running = true,
+            };
+        }
+
+        /// Entry point for the timer thread 
+        /// which waits for timer entries to expire and reschedules them.
+        fn run(self: *DelayQueue) void {
+            const loop = @fieldParentPtr(Loop, "delay_queue", self);
+
+            while (@atomicLoad(bool, &self.is_running, .SeqCst)) {
+                const now = self.timer.read();
+
+                if (self.waiters.popExpired(now)) |entry| {
+                    loop.onNextTick(&entry.node);
+                    continue;
+                }
+
+                if (self.waiters.nextExpire()) |expires| {
+                    if (now >= expires)
+                        continue;
+                    self.event.timedWait(expires - now) catch {};
+                } else {
+                    self.event.wait();
+                }
+            }
+        }
+
+        // TODO: use a tickless heirarchical timer wheel:
+        // https://github.com/wahern/timeout/
+        const Waiters = struct {
+            entries: std.atomic.Queue(anyframe),
+
+            const Entry = struct {
+                node: NextTickNode,
+                expires: u64,
+
+                fn init(self: *Entry, frame: anyframe, expires: u64) void {
+                    self.node.data = frame;
+                    self.expires = expires;
+                }
+            };
+
+            /// Registers the entry into the queue of waiting frames
+            fn insert(self: *Waiters, entry: *Entry) void {
+                self.entries.put(&entry.node);
+            }
+
+            /// Dequeues one expired event relative to `now`
+            fn popExpired(self: *Waiters, now: u64) ?*Entry {
+                const entry = self.peekExpiringEntry() orelse return null;
+                if (entry.expires > now)
+                    return null;
+                
+                assert(self.entries.remove(&entry.node));
+                return entry;
+            }
+            
+            /// Returns an estimate for the amount of time 
+            /// to wait until the next waiting entry expires.
+            fn nextExpire(self: *Waiters) ?u64 {
+                const entry = self.peekExpiringEntry() orelse return null;
+                return entry.expires;
+            }
+
+            fn peekExpiringEntry() ?*Entry {
+                const held = self.entries.mutex.acquire();
+                defer held.release();
+
+                var head = self.entries.head orelse return null;
+
+                var min = head;
+                while (head.next) |node| {
+                    const minEntry = @fieldParentPtr(Entry, "node", min);
+                    const nodeEntry = @fieldParentPtr(Entry, "node", node);
+                    if (nodeEntry.expires < minEntry.expires)
+                        min = node;
+                    head = node;
+                }
+
+                return @fieldParentPtr(Entry, "node", min);
+            }
+        };
+    };
+
     /// ------- I/0 APIs -------
     pub fn accept(
         self: *Loop,
@@ -1550,3 +1680,27 @@ test "std.event.Loop - runDetached" {
 fn testRunDetached() void {
     testRunDetachedData += 1;
 }
+
+test "std.event.Loop - sleep" {
+    // https://github.com/ziglang/zig/issues/1908
+    if (builtin.single_threaded) return error.SkipZigTest;
+    if (!std.io.is_async) return error.SkipZigTest;
+
+    const frames = try testing.allocator.alloc(@Frame(testSleep), 10);
+    defer testing.allocator.free(frames);
+
+    const wait_time = 100 * std.time.ns_per_ms;
+    var sleep_count: usize = 0;
+
+    for (frames) |*frame|
+        frame.* = async testSleep(wait_time, &sleep_count);
+    for (frames) |*frame|
+        await frame;
+
+    testing.expect(sleep_count == frames.len);
+}
+
+fn testSleep(wait_ns: u64, sleep_count: *usize) void {
+    Loop.instance.?.sleep(wait_ns);
+    _ = @atomicRmw(usize, sleep_count, .Add, 1, .SeqCst);
+}
lib/std/time.zig
@@ -14,8 +14,11 @@ const is_windows = std.Target.current.os.tag == .windows;
 pub const epoch = @import("time/epoch.zig");
 
 /// Spurious wakeups are possible and no precision of timing is guaranteed.
-/// TODO integrate with evented I/O
 pub fn sleep(nanoseconds: u64) void {
+    // TODO: opting out of async sleeping?
+    if (std.io.is_async)
+        return std.event.Loop.instance.?.sleep(nanoseconds);
+
     if (is_windows) {
         const big_ms_from_ns = nanoseconds / ns_per_ms;
         const ms = math.cast(os.windows.DWORD, big_ms_from_ns) catch math.maxInt(os.windows.DWORD);