Commit b6eb404831
Changed files (7)
std/event/channel.zig
@@ -0,0 +1,254 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Loop = std.event.Loop;
+
+/// Many-producer, many-consumer, thread-safe, lock-free channel with a runtime-configurable buffer size.
+/// When the buffer is empty, consumers suspend and are resumed by producers.
+/// When the buffer is full, producers suspend and are resumed by consumers.
+pub fn Channel(comptime T: type) type {
+ return struct {
+ loop: *Loop,
+
+ getters: std.atomic.QueueMpsc(GetNode),
+ putters: std.atomic.QueueMpsc(PutNode),
+ get_count: usize,
+ put_count: usize,
+ dispatch_lock: u8, // TODO make this a bool
+ need_dispatch: u8, // TODO make this a bool
+
+ // simple fixed size ring buffer
+ buffer_nodes: []T,
+ buffer_index: usize,
+ buffer_len: usize,
+
+ const SelfChannel = this;
+ const GetNode = struct {
+ ptr: *T,
+ tick_node: *Loop.NextTickNode,
+ };
+ const PutNode = struct {
+ data: T,
+ tick_node: *Loop.NextTickNode,
+ };
+
+ /// Call destroy() when done.
+ pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
+ const buffer_nodes = try loop.allocator.alloc(T, capacity);
+ errdefer loop.allocator.free(buffer_nodes);
+
+ const self = try loop.allocator.create(SelfChannel{
+ .loop = loop,
+ .buffer_len = 0,
+ .buffer_nodes = buffer_nodes,
+ .buffer_index = 0,
+ .dispatch_lock = 0,
+ .need_dispatch = 0,
+ .getters = std.atomic.QueueMpsc(GetNode).init(),
+ .putters = std.atomic.QueueMpsc(PutNode).init(),
+ .get_count = 0,
+ .put_count = 0,
+ });
+ errdefer loop.allocator.destroy(self);
+
+ return self;
+ }
+
+ /// Must be called only after all calls to put() and get() have suspended and no more calls will occur.
+ pub fn destroy(self: *SelfChannel) void {
+ while (self.getters.get()) |get_node| {
+ cancel get_node.data.tick_node.data;
+ }
+ while (self.putters.get()) |put_node| {
+ cancel put_node.data.tick_node.data;
+ }
+ self.loop.allocator.free(self.buffer_nodes);
+ self.loop.allocator.destroy(self);
+ }
+
+ /// Puts a data item in the channel. The promise completes when the value has been added to the
+ /// buffer, or, in the case of a zero-size buffer, when the item has been retrieved by a getter.
+ pub async fn put(self: *SelfChannel, data: T) void {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ suspend |handle| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(PutNode).Node{
+ .data = PutNode{
+ .tick_node = &my_tick_node,
+ .data = data,
+ },
+ .next = undefined,
+ };
+ self.putters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ }
+
+ /// Await this function to get an item from the channel. If the buffer is empty, the promise
+ /// completes when the next item is put in the channel.
+ pub async fn get(self: *SelfChannel) T {
+ // TODO should be able to group memory allocation failure before first suspend point
+ // so that the async invocation catches it
+ var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
+ _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
+
+ // TODO integrate this function with named return values
+ // so we can get rid of this extra result copy
+ var result: T = undefined;
+ suspend |handle| {
+ var my_tick_node = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle,
+ };
+ var queue_node = std.atomic.QueueMpsc(GetNode).Node{
+ .data = GetNode{
+ .ptr = &result,
+ .tick_node = &my_tick_node,
+ },
+ .next = undefined,
+ };
+ self.getters.put(&queue_node);
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ self.loop.onNextTick(dispatch_tick_node_ptr);
+ }
+ return result;
+ }
+
+ async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
+ // resumed by onNextTick
+ suspend |handle| {
+ var tick_node = Loop.NextTickNode{
+ .data = handle,
+ .next = undefined,
+ };
+ tick_node_ptr.* = &tick_node;
+ }
+
+ // set the "need dispatch" flag
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+
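+ // Only one coroutine dispatches at a time. If the lock below is already held,
+ // we can simply return: the current holder is guaranteed to observe the
+ // need_dispatch flag we just set and loop again.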
+ lock: while (true) {
+ // set the lock flag
+ const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (prev_lock != 0) return;
+
+ // clear the need_dispatch flag since we're about to do it
+ _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ while (true) {
+ one_dispatch: {
+ // later we correct these extra subtractions
+ var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
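+ // @atomicRmw returns the previous value, so the locals get_count and put_count
+ // hold the number of pending waiters; the shared counters are left one too low
+ // and are corrected again after one_dispatch.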
+
+ // transfer self.buffer to self.getters
+ while (self.buffer_len != 0) {
+ if (get_count == 0) break :one_dispatch;
+
+ const get_node = &self.getters.get().?.data;
+ // index of the oldest buffered item, wrapped to the ring's capacity
+ get_node.ptr.* = self.buffer_nodes[(self.buffer_index -% self.buffer_len) % self.buffer_nodes.len];
+ self.loop.onNextTick(get_node.tick_node);
+ self.buffer_len -= 1;
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // direct transfer self.putters to self.getters
+ while (get_count != 0 and put_count != 0) {
+ const get_node = &self.getters.get().?.data;
+ const put_node = &self.putters.get().?.data;
+
+ get_node.ptr.* = put_node.data;
+ self.loop.onNextTick(get_node.tick_node);
+ self.loop.onNextTick(put_node.tick_node);
+
+ get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ // transfer self.putters to self.buffer
+ while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
+ const put_node = &self.putters.get().?.data;
+
+ // the write position also wraps to the ring's capacity
+ self.buffer_nodes[self.buffer_index % self.buffer_nodes.len] = put_node.data;
+ self.loop.onNextTick(put_node.tick_node);
+ self.buffer_index +%= 1;
+ self.buffer_len += 1;
+
+ put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+
+ // undo the extra subtractions
+ _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+
+ // clear need-dispatch flag
+ const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ if (need_dispatch != 0) continue;
+
+ const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ assert(my_lock != 0);
+
+ // we have to check again now that we unlocked
+ if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
+
+ return;
+ }
+ }
+ }
+ };
+}
+
+test "std.event.Channel" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ // TODO make a multi threaded test
+ try loop.initSingleThreaded(allocator);
+ defer loop.deinit();
+
+ const channel = try Channel(i32).create(&loop, 0);
+ defer channel.destroy();
+
+ const handle = try async<allocator> testChannelGetter(&loop, channel);
+ defer cancel handle;
+
+ const putter = try async<allocator> testChannelPutter(channel);
+ defer cancel putter;
+
+ loop.run();
+}
+
+async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+ errdefer @panic("test failed");
+
+ const value1_promise = try async channel.get();
+ const value1 = await value1_promise;
+ assert(value1 == 1234);
+
+ const value2_promise = try async channel.get();
+ const value2 = await value2_promise;
+ assert(value2 == 4567);
+}
+
+async fn testChannelPutter(channel: *Channel(i32)) void {
+ await (async channel.put(1234) catch @panic("out of memory"));
+ await (async channel.put(4567) catch @panic("out of memory"));
+}
+
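Note: for a buffered channel, put() completes without a matching getter until the ring buffer fills. A minimal sketch under the same test harness (the testBufferedPutter helper and the capacity of 2 are illustrative, not part of this commit):

    test "std.event.Channel buffered (sketch)" {
        var da = std.heap.DirectAllocator.init();
        defer da.deinit();

        const allocator = &da.allocator;

        var loop: Loop = undefined;
        try loop.initSingleThreaded(allocator);
        defer loop.deinit();

        // capacity 2: both puts below land in the ring buffer immediately
        const channel = try Channel(i32).create(&loop, 2);
        defer channel.destroy();

        const putter = try async<allocator> testBufferedPutter(channel);
        defer cancel putter;

        loop.run();
    }

    async fn testBufferedPutter(channel: *Channel(i32)) void {
        await (async channel.put(1) catch @panic("out of memory"));
        await (async channel.put(2) catch @panic("out of memory"));
    }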
std/event/lock.zig
@@ -0,0 +1,204 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const mem = std.mem;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock.
+/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub const Lock = struct {
+ loop: *Loop,
+ shared_bit: u8, // TODO make this a bool
+ queue: Queue,
+ queue_empty_bit: u8, // TODO make this a bool
+
+ const Queue = std.atomic.QueueMpsc(promise);
+
+ pub const Held = struct {
+ lock: *Lock,
+
+ pub fn release(self: Held) void {
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // We need to release the lock.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
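+ // Optimistically mark the queue empty before unlocking; an acquire() that
+ // enqueues afterward clears the bit, and the re-check below catches it.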
+
+ // There might be a queue item. If we know the queue is empty, we can be done,
+ // because the other actor will try to obtain the lock.
+ // But if there's a queue item, we are the actor which must loop and attempt
+ // to grab the lock again.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ return;
+ }
+
+ while (true) {
+ const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (old_bit != 0) {
+ // We did not obtain the lock. Great, the queue is someone else's problem.
+ return;
+ }
+
+ // Resume the next item from the queue.
+ if (self.lock.queue.get()) |node| {
+ self.lock.loop.onNextTick(node);
+ return;
+ }
+
+ // Release the lock again.
+ _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ // Find out if we can be done.
+ if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ return;
+ }
+ }
+ }
+ };
+
+ pub fn init(loop: *Loop) Lock {
+ return Lock{
+ .loop = loop,
+ .shared_bit = 0,
+ .queue = Queue.init(),
+ .queue_empty_bit = 1,
+ };
+ }
+
+ /// Must be called when not locked. Not thread safe.
+ /// All calls to acquire() and release() must complete before calling deinit().
+ pub fn deinit(self: *Lock) void {
+ assert(self.shared_bit == 0);
+ while (self.queue.get()) |node| cancel node.data;
+ }
+
+ pub async fn acquire(self: *Lock) Held {
+ s: suspend |handle| {
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var my_tick_node = Loop.NextTickNode{
+ .data = handle,
+ .next = undefined,
+ };
+
+ self.queue.put(&my_tick_node);
+
+ // At this point, we are in the queue, so we might have already been resumed and this coroutine
+ // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.
+
+ // We set this bit so that later we can rely on the fact that if queue_empty_bit is 1,
+ // some actor will attempt to grab the lock.
+ _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ while (true) {
+ const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ if (old_bit != 0) {
+ // We did not obtain the lock. Trust that our queue entry will resume us, and allow
+ // suspend to complete.
+ break;
+ }
+ // We got the lock. However we might have already been resumed from the queue.
+ if (self.queue.get()) |node| {
+ // Whether this node is us or someone else, we tail resume it.
+ resume node.data;
+ break;
+ } else {
+ // We already got resumed, and there are none left in the queue, which means that
+ // we aren't even supposed to hold the lock right now.
+ _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
+ _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+
+ // There might be a queue item. If we know the queue is empty, we can be done,
+ // because the other actor will try to obtain the lock.
+ // But if there's a queue item, we are the actor which must loop and attempt
+ // to grab the lock again.
+ if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
+ break;
+ } else {
+ continue;
+ }
+ }
+ unreachable;
+ }
+ }
+
+ return Held{ .lock = self };
+ }
+};
+
+test "std.event.Lock" {
+ var da = std.heap.DirectAllocator.init();
+ defer da.deinit();
+
+ const allocator = &da.allocator;
+
+ var loop: Loop = undefined;
+ try loop.initMultiThreaded(allocator);
+ defer loop.deinit();
+
+ var lock = Lock.init(&loop);
+ defer lock.deinit();
+
+ const handle = try async<allocator> testLock(&loop, &lock);
+ defer cancel handle;
+ loop.run();
+
+ assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
+}
+
+async fn testLock(loop: *Loop, lock: *Lock) void {
+ // TODO explicitly put next tick node memory in the coroutine frame #1194
+ suspend |p| {
+ resume p;
+ }
+ const handle1 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node1 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle1,
+ };
+ loop.onNextTick(&tick_node1);
+
+ const handle2 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node2 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle2,
+ };
+ loop.onNextTick(&tick_node2);
+
+ const handle3 = async lockRunner(lock) catch @panic("out of memory");
+ var tick_node3 = Loop.NextTickNode{
+ .next = undefined,
+ .data = handle3,
+ };
+ loop.onNextTick(&tick_node3);
+
+ await handle1;
+ await handle2;
+ await handle3;
+}
+
+var shared_test_data = [1]i32{0} ** 10;
+var shared_test_index: usize = 0;
+
+async fn lockRunner(lock: *Lock) void {
+ suspend; // resumed by onNextTick
+
+ var i: usize = 0;
+ while (i < shared_test_data.len) : (i += 1) {
+ const lock_promise = async lock.acquire() catch @panic("out of memory");
+ const handle = await lock_promise;
+ defer handle.release();
+
+ shared_test_index = 0;
+ while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
+ shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
+ }
+ }
+}
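Note: the acquire/release pattern in isolation, as a sketch (protectedAdd and shared_total are illustrative names, not part of this commit). The promise returned by acquire() is awaited to obtain a Held, and release() is deferred on it:

    var shared_total: usize = 0;

    async fn protectedAdd(lock: *Lock, amount: usize) void {
        const lock_promise = async lock.acquire() catch @panic("out of memory");
        const held = await lock_promise;
        defer held.release();

        // only one coroutine is between acquire() and release() at a time
        shared_total += amount;
    }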
std/event/locked.zig
@@ -0,0 +1,42 @@
+const std = @import("../index.zig");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+
+/// Thread-safe async/await lock that protects one piece of data.
+/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
+/// are resumed when the lock is released, in order.
+pub fn Locked(comptime T: type) type {
+ return struct {
+ lock: Lock,
+ private_data: T,
+
+ const Self = this;
+
+ pub const HeldLock = struct {
+ value: *T,
+ held: Lock.Held,
+
+ pub fn release(self: HeldLock) void {
+ self.held.release();
+ }
+ };
+
+ pub fn init(loop: *Loop, data: T) Self {
+ return Self{
+ .lock = Lock.init(loop),
+ .private_data = data,
+ };
+ }
+
+ pub fn deinit(self: *Self) void {
+ self.lock.deinit();
+ }
+
+ pub async fn acquire(self: *Self) HeldLock {
+ return HeldLock{
+ // TODO guaranteed allocation elision
+ .held = await (async self.lock.acquire() catch unreachable),
+ .value = &self.private_data,
+ };
+ }
+ };
+}
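Note: a usage sketch for Locked(T) (the addOne helper is illustrative). acquire() hands back a HeldLock whose value pointer must not be used after release():

    async fn addOne(locked_counter: *Locked(usize)) void {
        const held = await (async locked_counter.acquire() catch unreachable);
        defer held.release();

        held.value.* += 1;
    }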
std/event/loop.zig
@@ -0,0 +1,577 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const mem = std.mem;
+const posix = std.os.posix;
+const windows = std.os.windows;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+
+pub const Loop = struct {
+ allocator: *mem.Allocator,
+ next_tick_queue: std.atomic.QueueMpsc(promise),
+ os_data: OsData,
+ final_resume_node: ResumeNode,
+ dispatch_lock: u8, // TODO make this a bool
+ pending_event_count: usize,
+ extra_threads: []*std.os.Thread,
+
+ // pre-allocated eventfds. all permanently active.
+ // this is how we send promises to be resumed on other threads.
+ available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
+ eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
+
+ pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
+
+ pub const ResumeNode = struct {
+ id: Id,
+ handle: promise,
+
+ pub const Id = enum {
+ Basic,
+ Stop,
+ EventFd,
+ };
+
+ pub const EventFd = switch (builtin.os) {
+ builtin.Os.macosx => MacOsEventFd,
+ builtin.Os.linux => struct {
+ base: ResumeNode,
+ epoll_op: u32,
+ eventfd: i32,
+ },
+ builtin.Os.windows => struct {
+ base: ResumeNode,
+ completion_key: usize,
+ },
+ else => @compileError("unsupported OS"),
+ };
+
+ const MacOsEventFd = struct {
+ base: ResumeNode,
+ kevent: posix.Kevent,
+ };
+ };
+
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ return self.initInternal(allocator, 1);
+ }
+
+ /// The allocator must be thread-safe because we use it for multiplexing
+ /// coroutines onto kernel threads.
+ /// After initialization, call run().
+ /// TODO copy elision / named return values so that the threads referencing *Loop
+ /// have the correct pointer value.
+ fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
+ const core_count = try std.os.cpuCount(allocator);
+ return self.initInternal(allocator, core_count);
+ }
+
+ /// thread_count is the total number of threads, including the one that calls run().
+ /// The thread pool size will be max(thread_count - 1, 0).
+ fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
+ self.* = Loop{
+ .pending_event_count = 0,
+ .allocator = allocator,
+ .os_data = undefined,
+ .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
+ .dispatch_lock = 1, // start locked so threads go directly into epoll wait
+ .extra_threads = undefined,
+ .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+ .eventfd_resume_nodes = undefined,
+ .final_resume_node = ResumeNode{
+ .id = ResumeNode.Id.Stop,
+ .handle = undefined,
+ },
+ };
+ const extra_thread_count = thread_count - 1;
+ self.eventfd_resume_nodes = try self.allocator.alloc(
+ std.atomic.Stack(ResumeNode.EventFd).Node,
+ extra_thread_count,
+ );
+ errdefer self.allocator.free(self.eventfd_resume_nodes);
+
+ self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
+ errdefer self.allocator.free(self.extra_threads);
+
+ try self.initOsData(extra_thread_count);
+ errdefer self.deinitOsData();
+ }
+
+ /// The loop must have fully stopped (run() has returned) before calling deinit().
+ pub fn deinit(self: *Loop) void {
+ self.deinitOsData();
+ self.allocator.free(self.extra_threads);
+ }
+
+ const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
+ std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
+ std.os.WindowsCreateIoCompletionPortError;
+
+ const wakeup_bytes = []u8{0x1} ** 8;
+
+ fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ errdefer {
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ }
+ for (self.eventfd_resume_nodes) |*eventfd_node| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
+ .epoll_op = posix.EPOLL_CTL_ADD,
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
+ errdefer std.os.close(self.os_data.epollfd);
+
+ self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
+ errdefer std.os.close(self.os_data.final_eventfd);
+
+ self.os_data.final_eventfd_event = posix.epoll_event{
+ .events = posix.EPOLLIN,
+ .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
+ };
+ try std.os.linuxEpollCtl(
+ self.os_data.epollfd,
+ posix.EPOLL_CTL_ADD,
+ self.os_data.final_eventfd,
+ &self.os_data.final_eventfd_event,
+ );
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.macosx => {
+ self.os_data.kqfd = try std.os.bsdKQueue();
+ errdefer std.os.close(self.os_data.kqfd);
+
+ self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
+ errdefer self.allocator.free(self.os_data.kevents);
+
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .kevent = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ },
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
+ eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
+ // this one is for waiting for events
+ self.os_data.kevents[i] = posix.Kevent{
+ .ident = i,
+ .filter = posix.EVFILT_USER,
+ .flags = 0,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&eventfd_node.data.base),
+ };
+ }
+
+ // Pre-add so that we cannot get error.SystemResources
+ // later when we try to activate it.
+ self.os_data.final_kevent = posix.Kevent{
+ .ident = extra_thread_count,
+ .filter = posix.EVFILT_USER,
+ .flags = posix.EV_ADD | posix.EV_DISABLE,
+ .fflags = 0,
+ .data = 0,
+ .udata = @ptrToInt(&self.final_resume_node),
+ };
+ const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
+ self.os_data.final_kevent.flags = posix.EV_ENABLE;
+ self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ builtin.Os.windows => {
+ self.os_data.extra_thread_count = extra_thread_count;
+
+ self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
+ windows.INVALID_HANDLE_VALUE,
+ null,
+ undefined,
+ undefined,
+ );
+ errdefer std.os.close(self.os_data.io_port);
+
+ for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+ eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+ .data = ResumeNode.EventFd{
+ .base = ResumeNode{
+ .id = ResumeNode.Id.EventFd,
+ .handle = undefined,
+ },
+ // this one is for sending events
+ .completion_key = @ptrToInt(&eventfd_node.data.base),
+ },
+ .next = undefined,
+ };
+ self.available_eventfd_resume_nodes.push(eventfd_node);
+ }
+
+ var extra_thread_index: usize = 0;
+ errdefer {
+ var i: usize = 0;
+ while (i < extra_thread_index) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ while (extra_thread_index != 0) {
+ extra_thread_index -= 1;
+ self.extra_threads[extra_thread_index].wait();
+ }
+ }
+ while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+ self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+ }
+ },
+ else => {},
+ }
+ }
+
+ fn deinitOsData(self: *Loop) void {
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ std.os.close(self.os_data.final_eventfd);
+ while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
+ std.os.close(self.os_data.epollfd);
+ self.allocator.free(self.eventfd_resume_nodes);
+ },
+ builtin.Os.macosx => {
+ self.allocator.free(self.os_data.kevents);
+ std.os.close(self.os_data.kqfd);
+ },
+ builtin.Os.windows => {
+ std.os.close(self.os_data.io_port);
+ },
+ else => {},
+ }
+ }
+
+ /// resume_node must live longer than the promise that it holds a reference to.
+ pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ errdefer {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ try self.modFd(
+ fd,
+ posix.EPOLL_CTL_ADD,
+ std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
+ resume_node,
+ );
+ }
+
+ pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
+ var ev = std.os.linux.epoll_event{
+ .events = events,
+ .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
+ };
+ try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
+ }
+
+ pub fn removeFd(self: *Loop, fd: i32) void {
+ self.removeFdNoCounter(fd);
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+
+ fn removeFdNoCounter(self: *Loop, fd: i32) void {
+ std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
+ }
+
+ pub async fn waitFd(self: *Loop, fd: i32) !void {
+ defer self.removeFd(fd);
+ suspend |p| {
+ // TODO explicitly put this memory in the coroutine frame #1194
+ var resume_node = ResumeNode{
+ .id = ResumeNode.Id.Basic,
+ .handle = p,
+ };
+ try self.addFd(fd, &resume_node);
+ }
+ }
+
+ /// Bring your own linked list node. This means it can't fail.
+ pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+ self.next_tick_queue.put(node);
+ }
+
+ pub fn run(self: *Loop) void {
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.workerRun();
+ for (self.extra_threads) |extra_thread| {
+ extra_thread.wait();
+ }
+ }
+
+ fn workerRun(self: *Loop) void {
+ start_over: while (true) {
+ if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
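+ // try to become the dispatcher; if another thread already holds the
+ // dispatch lock, fall through to the OS event wait below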
+ while (self.next_tick_queue.get()) |next_tick_node| {
+ const handle = next_tick_node.data;
+ if (self.next_tick_queue.isEmpty()) {
+ // last node, just resume it
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+
+ // non-last node, stick it in the epoll/kqueue set so that
+ // other threads can get to it
+ if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
+ const eventfd_node = &resume_stack_node.data;
+ eventfd_node.base.handle = handle;
+ switch (builtin.os) {
+ builtin.Os.macosx => {
+ const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.linux => {
+ // the pending count is already accounted for
+ const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
+ self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ builtin.Os.windows => {
+ // this value is never dereferenced but we need it to be non-null so that
+ // the consumer code can decide whether to read the completion key.
+ // it has to do this for normal I/O, so we match that behavior here.
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
+ // fine, we didn't need it anyway
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ self.available_eventfd_resume_nodes.push(resume_stack_node);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ };
+ },
+ else => @compileError("unsupported OS"),
+ }
+ } else {
+ // threads are too busy, can't add another eventfd to wake one up
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ resume handle;
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ continue :start_over;
+ }
+ }
+
+ const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
+ if (pending_event_count == 0) {
+ // cause all the threads to stop
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // writing 8 bytes to an eventfd cannot fail
+ std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
+ return;
+ },
+ builtin.Os.macosx => {
+ const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
+ const eventlist = ([*]posix.Kevent)(undefined)[0..0];
+ // cannot fail because we already added it and this just enables it
+ _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
+ return;
+ },
+ builtin.Os.windows => {
+ var i: usize = 0;
+ while (i < self.os_data.extra_thread_count) : (i += 1) {
+ while (true) {
+ const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+ std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+ break;
+ }
+ }
+ return;
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+
+ _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+ }
+
+ switch (builtin.os) {
+ builtin.Os.linux => {
+ // only process 1 event so we don't steal from other threads
+ var events: [1]std.os.linux.epoll_event = undefined;
+ const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
+ for (events[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.macosx => {
+ var eventlist: [1]posix.Kevent = undefined;
+ const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
+ for (eventlist[0..count]) |ev| {
+ const resume_node = @intToPtr(*ResumeNode, ev.udata);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ }
+ },
+ builtin.Os.windows => {
+ var completion_key: usize = undefined;
+ while (true) {
+ var nbytes: windows.DWORD = undefined;
+ var overlapped: ?*windows.OVERLAPPED = undefined;
+ switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
+ std.os.WindowsWaitResult.Aborted => return,
+ std.os.WindowsWaitResult.Normal => {},
+ }
+ if (overlapped != null) break;
+ }
+ const resume_node = @intToPtr(*ResumeNode, completion_key);
+ const handle = resume_node.handle;
+ const resume_node_id = resume_node.id;
+ switch (resume_node_id) {
+ ResumeNode.Id.Basic => {},
+ ResumeNode.Id.Stop => return,
+ ResumeNode.Id.EventFd => {
+ const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+ const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+ self.available_eventfd_resume_nodes.push(stack_node);
+ },
+ }
+ resume handle;
+ if (resume_node_id == ResumeNode.Id.EventFd) {
+ _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+ }
+ },
+ else => @compileError("unsupported OS"),
+ }
+ }
+ }
+
+ const OsData = switch (builtin.os) {
+ builtin.Os.linux => struct {
+ epollfd: i32,
+ final_eventfd: i32,
+ final_eventfd_event: std.os.linux.epoll_event,
+ },
+ builtin.Os.macosx => MacOsData,
+ builtin.Os.windows => struct {
+ io_port: windows.HANDLE,
+ extra_thread_count: usize,
+ },
+ else => struct {},
+ };
+
+ const MacOsData = struct {
+ kqfd: i32,
+ final_kevent: posix.Kevent,
+ kevents: []posix.Kevent,
+ };
+};
+
+test "std.event.Loop - basic" {
+ //var da = std.heap.DirectAllocator.init();
+ //defer da.deinit();
+
+ //const allocator = &da.allocator;
+
+ //var loop: Loop = undefined;
+ //try loop.initMultiThreaded(allocator);
+ //defer loop.deinit();
+
+ //loop.run();
+}
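Note: a sketch of the bring-your-own-node contract of onNextTick() (scheduleMe is an illustrative name). The NextTickNode lives in the suspended coroutine's frame, so it remains valid until the loop resumes the handle:

    async fn scheduleMe(loop: *Loop) void {
        suspend |p| {
            // the node is owned by this coroutine frame (see #1194)
            var my_tick_node = Loop.NextTickNode{
                .next = undefined,
                .data = p,
            };
            loop.onNextTick(&my_tick_node);
        }
        // resumed here by workerRun() on one of the loop's threads
    }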
std/event/tcp.zig
@@ -0,0 +1,183 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const assert = std.debug.assert;
+const event = std.event;
+const mem = std.mem;
+const posix = std.os.posix;
+const windows = std.os.windows;
+const Loop = std.event.Loop;
+
+pub const Server = struct {
+ handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,
+
+ loop: *Loop,
+ sockfd: ?i32,
+ accept_coro: ?promise,
+ listen_address: std.net.Address,
+
+ waiting_for_emfile_node: PromiseNode,
+ listen_resume_node: event.Loop.ResumeNode,
+
+ const PromiseNode = std.LinkedList(promise).Node;
+
+ pub fn init(loop: *Loop) Server {
+ // TODO can't initialize handler coroutine here because we need well defined copy elision
+ return Server{
+ .loop = loop,
+ .sockfd = null,
+ .accept_coro = null,
+ .handleRequestFn = undefined,
+ .waiting_for_emfile_node = undefined,
+ .listen_address = undefined,
+ .listen_resume_node = event.Loop.ResumeNode{
+ .id = event.Loop.ResumeNode.Id.Basic,
+ .handle = undefined,
+ },
+ };
+ }
+
+ pub fn listen(
+ self: *Server,
+ address: *const std.net.Address,
+ handleRequestFn: async<*mem.Allocator> fn (*Server, *const std.net.Address, *const std.os.File) void,
+ ) !void {
+ self.handleRequestFn = handleRequestFn;
+
+ const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
+ errdefer std.os.close(sockfd);
+ self.sockfd = sockfd;
+
+ try std.os.posixBind(sockfd, &address.os_addr);
+ try std.os.posixListen(sockfd, posix.SOMAXCONN);
+ self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd));
+
+ self.accept_coro = try async<self.loop.allocator> Server.handler(self);
+ errdefer cancel self.accept_coro.?;
+
+ self.listen_resume_node.handle = self.accept_coro.?;
+ try self.loop.addFd(sockfd, &self.listen_resume_node);
+ errdefer self.loop.removeFd(sockfd);
+ }
+
+ /// Stop listening
+ pub fn close(self: *Server) void {
+ self.loop.removeFd(self.sockfd.?);
+ std.os.close(self.sockfd.?);
+ }
+
+ pub fn deinit(self: *Server) void {
+ if (self.accept_coro) |accept_coro| cancel accept_coro;
+ if (self.sockfd) |sockfd| std.os.close(sockfd);
+ }
+
+ pub async fn handler(self: *Server) void {
+ while (true) {
+ var accepted_addr: std.net.Address = undefined;
+ if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
+ var socket = std.os.File.openHandle(accepted_fd);
+ _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
+ error.OutOfMemory => {
+ socket.close();
+ continue;
+ },
+ };
+ } else |err| switch (err) {
+ error.WouldBlock => {
+ suspend; // we will get resumed by epoll_wait in the event loop
+ continue;
+ },
+ error.ProcessFdQuotaExceeded => {
+ errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
+ suspend |p| {
+ self.waiting_for_emfile_node = PromiseNode.init(p);
+ std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
+ }
+ continue;
+ },
+ error.ConnectionAborted, error.FileDescriptorClosed => continue,
+
+ error.PageFault => unreachable,
+ error.InvalidSyscall => unreachable,
+ error.FileDescriptorNotASocket => unreachable,
+ error.OperationNotSupported => unreachable,
+
+ error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
+ @panic("TODO handle this error");
+ },
+ }
+ }
+ }
+};
+
+pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
+ var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
+
+ const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
+ errdefer std.os.close(sockfd);
+
+ try std.os.posixConnectAsync(sockfd, &address.os_addr);
+ try await try async loop.waitFd(sockfd);
+ try std.os.posixGetSockOptConnectError(sockfd);
+
+ return std.os.File.openHandle(sockfd);
+}
+
+test "listen on a port, send bytes, receive bytes" {
+ if (builtin.os != builtin.Os.linux) {
+ // TODO build abstractions for other operating systems
+ return;
+ }
+ const MyServer = struct {
+ tcp_server: Server,
+
+ const Self = this;
+ async<*mem.Allocator> fn handler(tcp_server: *Server, _addr: *const std.net.Address, _socket: *const std.os.File) void {
+ const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
+ var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
+ defer socket.close();
+ // TODO guarantee elision of this allocation
+ const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
+ (await next_handler) catch |err| {
+ std.debug.panic("unable to handle connection: {}\n", err);
+ };
+ suspend |p| {
+ cancel p;
+ }
+ }
+ async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void {
+ const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733
+ var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
+
+ var adapter = std.io.FileOutStream.init(&socket);
+ var stream = &adapter.stream;
+ try stream.print("hello from server\n");
+ }
+ };
+
+ const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
+ const addr = std.net.Address.initIp4(ip4addr, 0);
+
+ var loop: Loop = undefined;
+ try loop.initSingleThreaded(std.debug.global_allocator);
+ var server = MyServer{ .tcp_server = Server.init(&loop) };
+ defer server.tcp_server.deinit();
+ try server.tcp_server.listen(addr, MyServer.handler);
+
+ const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server);
+ defer cancel p;
+ loop.run();
+}
+
+async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *Server) void {
+ errdefer @panic("test failure");
+
+ var socket_file = try await try async connect(loop, address);
+ defer socket_file.close();
+
+ var buf: [512]u8 = undefined;
+ const amt_read = try socket_file.read(buf[0..]);
+ const msg = buf[0..amt_read];
+ assert(mem.eql(u8, msg, "hello from server\n"));
+ server.close();
+}
+
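Note: each accepted connection spawns handleRequestFn as its own coroutine. A minimal handler sketch (closeOnly is an illustrative name) closes the socket and then cancels its own frame, since nothing awaits a handler:

    async<*mem.Allocator> fn closeOnly(server: *Server, addr: *const std.net.Address, socket: *const std.os.File) void {
        var s = socket.*; // TODO https://github.com/ziglang/zig/issues/733
        s.close();
        suspend |p| {
            cancel p; // frees this coroutine frame; nothing awaits the handler
        }
    }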
std/event.zig
@@ -1,1223 +1,13 @@
-const std = @import("index.zig");
-const builtin = @import("builtin");
-const assert = std.debug.assert;
-const event = this;
-const mem = std.mem;
-const posix = std.os.posix;
-const windows = std.os.windows;
-const AtomicRmwOp = builtin.AtomicRmwOp;
-const AtomicOrder = builtin.AtomicOrder;
-
-pub const TcpServer = struct {
- handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
-
- loop: *Loop,
- sockfd: ?i32,
- accept_coro: ?promise,
- listen_address: std.net.Address,
-
- waiting_for_emfile_node: PromiseNode,
- listen_resume_node: event.Loop.ResumeNode,
-
- const PromiseNode = std.LinkedList(promise).Node;
-
- pub fn init(loop: *Loop) TcpServer {
- // TODO can't initialize handler coroutine here because we need well defined copy elision
- return TcpServer{
- .loop = loop,
- .sockfd = null,
- .accept_coro = null,
- .handleRequestFn = undefined,
- .waiting_for_emfile_node = undefined,
- .listen_address = undefined,
- .listen_resume_node = event.Loop.ResumeNode{
- .id = event.Loop.ResumeNode.Id.Basic,
- .handle = undefined,
- },
- };
- }
-
- pub fn listen(
- self: *TcpServer,
- address: *const std.net.Address,
- handleRequestFn: async<*mem.Allocator> fn (*TcpServer, *const std.net.Address, *const std.os.File) void,
- ) !void {
- self.handleRequestFn = handleRequestFn;
-
- const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
- errdefer std.os.close(sockfd);
- self.sockfd = sockfd;
-
- try std.os.posixBind(sockfd, &address.os_addr);
- try std.os.posixListen(sockfd, posix.SOMAXCONN);
- self.listen_address = std.net.Address.initPosix(try std.os.posixGetSockName(sockfd));
-
- self.accept_coro = try async<self.loop.allocator> TcpServer.handler(self);
- errdefer cancel self.accept_coro.?;
-
- self.listen_resume_node.handle = self.accept_coro.?;
- try self.loop.addFd(sockfd, &self.listen_resume_node);
- errdefer self.loop.removeFd(sockfd);
- }
-
- /// Stop listening
- pub fn close(self: *TcpServer) void {
- self.loop.removeFd(self.sockfd.?);
- std.os.close(self.sockfd.?);
- }
-
- pub fn deinit(self: *TcpServer) void {
- if (self.accept_coro) |accept_coro| cancel accept_coro;
- if (self.sockfd) |sockfd| std.os.close(sockfd);
- }
-
- pub async fn handler(self: *TcpServer) void {
- while (true) {
- var accepted_addr: std.net.Address = undefined;
- if (std.os.posixAccept(self.sockfd.?, &accepted_addr.os_addr, posix.SOCK_NONBLOCK | posix.SOCK_CLOEXEC)) |accepted_fd| {
- var socket = std.os.File.openHandle(accepted_fd);
- _ = async<self.loop.allocator> self.handleRequestFn(self, accepted_addr, socket) catch |err| switch (err) {
- error.OutOfMemory => {
- socket.close();
- continue;
- },
- };
- } else |err| switch (err) {
- error.WouldBlock => {
- suspend; // we will get resumed by epoll_wait in the event loop
- continue;
- },
- error.ProcessFdQuotaExceeded => {
- errdefer std.os.emfile_promise_queue.remove(&self.waiting_for_emfile_node);
- suspend |p| {
- self.waiting_for_emfile_node = PromiseNode.init(p);
- std.os.emfile_promise_queue.append(&self.waiting_for_emfile_node);
- }
- continue;
- },
- error.ConnectionAborted, error.FileDescriptorClosed => continue,
-
- error.PageFault => unreachable,
- error.InvalidSyscall => unreachable,
- error.FileDescriptorNotASocket => unreachable,
- error.OperationNotSupported => unreachable,
-
- error.SystemFdQuotaExceeded, error.SystemResources, error.ProtocolFailure, error.BlockedByFirewall, error.Unexpected => {
- @panic("TODO handle this error");
- },
- }
- }
- }
-};
-
-pub const Loop = struct {
- allocator: *mem.Allocator,
- next_tick_queue: std.atomic.QueueMpsc(promise),
- os_data: OsData,
- final_resume_node: ResumeNode,
- dispatch_lock: u8, // TODO make this a bool
- pending_event_count: usize,
- extra_threads: []*std.os.Thread,
-
- // pre-allocated eventfds. all permanently active.
- // this is how we send promises to be resumed on other threads.
- available_eventfd_resume_nodes: std.atomic.Stack(ResumeNode.EventFd),
- eventfd_resume_nodes: []std.atomic.Stack(ResumeNode.EventFd).Node,
-
- pub const NextTickNode = std.atomic.QueueMpsc(promise).Node;
-
- pub const ResumeNode = struct {
- id: Id,
- handle: promise,
-
- pub const Id = enum {
- Basic,
- Stop,
- EventFd,
- };
-
- pub const EventFd = switch (builtin.os) {
- builtin.Os.macosx => MacOsEventFd,
- builtin.Os.linux => struct {
- base: ResumeNode,
- epoll_op: u32,
- eventfd: i32,
- },
- builtin.Os.windows => struct {
- base: ResumeNode,
- completion_key: usize,
- },
- else => @compileError("unsupported OS"),
- };
-
- const MacOsEventFd = struct {
- base: ResumeNode,
- kevent: posix.Kevent,
- };
- };
-
- /// After initialization, call run().
- /// TODO copy elision / named return values so that the threads referencing *Loop
- /// have the correct pointer value.
- fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
- return self.initInternal(allocator, 1);
- }
-
- /// The allocator must be thread-safe because we use it for multiplexing
- /// coroutines onto kernel threads.
- /// After initialization, call run().
- /// TODO copy elision / named return values so that the threads referencing *Loop
- /// have the correct pointer value.
- fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
- const core_count = try std.os.cpuCount(allocator);
- return self.initInternal(allocator, core_count);
- }
-
- /// Thread count is the total thread count. The thread pool size will be
- /// max(thread_count - 1, 0)
- fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
- self.* = Loop{
- .pending_event_count = 0,
- .allocator = allocator,
- .os_data = undefined,
- .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
- .dispatch_lock = 1, // start locked so threads go directly into epoll wait
- .extra_threads = undefined,
- .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
- .eventfd_resume_nodes = undefined,
- .final_resume_node = ResumeNode{
- .id = ResumeNode.Id.Stop,
- .handle = undefined,
- },
- };
- const extra_thread_count = thread_count - 1;
- self.eventfd_resume_nodes = try self.allocator.alloc(
- std.atomic.Stack(ResumeNode.EventFd).Node,
- extra_thread_count,
- );
- errdefer self.allocator.free(self.eventfd_resume_nodes);
-
- self.extra_threads = try self.allocator.alloc(*std.os.Thread, extra_thread_count);
- errdefer self.allocator.free(self.extra_threads);
-
- try self.initOsData(extra_thread_count);
- errdefer self.deinitOsData();
- }
-
- /// must call stop before deinit
- pub fn deinit(self: *Loop) void {
- self.deinitOsData();
- self.allocator.free(self.extra_threads);
- }
-
- const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
- std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
- std.os.WindowsCreateIoCompletionPortError;
-
- const wakeup_bytes = []u8{0x1} ** 8;
-
- fn initOsData(self: *Loop, extra_thread_count: usize) InitOsDataError!void {
- switch (builtin.os) {
- builtin.Os.linux => {
- errdefer {
- while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
- }
- for (self.eventfd_resume_nodes) |*eventfd_node| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = ResumeNode.Id.EventFd,
- .handle = undefined,
- },
- .eventfd = try std.os.linuxEventFd(1, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK),
- .epoll_op = posix.EPOLL_CTL_ADD,
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- }
-
- self.os_data.epollfd = try std.os.linuxEpollCreate(posix.EPOLL_CLOEXEC);
- errdefer std.os.close(self.os_data.epollfd);
-
- self.os_data.final_eventfd = try std.os.linuxEventFd(0, posix.EFD_CLOEXEC | posix.EFD_NONBLOCK);
- errdefer std.os.close(self.os_data.final_eventfd);
-
- self.os_data.final_eventfd_event = posix.epoll_event{
- .events = posix.EPOLLIN,
- .data = posix.epoll_data{ .ptr = @ptrToInt(&self.final_resume_node) },
- };
- try std.os.linuxEpollCtl(
- self.os_data.epollfd,
- posix.EPOLL_CTL_ADD,
- self.os_data.final_eventfd,
- &self.os_data.final_eventfd_event,
- );
-
- var extra_thread_index: usize = 0;
- errdefer {
- // writing 8 bytes to an eventfd cannot fail
- std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].wait();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
- }
- },
- builtin.Os.macosx => {
- self.os_data.kqfd = try std.os.bsdKQueue();
- errdefer std.os.close(self.os_data.kqfd);
-
- self.os_data.kevents = try self.allocator.alloc(posix.Kevent, extra_thread_count);
- errdefer self.allocator.free(self.os_data.kevents);
-
- const eventlist = ([*]posix.Kevent)(undefined)[0..0];
-
- for (self.eventfd_resume_nodes) |*eventfd_node, i| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = ResumeNode.Id.EventFd,
- .handle = undefined,
- },
- // this one is for sending events
- .kevent = posix.Kevent{
- .ident = i,
- .filter = posix.EVFILT_USER,
- .flags = posix.EV_CLEAR | posix.EV_ADD | posix.EV_DISABLE,
- .fflags = 0,
- .data = 0,
- .udata = @ptrToInt(&eventfd_node.data.base),
- },
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- const kevent_array = (*[1]posix.Kevent)(&eventfd_node.data.kevent);
- _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
- eventfd_node.data.kevent.flags = posix.EV_CLEAR | posix.EV_ENABLE;
- eventfd_node.data.kevent.fflags = posix.NOTE_TRIGGER;
- // this one is for waiting for events
- self.os_data.kevents[i] = posix.Kevent{
- .ident = i,
- .filter = posix.EVFILT_USER,
- .flags = 0,
- .fflags = 0,
- .data = 0,
- .udata = @ptrToInt(&eventfd_node.data.base),
- };
- }
-
- // Pre-add so that we cannot get error.SystemResources
- // later when we try to activate it.
- self.os_data.final_kevent = posix.Kevent{
- .ident = extra_thread_count,
- .filter = posix.EVFILT_USER,
- .flags = posix.EV_ADD | posix.EV_DISABLE,
- .fflags = 0,
- .data = 0,
- .udata = @ptrToInt(&self.final_resume_node),
- };
- const kevent_array = (*[1]posix.Kevent)(&self.os_data.final_kevent);
- _ = try std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null);
- self.os_data.final_kevent.flags = posix.EV_ENABLE;
- self.os_data.final_kevent.fflags = posix.NOTE_TRIGGER;
-
- var extra_thread_index: usize = 0;
- errdefer {
- _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch unreachable;
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].wait();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
- }
- },
- builtin.Os.windows => {
- self.os_data.extra_thread_count = extra_thread_count;
-
- self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
- windows.INVALID_HANDLE_VALUE,
- null,
- undefined,
- undefined,
- );
- errdefer std.os.close(self.os_data.io_port);
-
- for (self.eventfd_resume_nodes) |*eventfd_node, i| {
- eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
- .data = ResumeNode.EventFd{
- .base = ResumeNode{
- .id = ResumeNode.Id.EventFd,
- .handle = undefined,
- },
- // this one is for sending events
- .completion_key = @ptrToInt(&eventfd_node.data.base),
- },
- .next = undefined,
- };
- self.available_eventfd_resume_nodes.push(eventfd_node);
- }
-
- var extra_thread_index: usize = 0;
- errdefer {
- var i: usize = 0;
- while (i < extra_thread_index) : (i += 1) {
- while (true) {
- const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
- std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
- break;
- }
- }
- while (extra_thread_index != 0) {
- extra_thread_index -= 1;
- self.extra_threads[extra_thread_index].wait();
- }
- }
- while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
- self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
- }
- },
- else => {},
- }
- }
-
- fn deinitOsData(self: *Loop) void {
- switch (builtin.os) {
- builtin.Os.linux => {
- std.os.close(self.os_data.final_eventfd);
- while (self.available_eventfd_resume_nodes.pop()) |node| std.os.close(node.data.eventfd);
- std.os.close(self.os_data.epollfd);
- self.allocator.free(self.eventfd_resume_nodes);
- },
- builtin.Os.macosx => {
- self.allocator.free(self.os_data.kevents);
- std.os.close(self.os_data.kqfd);
- },
- builtin.Os.windows => {
- std.os.close(self.os_data.io_port);
- },
- else => {},
- }
- }
-
- /// resume_node must live longer than the promise that it holds a reference to.
- pub fn addFd(self: *Loop, fd: i32, resume_node: *ResumeNode) !void {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
- errdefer {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- try self.modFd(
- fd,
- posix.EPOLL_CTL_ADD,
- std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET,
- resume_node,
- );
- }
-
- pub fn modFd(self: *Loop, fd: i32, op: u32, events: u32, resume_node: *ResumeNode) !void {
- var ev = std.os.linux.epoll_event{
- .events = events,
- .data = std.os.linux.epoll_data{ .ptr = @ptrToInt(resume_node) },
- };
- try std.os.linuxEpollCtl(self.os_data.epollfd, op, fd, &ev);
- }
-
- pub fn removeFd(self: *Loop, fd: i32) void {
- self.removeFdNoCounter(fd);
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
-
- fn removeFdNoCounter(self: *Loop, fd: i32) void {
- std.os.linuxEpollCtl(self.os_data.epollfd, std.os.linux.EPOLL_CTL_DEL, fd, undefined) catch {};
- }
-
- pub async fn waitFd(self: *Loop, fd: i32) !void {
- defer self.removeFd(fd);
- suspend |p| {
- // TODO explicitly put this memory in the coroutine frame #1194
- var resume_node = ResumeNode{
- .id = ResumeNode.Id.Basic,
- .handle = p,
- };
- try self.addFd(fd, &resume_node);
- }
- }
-
- /// Bring your own linked list node. This means it can't fail.
- pub fn onNextTick(self: *Loop, node: *NextTickNode) void {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
- self.next_tick_queue.put(node);
- }
-
- pub fn run(self: *Loop) void {
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.workerRun();
- for (self.extra_threads) |extra_thread| {
- extra_thread.wait();
- }
- }
-
- fn workerRun(self: *Loop) void {
- start_over: while (true) {
- if (@atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst) == 0) {
- while (self.next_tick_queue.get()) |next_tick_node| {
- const handle = next_tick_node.data;
- if (self.next_tick_queue.isEmpty()) {
- // last node, just resume it
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- }
-
- // non-last node, stick it in the epoll/kqueue set so that
- // other threads can get to it
- if (self.available_eventfd_resume_nodes.pop()) |resume_stack_node| {
- const eventfd_node = &resume_stack_node.data;
- eventfd_node.base.handle = handle;
- switch (builtin.os) {
- builtin.Os.macosx => {
- const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
- const eventlist = ([*]posix.Kevent)(undefined)[0..0];
- _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- builtin.Os.linux => {
- // the pending count is already accounted for
- const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
- self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- builtin.Os.windows => {
- // this value is never dereferenced but we need it to be non-null so that
- // the consumer code can decide whether to read the completion key.
- // it has to do this for normal I/O, so we match that behavior here.
- const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
- std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
- // fine, we didn't need it anyway
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- self.available_eventfd_resume_nodes.push(resume_stack_node);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- };
- },
- else => @compileError("unsupported OS"),
- }
- } else {
- // threads are too busy, can't add another eventfd to wake one up
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- resume handle;
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- continue :start_over;
- }
- }
-
- const pending_event_count = @atomicLoad(usize, &self.pending_event_count, AtomicOrder.SeqCst);
- if (pending_event_count == 0) {
- // cause all the threads to stop
- switch (builtin.os) {
- builtin.Os.linux => {
- // writing 8 bytes to an eventfd cannot fail
- std.os.posixWrite(self.os_data.final_eventfd, wakeup_bytes) catch unreachable;
- return;
- },
- builtin.Os.macosx => {
- const final_kevent = (*[1]posix.Kevent)(&self.os_data.final_kevent);
- const eventlist = ([*]posix.Kevent)(undefined)[0..0];
- // cannot fail because we already added it and this just enables it
- _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
- return;
- },
- builtin.Os.windows => {
- var i: usize = 0;
- while (i < self.os_data.extra_thread_count) : (i += 1) {
- while (true) {
- const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
- std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
- break;
- }
- }
- return;
- },
- else => @compileError("unsupported OS"),
- }
- }
-
- _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- }
-
- switch (builtin.os) {
- builtin.Os.linux => {
- // only process 1 event so we don't steal from other threads
- var events: [1]std.os.linux.epoll_event = undefined;
- const count = std.os.linuxEpollWait(self.os_data.epollfd, events[0..], -1);
- for (events[0..count]) |ev| {
- const resume_node = @intToPtr(*ResumeNode, ev.data.ptr);
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- ResumeNode.Id.Basic => {},
- ResumeNode.Id.Stop => return,
- ResumeNode.Id.EventFd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- event_fd_node.epoll_op = posix.EPOLL_CTL_MOD;
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- }
- },
- builtin.Os.macosx => {
- var eventlist: [1]posix.Kevent = undefined;
- const count = std.os.bsdKEvent(self.os_data.kqfd, self.os_data.kevents, eventlist[0..], null) catch unreachable;
- for (eventlist[0..count]) |ev| {
- const resume_node = @intToPtr(*ResumeNode, ev.udata);
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- ResumeNode.Id.Basic => {},
- ResumeNode.Id.Stop => return,
- ResumeNode.Id.EventFd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- }
- },
- builtin.Os.windows => {
- var completion_key: usize = undefined;
- while (true) {
- var nbytes: windows.DWORD = undefined;
- var overlapped: ?*windows.OVERLAPPED = undefined;
- switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, &overlapped, windows.INFINITE)) {
- std.os.WindowsWaitResult.Aborted => return,
- std.os.WindowsWaitResult.Normal => {},
- }
- if (overlapped != null) break;
- }
- const resume_node = @intToPtr(*ResumeNode, completion_key);
- const handle = resume_node.handle;
- const resume_node_id = resume_node.id;
- switch (resume_node_id) {
- ResumeNode.Id.Basic => {},
- ResumeNode.Id.Stop => return,
- ResumeNode.Id.EventFd => {
- const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
- const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
- self.available_eventfd_resume_nodes.push(stack_node);
- },
- }
- resume handle;
- if (resume_node_id == ResumeNode.Id.EventFd) {
- _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- },
- else => @compileError("unsupported OS"),
- }
- }
- }
-
- const OsData = switch (builtin.os) {
- builtin.Os.linux => struct {
- epollfd: i32,
- final_eventfd: i32,
- final_eventfd_event: std.os.linux.epoll_event,
- },
- builtin.Os.macosx => MacOsData,
- builtin.Os.windows => struct {
- io_port: windows.HANDLE,
- extra_thread_count: usize,
- },
- else => struct {},
- };
-
- const MacOsData = struct {
- kqfd: i32,
- final_kevent: posix.Kevent,
- kevents: []posix.Kevent,
- };
-};
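Putting the Loop API together: initialize single- or multi-threaded, start a
root promise, then call run(), which returns once pending_event_count drops to
zero. A minimal sketch mirroring the tests in this diff (amain is an
illustrative name, not part of this commit):

    var loop: Loop = undefined;
    try loop.initMultiThreaded(allocator);
    defer loop.deinit();

    const p = try async<allocator> amain(&loop);
    defer cancel p;

    loop.run(); // blocks until all pending events have completed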
-
-/// many producer, many consumer, thread-safe, lock-free, runtime configurable buffer size
-/// when buffer is empty, consumers suspend and are resumed by producers
-/// when buffer is full, producers suspend and are resumed by consumers
-pub fn Channel(comptime T: type) type {
- return struct {
- loop: *Loop,
-
- getters: std.atomic.QueueMpsc(GetNode),
- putters: std.atomic.QueueMpsc(PutNode),
- get_count: usize,
- put_count: usize,
- dispatch_lock: u8, // TODO make this a bool
- need_dispatch: u8, // TODO make this a bool
-
- // simple fixed size ring buffer
- buffer_nodes: []T,
- buffer_index: usize,
- buffer_len: usize,
-
- const SelfChannel = this;
- const GetNode = struct {
- ptr: *T,
- tick_node: *Loop.NextTickNode,
- };
- const PutNode = struct {
- data: T,
- tick_node: *Loop.NextTickNode,
- };
-
- /// call destroy when done
- pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
- const buffer_nodes = try loop.allocator.alloc(T, capacity);
- errdefer loop.allocator.free(buffer_nodes);
-
- const self = try loop.allocator.create(SelfChannel{
- .loop = loop,
- .buffer_len = 0,
- .buffer_nodes = buffer_nodes,
- .buffer_index = 0,
- .dispatch_lock = 0,
- .need_dispatch = 0,
- .getters = std.atomic.QueueMpsc(GetNode).init(),
- .putters = std.atomic.QueueMpsc(PutNode).init(),
- .get_count = 0,
- .put_count = 0,
- });
- errdefer loop.allocator.destroy(self);
-
- return self;
- }
-
- /// must be called when all calls to put and get have suspended and no more calls occur
- pub fn destroy(self: *SelfChannel) void {
- while (self.getters.get()) |get_node| {
- cancel get_node.data.tick_node.data;
- }
- while (self.putters.get()) |put_node| {
- cancel put_node.data.tick_node.data;
- }
- self.loop.allocator.free(self.buffer_nodes);
- self.loop.allocator.destroy(self);
- }
-
- /// puts a data item in the channel. The promise completes when the value has been added to the
- /// buffer, or in the case of a zero size buffer, when the item has been retrieved by a getter.
- pub async fn put(self: *SelfChannel, data: T) void {
- // TODO should be able to group memory allocation failure before first suspend point
- // so that the async invocation catches it
- var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
- _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
-
- suspend |handle| {
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.QueueMpsc(PutNode).Node{
- .data = PutNode{
- .tick_node = &my_tick_node,
- .data = data,
- },
- .next = undefined,
- };
- self.putters.put(&queue_node);
- _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- self.loop.onNextTick(dispatch_tick_node_ptr);
- }
- }
-
- /// await this function to get an item from the channel. If the buffer is empty, the promise will
- /// complete when the next item is put in the channel.
- pub async fn get(self: *SelfChannel) T {
- // TODO should be able to group memory allocation failure before first suspend point
- // so that the async invocation catches it
- var dispatch_tick_node_ptr: *Loop.NextTickNode = undefined;
- _ = async self.dispatch(&dispatch_tick_node_ptr) catch unreachable;
-
- // TODO integrate this function with named return values
- // so we can get rid of this extra result copy
- var result: T = undefined;
- suspend |handle| {
- var my_tick_node = Loop.NextTickNode{
- .next = undefined,
- .data = handle,
- };
- var queue_node = std.atomic.QueueMpsc(GetNode).Node{
- .data = GetNode{
- .ptr = &result,
- .tick_node = &my_tick_node,
- },
- .next = undefined,
- };
- self.getters.put(&queue_node);
- _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- self.loop.onNextTick(dispatch_tick_node_ptr);
- }
- return result;
- }
-
- async fn dispatch(self: *SelfChannel, tick_node_ptr: **Loop.NextTickNode) void {
- // resumed by onNextTick
- suspend |handle| {
- var tick_node = Loop.NextTickNode{
- .data = handle,
- .next = undefined,
- };
- tick_node_ptr.* = &tick_node;
- }
-
- // set the "need dispatch" flag
- _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
-
- lock: while (true) {
- // set the lock flag
- const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- if (prev_lock != 0) return;
-
- // clear the need_dispatch flag since we're about to do it
- _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- while (true) {
- one_dispatch: {
- // later we correct these extra subtractions
- var get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- var put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
-
- // transfer self.buffer to self.getters
- while (self.buffer_len != 0) {
- if (get_count == 0) break :one_dispatch;
-
- const get_node = &self.getters.get().?.data;
- get_node.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
- self.loop.onNextTick(get_node.tick_node);
- self.buffer_len -= 1;
-
- get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
-
- // direct transfer self.putters to self.getters
- while (get_count != 0 and put_count != 0) {
- const get_node = &self.getters.get().?.data;
- const put_node = &self.putters.get().?.data;
-
- get_node.ptr.* = put_node.data;
- self.loop.onNextTick(get_node.tick_node);
- self.loop.onNextTick(put_node.tick_node);
-
- get_count = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
-
- // transfer self.putters to self.buffer
- while (self.buffer_len != self.buffer_nodes.len and put_count != 0) {
- const put_node = &self.putters.get().?.data;
-
- self.buffer_nodes[self.buffer_index] = put_node.data;
- self.loop.onNextTick(put_node.tick_node);
- self.buffer_index +%= 1;
- self.buffer_len += 1;
-
- put_count = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
- }
- }
-
- // undo the extra subtractions
- _ = @atomicRmw(usize, &self.get_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
- _ = @atomicRmw(usize, &self.put_count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
-
- // clear need-dispatch flag
- const need_dispatch = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- if (need_dispatch != 0) continue;
-
- const my_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
- assert(my_lock != 0);
-
- // we have to check again now that we unlocked
- if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) != 0) continue :lock;
-
- return;
- }
- }
- }
- };
-}
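The dispatch_lock/need_dispatch pair is a lock-free handshake: every putter and
getter raises the flag and races for the lock, the single winner drains the
queues, and a re-check after unlocking guarantees that a flag raised during the
unlock window is never lost. The same handshake in isolation (dispatchOutline is
a hypothetical name; the real dispatch above additionally moves data between the
buffer and the getter/putter queues):

    fn dispatchOutline(self: *SelfChannel) void {
        _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
        while (true) {
            // whoever holds dispatch_lock is obligated to re-check need_dispatch
            const prev_lock = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
            if (prev_lock != 0) return;
            _ = @atomicRmw(u8, &self.need_dispatch, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
            // ... transfer buffered items and wake getters/putters here ...
            _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
            // a flag raised while we held the lock means more work arrived
            if (@atomicLoad(u8, &self.need_dispatch, AtomicOrder.SeqCst) == 0) return;
        }
    }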
-
-pub async fn connect(loop: *Loop, _address: *const std.net.Address) !std.os.File {
- var address = _address.*; // TODO https://github.com/ziglang/zig/issues/733
-
- const sockfd = try std.os.posixSocket(posix.AF_INET, posix.SOCK_STREAM | posix.SOCK_CLOEXEC | posix.SOCK_NONBLOCK, posix.PROTO_tcp);
- errdefer std.os.close(sockfd);
-
- try std.os.posixConnectAsync(sockfd, &address.os_addr);
- try await try async loop.waitFd(sockfd);
- try std.os.posixGetSockOptConnectError(sockfd);
-
- return std.os.File.openHandle(sockfd);
-}
-
-test "listen on a port, send bytes, receive bytes" {
- if (builtin.os != builtin.Os.linux) {
- // TODO build abstractions for other operating systems
- return;
- }
- const MyServer = struct {
- tcp_server: TcpServer,
-
- const Self = this;
- async<*mem.Allocator> fn handler(tcp_server: *TcpServer, _addr: *const std.net.Address, _socket: *const std.os.File) void {
- const self = @fieldParentPtr(Self, "tcp_server", tcp_server);
- var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
- defer socket.close();
- // TODO guarantee elision of this allocation
- const next_handler = async errorableHandler(self, _addr, socket) catch unreachable;
- (await next_handler) catch |err| {
- std.debug.panic("unable to handle connection: {}\n", err);
- };
- suspend |p| {
- cancel p;
- }
- }
- async fn errorableHandler(self: *Self, _addr: *const std.net.Address, _socket: *const std.os.File) !void {
- const addr = _addr.*; // TODO https://github.com/ziglang/zig/issues/733
- var socket = _socket.*; // TODO https://github.com/ziglang/zig/issues/733
-
- var adapter = std.io.FileOutStream.init(&socket);
- var stream = &adapter.stream;
- try stream.print("hello from server\n");
- }
- };
-
- const ip4addr = std.net.parseIp4("127.0.0.1") catch unreachable;
- const addr = std.net.Address.initIp4(ip4addr, 0);
-
- var loop: Loop = undefined;
- try loop.initSingleThreaded(std.debug.global_allocator);
- var server = MyServer{ .tcp_server = TcpServer.init(&loop) };
- defer server.tcp_server.deinit();
- try server.tcp_server.listen(addr, MyServer.handler);
-
- const p = try async<std.debug.global_allocator> doAsyncTest(&loop, server.tcp_server.listen_address, &server.tcp_server);
- defer cancel p;
- loop.run();
-}
-
-async fn doAsyncTest(loop: *Loop, address: *const std.net.Address, server: *TcpServer) void {
- errdefer @panic("test failure");
-
- var socket_file = try await try async event.connect(loop, address);
- defer socket_file.close();
-
- var buf: [512]u8 = undefined;
- const amt_read = try socket_file.read(buf[0..]);
- const msg = buf[0..amt_read];
- assert(mem.eql(u8, msg, "hello from server\n"));
- server.close();
-}
-
-test "std.event.Channel" {
- var da = std.heap.DirectAllocator.init();
- defer da.deinit();
-
- const allocator = &da.allocator;
-
- var loop: Loop = undefined;
- // TODO make a multi threaded test
- try loop.initSingleThreaded(allocator);
- defer loop.deinit();
-
- const channel = try Channel(i32).create(&loop, 0);
- defer channel.destroy();
-
- const handle = try async<allocator> testChannelGetter(&loop, channel);
- defer cancel handle;
-
- const putter = try async<allocator> testChannelPutter(channel);
- defer cancel putter;
-
- loop.run();
-}
-
-async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
- errdefer @panic("test failed");
-
- const value1_promise = try async channel.get();
- const value1 = await value1_promise;
- assert(value1 == 1234);
-
- const value2_promise = try async channel.get();
- const value2 = await value2_promise;
- assert(value2 == 4567);
-}
-
-async fn testChannelPutter(channel: *Channel(i32)) void {
- await (async channel.put(1234) catch @panic("out of memory"));
- await (async channel.put(4567) catch @panic("out of memory"));
-}
-
-/// Thread-safe async/await lock.
-/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
-/// are resumed when the lock is released, in order.
-pub const Lock = struct {
- loop: *Loop,
- shared_bit: u8, // TODO make this a bool
- queue: Queue,
- queue_empty_bit: u8, // TODO make this a bool
-
- const Queue = std.atomic.QueueMpsc(promise);
-
- pub const Held = struct {
- lock: *Lock,
-
- pub fn release(self: Held) void {
- // Resume the next item from the queue.
- if (self.lock.queue.get()) |node| {
- self.lock.loop.onNextTick(node);
- return;
- }
-
- // We need to release the lock.
- _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- // There might be a queue item. If we know the queue is empty, we can be done,
- // because the other actor will try to obtain the lock.
- // But if there's a queue item, we are the actor which must loop and attempt
- // to grab the lock again.
- if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
- return;
- }
-
- while (true) {
- const old_bit = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- if (old_bit != 0) {
- // We did not obtain the lock. Great, the queue is someone else's problem.
- return;
- }
-
- // Resume the next item from the queue.
- if (self.lock.queue.get()) |node| {
- self.lock.loop.onNextTick(node);
- return;
- }
-
- // Release the lock again.
- _ = @atomicRmw(u8, &self.lock.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- _ = @atomicRmw(u8, &self.lock.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- // Find out if we can be done.
- if (@atomicLoad(u8, &self.lock.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
- return;
- }
- }
- }
- };
-
- pub fn init(loop: *Loop) Lock {
- return Lock{
- .loop = loop,
- .shared_bit = 0,
- .queue = Queue.init(),
- .queue_empty_bit = 1,
- };
- }
-
- /// Must be called when not locked. Not thread safe.
- /// All calls to acquire() and release() must complete before calling deinit().
- pub fn deinit(self: *Lock) void {
- assert(self.shared_bit == 0);
- while (self.queue.get()) |node| cancel node.data;
- }
-
- pub async fn acquire(self: *Lock) Held {
- s: suspend |handle| {
- // TODO explicitly put this memory in the coroutine frame #1194
- var my_tick_node = Loop.NextTickNode{
- .data = handle,
- .next = undefined,
- };
-
- self.queue.put(&my_tick_node);
-
- // At this point, we are in the queue, so we might have already been resumed and this coroutine
- // frame might be destroyed. For the rest of the suspend block we cannot access the coroutine frame.
-
-            // We set this bit so that later we can rely on the fact that, if queue_empty_bit is 1,
-            // some actor will attempt to grab the lock.
- _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- while (true) {
- const old_bit = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- if (old_bit != 0) {
- // We did not obtain the lock. Trust that our queue entry will resume us, and allow
- // suspend to complete.
- break;
- }
-                // We got the lock. However, we might have already been resumed from the queue.
- if (self.queue.get()) |node| {
- // Whether this node is us or someone else, we tail resume it.
- resume node.data;
- break;
- } else {
- // We already got resumed, and there are none left in the queue, which means that
- // we aren't even supposed to hold the lock right now.
- _ = @atomicRmw(u8, &self.queue_empty_bit, AtomicRmwOp.Xchg, 1, AtomicOrder.SeqCst);
- _ = @atomicRmw(u8, &self.shared_bit, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
-
- // There might be a queue item. If we know the queue is empty, we can be done,
- // because the other actor will try to obtain the lock.
- // But if there's a queue item, we are the actor which must loop and attempt
- // to grab the lock again.
- if (@atomicLoad(u8, &self.queue_empty_bit, AtomicOrder.SeqCst) == 1) {
- break;
- } else {
- continue;
- }
- }
- unreachable;
- }
- }
-
- return Held{ .lock = self };
- }
-};
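In use, acquire hands back a Held token, and release either resumes the next
queued waiter or gives the lock up entirely. The pattern from the test below,
in brief:

    const lock_promise = async lock.acquire() catch @panic("out of memory");
    const held = await lock_promise;
    defer held.release();
    // critical section: one coroutine at a time, across all worker threads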
-
-/// Thread-safe async/await lock that protects one piece of data.
-/// Does not make any syscalls - coroutines which are waiting for the lock are suspended, and
-/// are resumed when the lock is released, in order.
-pub fn Locked(comptime T: type) type {
- return struct {
- lock: Lock,
- private_data: T,
-
- const Self = this;
-
- pub const HeldLock = struct {
- value: *T,
- held: Lock.Held,
-
- pub fn release(self: HeldLock) void {
- self.held.release();
- }
- };
-
- pub fn init(loop: *Loop, data: T) Self {
- return Self{
- .lock = Lock.init(loop),
- .private_data = data,
- };
- }
-
- pub fn deinit(self: *Self) void {
- self.lock.deinit();
- }
-
- pub async fn acquire(self: *Self) HeldLock {
- return HeldLock{
- // TODO guaranteed allocation elision
- .held = await (async self.lock.acquire() catch unreachable),
- .value = &self.private_data,
- };
- }
- };
-}
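Locked(T) is not exercised by the tests in this commit; a minimal sketch of the
intended usage, assuming an async context and an i32 counter (both
illustrative):

    var counter = Locked(i32).init(&loop, 0);
    defer counter.deinit();

    // inside an async function:
    const held = await (async counter.acquire() catch unreachable);
    defer held.release();
    held.value.* += 1; // the pointer is valid only while the lock is held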
-
-test "std.event.Lock" {
- var da = std.heap.DirectAllocator.init();
- defer da.deinit();
-
- const allocator = &da.allocator;
-
- var loop: Loop = undefined;
- try loop.initMultiThreaded(allocator);
- defer loop.deinit();
-
- var lock = Lock.init(&loop);
- defer lock.deinit();
-
- const handle = try async<allocator> testLock(&loop, &lock);
- defer cancel handle;
- loop.run();
-
- assert(mem.eql(i32, shared_test_data, [1]i32{3 * @intCast(i32, shared_test_data.len)} ** shared_test_data.len));
-}
-
-async fn testLock(loop: *Loop, lock: *Lock) void {
- // TODO explicitly put next tick node memory in the coroutine frame #1194
- suspend |p| {
- resume p;
- }
- const handle1 = async lockRunner(lock) catch @panic("out of memory");
- var tick_node1 = Loop.NextTickNode{
- .next = undefined,
- .data = handle1,
- };
- loop.onNextTick(&tick_node1);
-
- const handle2 = async lockRunner(lock) catch @panic("out of memory");
- var tick_node2 = Loop.NextTickNode{
- .next = undefined,
- .data = handle2,
- };
- loop.onNextTick(&tick_node2);
-
- const handle3 = async lockRunner(lock) catch @panic("out of memory");
- var tick_node3 = Loop.NextTickNode{
- .next = undefined,
- .data = handle3,
- };
- loop.onNextTick(&tick_node3);
-
- await handle1;
- await handle2;
- await handle3;
-}
-
-var shared_test_data = [1]i32{0} ** 10;
-var shared_test_index: usize = 0;
-
-async fn lockRunner(lock: *Lock) void {
- suspend; // resumed by onNextTick
-
- var i: usize = 0;
- while (i < shared_test_data.len) : (i += 1) {
- const lock_promise = async lock.acquire() catch @panic("out of memory");
- const handle = await lock_promise;
- defer handle.release();
-
- shared_test_index = 0;
- while (shared_test_index < shared_test_data.len) : (shared_test_index += 1) {
- shared_test_data[shared_test_index] = shared_test_data[shared_test_index] + 1;
- }
- }
+pub const Locked = @import("event/locked.zig").Locked;
+pub const Loop = @import("event/loop.zig").Loop;
+pub const Lock = @import("event/lock.zig").Lock;
+pub const tcp = @import("event/tcp.zig");
+pub const Channel = @import("event/channel.zig").Channel;
+
+test "import event tests" {
+ _ = @import("event/locked.zig");
+ _ = @import("event/loop.zig");
+ _ = @import("event/lock.zig");
+ _ = @import("event/tcp.zig");
+ _ = @import("event/channel.zig");
}
CMakeLists.txt
@@ -458,6 +458,11 @@ set(ZIG_STD_FILES
"elf.zig"
"empty.zig"
"event.zig"
+ "event/channel.zig"
+ "event/lock.zig"
+ "event/locked.zig"
+ "event/loop.zig"
+ "event/tcp.zig"
"fmt/errol/enum3.zig"
"fmt/errol/index.zig"
"fmt/errol/lookup.zig"