Commit d3d3e4e374

Andrew Kelley <andrew@ziglang.org>
2019-10-31 16:41:39
startup code sets up event loop if I/O mode is declared evented
1 parent 788848e
lib/std/event/channel.zig
@@ -7,12 +7,8 @@ const Loop = std.event.Loop;
 /// Many producer, many consumer, thread-safe, runtime configurable buffer size.
 /// When buffer is empty, consumers suspend and are resumed by producers.
 /// When buffer is full, producers suspend and are resumed by consumers.
-/// TODO now that async function rewrite has landed, this API should be adjusted
-/// to not use the event loop's allocator, and to not require allocation.
 pub fn Channel(comptime T: type) type {
     return struct {
-        loop: *Loop,
-
         getters: std.atomic.Queue(GetNode),
         or_null_queue: std.atomic.Queue(*std.atomic.Queue(GetNode).Node),
         putters: std.atomic.Queue(PutNode),
@@ -50,16 +46,17 @@ pub fn Channel(comptime T: type) type {
             tick_node: *Loop.NextTickNode,
         };
 
-        /// Call `destroy` when done.
-        pub fn create(loop: *Loop, capacity: usize) !*SelfChannel {
-            const buffer_nodes = try loop.allocator.alloc(T, capacity);
-            errdefer loop.allocator.free(buffer_nodes);
+        const global_event_loop = Loop.instance orelse
+            @compileError("std.event.Channel currently only works with event-based I/O");
 
-            const self = try loop.allocator.create(SelfChannel);
+        /// Call `deinit` to free resources when done.
+        /// `buffer` must live until `deinit` is called.
+        /// For a zero length buffer, use `[0]T{}`.
+        /// TODO https://github.com/ziglang/zig/issues/2765
+        pub fn init(self: *SelfChannel, buffer: []T) void {
             self.* = SelfChannel{
-                .loop = loop,
                 .buffer_len = 0,
-                .buffer_nodes = buffer_nodes,
+                .buffer_nodes = buffer,
                 .buffer_index = 0,
                 .dispatch_lock = 0,
                 .need_dispatch = 0,
@@ -69,21 +66,19 @@ pub fn Channel(comptime T: type) type {
                 .get_count = 0,
                 .put_count = 0,
             };
-            errdefer loop.allocator.destroy(self);
-
-            return self;
         }
 
-        /// must be called when all calls to put and get have suspended and no more calls occur
-        pub fn destroy(self: *SelfChannel) void {
+        /// Must be called when all calls to put and get have suspended and no more calls occur.
+        /// This can be omitted if caller can guarantee that the suspended putters and getters
+        /// do not need to be run to completion. Note that this may leave awaiters hanging.
+        pub fn deinit(self: *SelfChannel) void {
             while (self.getters.get()) |get_node| {
                 resume get_node.data.tick_node.data;
             }
             while (self.putters.get()) |put_node| {
                 resume put_node.data.tick_node.data;
             }
-            self.loop.allocator.free(self.buffer_nodes);
-            self.loop.allocator.destroy(self);
+            self.* = undefined;
         }
 
         /// puts a data item in the channel. The function returns when the value has been added to the
@@ -96,17 +91,6 @@ pub fn Channel(comptime T: type) type {
                 .data = data,
             });
 
-            // TODO test canceling a put()
-            errdefer {
-                _ = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
-                const need_dispatch = !self.putters.remove(&queue_node);
-                self.loop.cancelOnNextTick(&my_tick_node);
-                if (need_dispatch) {
-                    // oops we made the put_count incorrect for a period of time. fix by dispatching.
-                    _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
-                    self.dispatch();
-                }
-            }
             suspend {
                 self.putters.put(&queue_node);
                 _ = @atomicRmw(usize, &self.put_count, .Add, 1, .SeqCst);
@@ -128,18 +112,6 @@ pub fn Channel(comptime T: type) type {
                 },
             });
 
-            // TODO test canceling a get()
-            errdefer {
-                _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
-                const need_dispatch = !self.getters.remove(&queue_node);
-                self.loop.cancelOnNextTick(&my_tick_node);
-                if (need_dispatch) {
-                    // oops we made the get_count incorrect for a period of time. fix by dispatching.
-                    _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
-                    self.dispatch();
-                }
-            }
-
             suspend {
                 self.getters.put(&queue_node);
                 _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
@@ -158,11 +130,9 @@ pub fn Channel(comptime T: type) type {
         //    }
         //}
 
-        /// Await this function to get an item from the channel. If the buffer is empty and there are no
-        /// puts waiting, this returns null.
-        /// Await is necessary for locking purposes. The function will be resumed after checking the channel
-        /// for data and will not wait for data to be available.
-        pub async fn getOrNull(self: *SelfChannel) ?T {
+        /// Get an item from the channel. If the buffer is empty and there are no
+        /// puts waiting, this returns `null`.
+        pub fn getOrNull(self: *SelfChannel) ?T {
             // TODO integrate this function with named return values
             // so we can get rid of this extra result copy
             var result: ?T = null;
@@ -179,19 +149,6 @@ pub fn Channel(comptime T: type) type {
             });
             or_null_node.data = &queue_node;
 
-            // TODO test canceling getOrNull
-            errdefer {
-                _ = self.or_null_queue.remove(&or_null_node);
-                _ = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
-                const need_dispatch = !self.getters.remove(&queue_node);
-                self.loop.cancelOnNextTick(&my_tick_node);
-                if (need_dispatch) {
-                    // oops we made the get_count incorrect for a period of time. fix by dispatching.
-                    _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
-                    self.dispatch();
-                }
-            }
-
             suspend {
                 self.getters.put(&queue_node);
                 _ = @atomicRmw(usize, &self.get_count, .Add, 1, .SeqCst);
@@ -234,7 +191,7 @@ pub fn Channel(comptime T: type) type {
                                     info.ptr.* = self.buffer_nodes[self.buffer_index -% self.buffer_len];
                                 },
                             }
-                            self.loop.onNextTick(get_node.tick_node);
+                            global_event_loop.onNextTick(get_node.tick_node);
                             self.buffer_len -= 1;
 
                             get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
@@ -254,8 +211,8 @@ pub fn Channel(comptime T: type) type {
                                     info.ptr.* = put_node.data;
                                 },
                             }
-                            self.loop.onNextTick(get_node.tick_node);
-                            self.loop.onNextTick(put_node.tick_node);
+                            global_event_loop.onNextTick(get_node.tick_node);
+                            global_event_loop.onNextTick(put_node.tick_node);
 
                             get_count = @atomicRmw(usize, &self.get_count, .Sub, 1, .SeqCst);
                             put_count = @atomicRmw(usize, &self.put_count, .Sub, 1, .SeqCst);
@@ -266,7 +223,7 @@ pub fn Channel(comptime T: type) type {
                             const put_node = &self.putters.get().?.data;
 
                             self.buffer_nodes[self.buffer_index] = put_node.data;
-                            self.loop.onNextTick(put_node.tick_node);
+                            global_event_loop.onNextTick(put_node.tick_node);
                             self.buffer_index +%= 1;
                             self.buffer_len += 1;
 
@@ -282,7 +239,7 @@ pub fn Channel(comptime T: type) type {
                     var remove_count: usize = 0;
                     while (self.or_null_queue.get()) |or_null_node| {
                         remove_count += @boolToInt(self.getters.remove(or_null_node.data));
-                        self.loop.onNextTick(or_null_node.data.data.tick_node);
+                        global_event_loop.onNextTick(or_null_node.data.data.tick_node);
                     }
                     if (remove_count != 0) {
                         _ = @atomicRmw(usize, &self.get_count, .Sub, remove_count, .SeqCst);
@@ -315,21 +272,21 @@ test "std.event.Channel" {
     // https://github.com/ziglang/zig/issues/3251
     if (builtin.os == .freebsd) return error.SkipZigTest;
 
-    var loop: Loop = undefined;
-    // TODO make a multi threaded test
-    try loop.initSingleThreaded(std.heap.direct_allocator);
-    defer loop.deinit();
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
 
-    const channel = try Channel(i32).create(&loop, 0);
-    defer channel.destroy();
+    var channel: Channel(i32) = undefined;
+    channel.init([0]i32{});
+    defer channel.deinit();
 
-    const handle = async testChannelGetter(&loop, channel);
-    const putter = async testChannelPutter(channel);
+    var handle = async testChannelGetter(&channel);
+    var putter = async testChannelPutter(&channel);
 
-    loop.run();
+    await handle;
+    await putter;
 }
 
-async fn testChannelGetter(loop: *Loop, channel: *Channel(i32)) void {
+async fn testChannelGetter(channel: *Channel(i32)) void {
     const value1 = channel.get();
     testing.expect(value1 == 1234);
 
lib/std/event/future.zig
@@ -87,11 +87,11 @@ test "std.event.Future" {
     if (builtin.single_threaded) return error.SkipZigTest;
     // https://github.com/ziglang/zig/issues/3251
     if (builtin.os == .freebsd) return error.SkipZigTest;
-
-    const allocator = std.heap.direct_allocator;
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
 
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     const handle = async testFuture(&loop);
lib/std/event/group.zig
@@ -87,10 +87,11 @@ test "std.event.Group" {
     // https://github.com/ziglang/zig/issues/1908
     if (builtin.single_threaded) return error.SkipZigTest;
 
-    const allocator = std.heap.direct_allocator;
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
 
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     const handle = async testGroup(&loop);
lib/std/event/lock.zig
@@ -9,6 +9,7 @@ const Loop = std.event.Loop;
 /// Functions which are waiting for the lock are suspended, and
 /// are resumed when the lock is released, in order.
 /// Allows only one actor to hold the lock.
+/// TODO: make this API also work in blocking I/O mode.
 pub const Lock = struct {
     loop: *Loop,
     shared_bit: u8, // TODO make this a bool
@@ -125,10 +126,11 @@ test "std.event.Lock" {
     // TODO https://github.com/ziglang/zig/issues/3251
     if (builtin.os == .freebsd) return error.SkipZigTest;
 
-    const allocator = std.heap.direct_allocator;
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
 
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     var lock = Lock.init(&loop);
lib/std/event/loop.zig
@@ -96,11 +96,11 @@ pub const Loop = struct {
     /// TODO copy elision / named return values so that the threads referencing *Loop
     /// have the correct pointer value.
     /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
-    pub fn init(self: *Loop, allocator: *mem.Allocator) !void {
+    pub fn init(self: *Loop) !void {
         if (builtin.single_threaded) {
-            return self.initSingleThreaded(allocator);
+            return self.initSingleThreaded();
         } else {
-            return self.initMultiThreaded(allocator);
+            return self.initMultiThreaded();
         }
     }
 
@@ -108,25 +108,28 @@ pub const Loop = struct {
     /// TODO copy elision / named return values so that the threads referencing *Loop
     /// have the correct pointer value.
     /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
-    pub fn initSingleThreaded(self: *Loop, allocator: *mem.Allocator) !void {
-        return self.initInternal(allocator, 1);
+    pub fn initSingleThreaded(self: *Loop) !void {
+        return self.initThreadPool(1);
     }
 
-    /// The allocator must be thread-safe because we use it for multiplexing
-    /// async functions onto kernel threads.
     /// After initialization, call run().
+    /// This is the same as `initThreadPool` using `Thread.cpuCount` to determine the thread
+    /// pool size.
     /// TODO copy elision / named return values so that the threads referencing *Loop
     /// have the correct pointer value.
     /// https://github.com/ziglang/zig/issues/2761 and https://github.com/ziglang/zig/issues/2765
-    pub fn initMultiThreaded(self: *Loop, allocator: *mem.Allocator) !void {
-        if (builtin.single_threaded) @compileError("initMultiThreaded unavailable when building in single-threaded mode");
+    pub fn initMultiThreaded(self: *Loop) !void {
+        if (builtin.single_threaded)
+            @compileError("initMultiThreaded unavailable when building in single-threaded mode");
         const core_count = try Thread.cpuCount();
-        return self.initInternal(allocator, core_count);
+        return self.initThreadPool(core_count);
     }
 
     /// Thread count is the total thread count. The thread pool size will be
     /// max(thread_count - 1, 0)
-    fn initInternal(self: *Loop, allocator: *mem.Allocator, thread_count: usize) !void {
+    pub fn initThreadPool(self: *Loop, thread_count: usize) !void {
+        // TODO: https://github.com/ziglang/zig/issues/3539
+        const allocator = std.heap.direct_allocator;
         self.* = Loop{
             .pending_event_count = 1,
             .allocator = allocator,
@@ -932,10 +935,8 @@ test "std.event.Loop - basic" {
     // https://github.com/ziglang/zig/issues/1908
     if (builtin.single_threaded) return error.SkipZigTest;
 
-    const allocator = std.heap.direct_allocator;
-
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     loop.run();
@@ -945,10 +946,8 @@ test "std.event.Loop - call" {
     // https://github.com/ziglang/zig/issues/1908
     if (builtin.single_threaded) return error.SkipZigTest;
 
-    const allocator = std.heap.direct_allocator;
-
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     var did_it = false;
lib/std/event/rwlock.zig
@@ -11,6 +11,7 @@ const Loop = std.event.Loop;
 /// Many readers can hold the lock at the same time; however locking for writing is exclusive.
 /// When a read lock is held, it will not be released until the reader queue is empty.
 /// When a write lock is held, it will not be released until the writer queue is empty.
+/// TODO: make this API also work in blocking I/O mode
 pub const RwLock = struct {
     loop: *Loop,
     shared_state: u8, // TODO make this an enum
@@ -212,10 +213,11 @@ test "std.event.RwLock" {
     // https://github.com/ziglang/zig/issues/1908
     if (builtin.single_threaded) return error.SkipZigTest;
 
-    const allocator = std.heap.direct_allocator;
+    // TODO provide a way to run tests in evented I/O mode
+    if (!std.io.is_async) return error.SkipZigTest;
 
     var loop: Loop = undefined;
-    try loop.initMultiThreaded(allocator);
+    try loop.initMultiThreaded();
     defer loop.deinit();
 
     var lock = RwLock.init(&loop);
lib/std/special/start.zig
@@ -35,7 +35,9 @@ comptime {
 }
 
 extern fn wasm_freestanding_start() void {
-    _ = callMain();
+    // This is marked inline because for some reason LLVM in release mode fails to inline it,
+    // and we want fewer call frames in stack traces.
+    _ = @inlineCall(callMain);
 }
 
 extern fn EfiMain(handle: uefi.Handle, system_table: *uefi.tables.SystemTable) usize {
@@ -63,7 +65,9 @@ extern fn EfiMain(handle: uefi.Handle, system_table: *uefi.tables.SystemTable) u
 
 nakedcc fn _start() noreturn {
     if (builtin.os == builtin.Os.wasi) {
-        std.os.wasi.proc_exit(callMain());
+        // This is marked inline because for some reason LLVM in release mode fails to inline it,
+        // and we want fewer call frames in stack traces.
+        std.os.wasi.proc_exit(@inlineCall(callMain));
     }
 
     switch (builtin.arch) {
@@ -110,7 +114,7 @@ extern fn WinMainCRTStartup() noreturn {
 
     std.debug.maybeEnableSegfaultHandler();
 
-    std.os.windows.kernel32.ExitProcess(callMain());
+    std.os.windows.kernel32.ExitProcess(initEventLoopAndCallMain());
 }
 
 // TODO https://github.com/ziglang/zig/issues/265
@@ -170,7 +174,7 @@ fn callMainWithArgs(argc: usize, argv: [*][*]u8, envp: [][*]u8) u8 {
 
     std.debug.maybeEnableSegfaultHandler();
 
-    return callMain();
+    return initEventLoopAndCallMain();
 }
 
 extern fn main(c_argc: i32, c_argv: [*][*]u8, c_envp: [*]?[*]u8) i32 {
@@ -185,7 +189,32 @@ const bad_main_ret = "expected return type of main to be 'void', '!void', 'noret
 
 // This is marked inline because for some reason LLVM in release mode fails to inline it,
 // and we want fewer call frames in stack traces.
-inline fn callMain() u8 {
+inline fn initEventLoopAndCallMain() u8 {
+    if (std.event.Loop.instance) |loop| {
+        loop.init() catch |err| {
+            std.debug.warn("error: {}\n", @errorName(err));
+            if (@errorReturnTrace()) |trace| {
+                std.debug.dumpStackTrace(trace.*);
+            }
+            return 1;
+        };
+        defer loop.deinit();
+
+        var result: u8 = undefined;
+        var frame: @Frame(callMain) = undefined;
+        _ = @asyncCall(&frame, &result, callMain);
+        loop.run();
+        return result;
+    } else {
+        // This is marked inline because for some reason LLVM in release mode fails to inline it,
+        // and we want fewer call frames in stack traces.
+        return @inlineCall(callMain);
+    }
+}
+
+// This is not marked inline because it is called with @asyncCall when
+// there is an event loop.
+fn callMain() u8 {
     switch (@typeInfo(@typeOf(root.main).ReturnType)) {
         .NoReturn => {
             root.main();