Commit 8fba0a6ae8

Andrew Kelley <superjoe30@gmail.com>
2018-07-10 21:17:01
introduce std.event.Group for making parallel async calls
1 parent 0ce6934
Changed files (4)
src-self-hosted/module.zig
@@ -85,6 +85,17 @@ pub const Module = struct {
 
     exported_symbol_names: event.Locked(Decl.Table),
 
+    /// Before code generation starts, must wait on this group to make sure
+    /// the build is complete.
+    build_group: event.Group(BuildError!void),
+
+    const BuildErrorsList = std.SegmentedList(BuildErrorDesc, 1);
+
+    pub const BuildErrorDesc = struct {
+        code: BuildError,
+        text: []const u8,
+    };
+
     // TODO handle some of these earlier and report them in a way other than error codes
     pub const BuildError = error{
         OutOfMemory,
@@ -237,6 +248,7 @@ pub const Module = struct {
             .emit_file_type = Emit.Binary,
             .link_out_file = null,
             .exported_symbol_names = event.Locked(Decl.Table).init(loop, Decl.Table.init(loop.allocator)),
+            .build_group = event.Group(BuildError!void).init(loop),
         });
     }
 
@@ -310,6 +322,9 @@ pub const Module = struct {
         const decls = try Scope.Decls.create(self.a(), null);
         errdefer decls.destroy();
 
+        var decl_group = event.Group(BuildError!void).init(self.loop);
+        errdefer decl_group.cancelAll();
+
         var it = tree.root_node.decls.iterator(0);
         while (it.next()) |decl_ptr| {
             const decl = decl_ptr.*;
@@ -342,25 +357,30 @@ pub const Module = struct {
                     });
                     errdefer self.a().destroy(fn_decl);
 
-                    // TODO make this parallel
-                    try await try async self.addTopLevelDecl(tree, &fn_decl.base);
+                    try decl_group.call(addTopLevelDecl, self, tree, &fn_decl.base);
                 },
                 ast.Node.Id.TestDecl => @panic("TODO"),
                 else => unreachable,
             }
         }
+        try await (async decl_group.wait() catch unreachable);
+        try await (async self.build_group.wait() catch unreachable);
     }
 
     async fn addTopLevelDecl(self: *Module, tree: *ast.Tree, decl: *Decl) !void {
         const is_export = decl.isExported(tree);
 
-        {
-            const exported_symbol_names = await try async self.exported_symbol_names.acquire();
-            defer exported_symbol_names.release();
+        if (is_export) {
+            try self.build_group.call(verifyUniqueSymbol, self, decl);
+        }
+    }
 
-            if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
-                @panic("TODO report compile error");
-            }
+    async fn verifyUniqueSymbol(self: *Module, decl: *Decl) !void {
+        const exported_symbol_names = await (async self.exported_symbol_names.acquire() catch unreachable);
+        defer exported_symbol_names.release();
+
+        if (try exported_symbol_names.value.put(decl.name, decl)) |other_decl| {
+            @panic("TODO report compile error");
         }
     }
 
std/event/group.zig
@@ -0,0 +1,158 @@
+const std = @import("../index.zig");
+const builtin = @import("builtin");
+const Lock = std.event.Lock;
+const Loop = std.event.Loop;
+const AtomicRmwOp = builtin.AtomicRmwOp;
+const AtomicOrder = builtin.AtomicOrder;
+const assert = std.debug.assert;
+
+/// ReturnType should be `void` or `E!void`
+pub fn Group(comptime ReturnType: type) type {
+    return struct {
+        coro_stack: Stack,
+        alloc_stack: Stack,
+        lock: Lock,
+
+        const Self = this;
+
+        const Error = switch (@typeInfo(ReturnType)) {
+            builtin.TypeId.ErrorUnion => |payload| payload.error_set,
+            else => void,
+        };
+        const Stack = std.atomic.Stack(promise->ReturnType);
+
+        pub fn init(loop: *Loop) Self {
+            return Self{
+                .coro_stack = Stack.init(),
+                .alloc_stack = Stack.init(),
+                .lock = Lock.init(loop),
+            };
+        }
+
+        /// Add a promise to the group. Thread-safe.
+        pub fn add(self: *Self, handle: promise->ReturnType) (error{OutOfMemory}!void) {
+            const node = try self.lock.loop.allocator.create(Stack.Node{
+                .next = undefined,
+                .data = handle,
+            });
+            self.alloc_stack.push(node);
+        }
+
+        /// This is equivalent to an async call, but the async function is added to the group, instead
+        /// of returning a promise. func must be async and have return type void.
+        /// Thread-safe.
+        pub fn call(self: *Self, comptime func: var, args: ...) (error{OutOfMemory}!void) {
+            const S = struct {
+                async fn asyncFunc(node: **Stack.Node, args2: ...) ReturnType {
+                    // TODO this is a hack to make the memory following be inside the coro frame
+                    suspend |p| {
+                        var my_node: Stack.Node = undefined;
+                        node.* = &my_node;
+                        resume p;
+                    }
+
+                    // TODO this allocation elision should be guaranteed because we await it in
+                    // this coro frame
+                    return await (async func(args2) catch unreachable);
+                }
+            };
+            var node: *Stack.Node = undefined;
+            const handle = try async<self.lock.loop.allocator> S.asyncFunc(&node, args);
+            node.* = Stack.Node{
+                .next = undefined,
+                .data = handle,
+            };
+            self.coro_stack.push(node);
+        }
+
+        /// Wait for all the calls and promises of the group to complete.
+        /// Thread-safe.
+        pub async fn wait(self: *Self) ReturnType {
+            // TODO catch unreachable because the allocation can be grouped with
+            // the coro frame allocation
+            const held = await (async self.lock.acquire() catch unreachable);
+            defer held.release();
+
+            while (self.coro_stack.pop()) |node| {
+                if (Error == void) {
+                    await node.data;
+                } else {
+                    (await node.data) catch |err| {
+                        self.cancelAll();
+                        return err;
+                    };
+                }
+            }
+            while (self.alloc_stack.pop()) |node| {
+                const handle = node.data;
+                self.lock.loop.allocator.destroy(node);
+                if (Error == void) {
+                    await handle;
+                } else {
+                    (await handle) catch |err| {
+                        self.cancelAll();
+                        return err;
+                    };
+                }
+            }
+        }
+
+        /// Cancel all the outstanding promises. May only be called if wait was never called.
+        pub fn cancelAll(self: *Self) void {
+            while (self.coro_stack.pop()) |node| {
+                cancel node.data;
+            }
+            while (self.alloc_stack.pop()) |node| {
+                cancel node.data;
+                self.lock.loop.allocator.destroy(node);
+            }
+        }
+    };
+}
+
+test "std.event.Group" {
+    var da = std.heap.DirectAllocator.init();
+    defer da.deinit();
+
+    const allocator = &da.allocator;
+
+    var loop: Loop = undefined;
+    try loop.initMultiThreaded(allocator);
+    defer loop.deinit();
+
+    const handle = try async<allocator> testGroup(&loop);
+    defer cancel handle;
+
+    loop.run();
+}
+
+async fn testGroup(loop: *Loop) void {
+    var count: usize = 0;
+    var group = Group(void).init(loop);
+    group.add(async sleepALittle(&count) catch @panic("memory")) catch @panic("memory");
+    group.call(increaseByTen, &count) catch @panic("memory");
+    await (async group.wait() catch @panic("memory"));
+    assert(count == 11);
+
+    var another = Group(error!void).init(loop);
+    another.add(async somethingElse() catch @panic("memory")) catch @panic("memory");
+    another.call(doSomethingThatFails) catch @panic("memory");
+    std.debug.assertError(await (async another.wait() catch @panic("memory")), error.ItBroke);
+}
+
+async fn sleepALittle(count: *usize) void {
+    std.os.time.sleep(0, 1000000);
+    _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+}
+
+async fn increaseByTen(count: *usize) void {
+    var i: usize = 0;
+    while (i < 10) : (i += 1) {
+        _ = @atomicRmw(usize, count, AtomicRmwOp.Add, 1, AtomicOrder.SeqCst);
+    }
+}
+
+async fn doSomethingThatFails() error!void {}
+async fn somethingElse() error!void {
+    return error.ItBroke;
+}
std/event.zig
@@ -3,6 +3,7 @@ pub const Loop = @import("event/loop.zig").Loop;
 pub const Lock = @import("event/lock.zig").Lock;
 pub const tcp = @import("event/tcp.zig");
 pub const Channel = @import("event/channel.zig").Channel;
+pub const Group = @import("event/group.zig").Group;
 
 test "import event tests" {
     _ = @import("event/locked.zig");
@@ -10,4 +11,5 @@ test "import event tests" {
     _ = @import("event/lock.zig");
     _ = @import("event/tcp.zig");
     _ = @import("event/channel.zig");
+    _ = @import("event/group.zig");
 }
CMakeLists.txt
@@ -459,6 +459,7 @@ set(ZIG_STD_FILES
     "empty.zig"
     "event.zig"
     "event/channel.zig"
+    "event/group.zig"
     "event/lock.zig"
     "event/locked.zig"
     "event/loop.zig"