Commit 9462852433

Andrew Kelley <superjoe30@gmail.com>
2018-07-09 22:49:46
std.event.Loop multithreading for windows using IOCP
1 parent caa0085
Changed files (5)
std/os/windows/index.zig
@@ -59,6 +59,9 @@ pub extern "kernel32" stdcallcc fn CreateSymbolicLinkA(
     dwFlags: DWORD,
 ) BOOLEAN;
 
+
+pub extern "kernel32" stdcallcc fn CreateIoCompletionPort(FileHandle: HANDLE, ExistingCompletionPort: ?HANDLE, CompletionKey: ULONG_PTR, NumberOfConcurrentThreads: DWORD) ?HANDLE;
+
 pub extern "kernel32" stdcallcc fn CreateThread(lpThreadAttributes: ?LPSECURITY_ATTRIBUTES, dwStackSize: SIZE_T, lpStartAddress: LPTHREAD_START_ROUTINE, lpParameter: ?LPVOID, dwCreationFlags: DWORD, lpThreadId: ?LPDWORD) ?HANDLE;
 
 pub extern "kernel32" stdcallcc fn DeleteFileA(lpFileName: LPCSTR) BOOL;
@@ -106,6 +109,7 @@ pub extern "kernel32" stdcallcc fn GetFinalPathNameByHandleA(
 ) DWORD;
 
 pub extern "kernel32" stdcallcc fn GetProcessHeap() ?HANDLE;
+pub extern "kernel32" stdcallcc fn GetQueuedCompletionStatus(CompletionPort: HANDLE, lpNumberOfBytesTransferred: LPDWORD, lpCompletionKey: *ULONG_PTR, lpOverlapped: *?*OVERLAPPED, dwMilliseconds: DWORD) BOOL;
 
 pub extern "kernel32" stdcallcc fn GetSystemInfo(lpSystemInfo: *SYSTEM_INFO) void;
 pub extern "kernel32" stdcallcc fn GetSystemTimeAsFileTime(*FILETIME) void;
@@ -130,6 +134,9 @@ pub extern "kernel32" stdcallcc fn MoveFileExA(
     dwFlags: DWORD,
 ) BOOL;
 
+
+pub extern "kernel32" stdcallcc fn PostQueuedCompletionStatus(CompletionPort: HANDLE, dwNumberOfBytesTransferred: DWORD, dwCompletionKey: ULONG_PTR, lpOverlapped: ?*OVERLAPPED) BOOL;
+
 pub extern "kernel32" stdcallcc fn QueryPerformanceCounter(lpPerformanceCount: *LARGE_INTEGER) BOOL;
 
 pub extern "kernel32" stdcallcc fn QueryPerformanceFrequency(lpFrequency: *LARGE_INTEGER) BOOL;
std/os/windows/util.zig
@@ -214,3 +214,50 @@ pub fn windowsFindNextFile(handle: windows.HANDLE, find_file_data: *windows.WIN3
     }
     return true;
 }
+
+
+pub const WindowsCreateIoCompletionPortError = error {
+    Unexpected,
+};
+
+pub fn windowsCreateIoCompletionPort(file_handle: windows.HANDLE, existing_completion_port: ?windows.HANDLE, completion_key: usize, concurrent_thread_count: windows.DWORD) !windows.HANDLE {
+    const handle = windows.CreateIoCompletionPort(file_handle, existing_completion_port, completion_key, concurrent_thread_count) orelse {
+        const err = windows.GetLastError();
+        switch (err) {
+            else => return os.unexpectedErrorWindows(err),
+        }
+    };
+    return handle;
+}
+
+pub const WindowsPostQueuedCompletionStatusError = error {
+    Unexpected,
+};
+
+pub fn windowsPostQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: windows.DWORD, completion_key: usize, lpOverlapped: ?*windows.OVERLAPPED) WindowsPostQueuedCompletionStatusError!void {
+    if (windows.PostQueuedCompletionStatus(completion_port, bytes_transferred_count, completion_key, lpOverlapped) == 0) {
+        const err = windows.GetLastError();
+        switch (err) {
+            else => return os.unexpectedErrorWindows(err),
+        }
+    }
+}
+
+pub const WindowsWaitResult = enum {
+    Normal,
+    Aborted,
+};
+
+pub fn windowsGetQueuedCompletionStatus(completion_port: windows.HANDLE, bytes_transferred_count: *windows.DWORD, lpCompletionKey: *usize, lpOverlapped: *?*windows.OVERLAPPED, dwMilliseconds: windows.DWORD) WindowsWaitResult {
+    if (windows.GetQueuedCompletionStatus(completion_port, bytes_transferred_count, lpCompletionKey, lpOverlapped, dwMilliseconds) == windows.FALSE) {
+        if (std.debug.runtime_safety) {
+            const err = windows.GetLastError();
+            if (err != windows.ERROR.ABANDONED_WAIT_0) {
+                std.debug.warn("err: {}\n", err);
+            }
+            assert(err == windows.ERROR.ABANDONED_WAIT_0);
+        }
+        return WindowsWaitResult.Aborted;
+    }
+    return WindowsWaitResult.Normal;
+}
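
Together these three wrappers give the usual IOCP flow that std.event.Loop relies on further down: create a port, post a completion packet whose key identifies an object you own, and block on the port until a packet comes back. A minimal single-threaded sketch of that round trip, written against the std.os re-exports added below in std/os/index.zig; the test itself is illustrative and not part of this commit:

    const std = @import("std");
    const windows = std.os.windows;

    test "IOCP completion packet round trip" {
        // A fresh port, not yet associated with any file handle.
        const port = try std.os.windowsCreateIoCompletionPort(windows.INVALID_HANDLE_VALUE, null, undefined, undefined);
        defer std.os.close(port);

        // Post a packet whose completion key is a pointer we control.
        var token: u8 = 0;
        const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1); // never dereferenced
        try std.os.windowsPostQueuedCompletionStatus(port, undefined, @ptrToInt(&token), overlapped);

        // The packet comes back carrying the same key.
        var nbytes: windows.DWORD = undefined;
        var completion_key: usize = undefined;
        var lp_overlapped: ?*windows.OVERLAPPED = undefined;
        switch (std.os.windowsGetQueuedCompletionStatus(port, &nbytes, &completion_key, &lp_overlapped, windows.INFINITE)) {
            std.os.WindowsWaitResult.Normal => {},
            std.os.WindowsWaitResult.Aborted => unreachable,
        }
        std.debug.assert(completion_key == @ptrToInt(&token));
    }
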
std/os/index.zig
@@ -61,6 +61,15 @@ pub const windowsLoadDll = windows_util.windowsLoadDll;
 pub const windowsUnloadDll = windows_util.windowsUnloadDll;
 pub const createWindowsEnvBlock = windows_util.createWindowsEnvBlock;
 
+pub const WindowsCreateIoCompletionPortError = windows_util.WindowsCreateIoCompletionPortError;
+pub const windowsCreateIoCompletionPort = windows_util.windowsCreateIoCompletionPort;
+
+pub const WindowsPostQueuedCompletionStatusError = windows_util.WindowsPostQueuedCompletionStatusError;
+pub const windowsPostQueuedCompletionStatus = windows_util.windowsPostQueuedCompletionStatus;
+
+pub const WindowsWaitResult = windows_util.WindowsWaitResult;
+pub const windowsGetQueuedCompletionStatus = windows_util.windowsGetQueuedCompletionStatus;
+
 pub const WindowsWaitError = windows_util.WaitError;
 pub const WindowsOpenError = windows_util.OpenError;
 pub const WindowsWriteError = windows_util.WriteError;
@@ -2592,11 +2601,17 @@ pub fn spawnThread(context: var, comptime startFn: var) SpawnThreadError!*Thread
                 thread: Thread,
                 inner: Context,
             };
-            extern fn threadMain(arg: windows.LPVOID) windows.DWORD {
-                if (@sizeOf(Context) == 0) {
-                    return startFn({});
-                } else {
-                    return startFn(@ptrCast(*Context, @alignCast(@alignOf(Context), arg)).*);
+            extern fn threadMain(raw_arg: windows.LPVOID) windows.DWORD {
+                const arg = if (@sizeOf(Context) == 0) {} else @ptrCast(*Context, @alignCast(@alignOf(Context), raw_arg)).*;
+                switch (@typeId(@typeOf(startFn).ReturnType)) {
+                    builtin.TypeId.Int => {
+                        return startFn(arg);
+                    },
+                    builtin.TypeId.Void => {
+                        startFn(arg);
+                        return 0;
+                    },
+                    else => @compileError("expected return type of startFn to be 'u8', 'noreturn', 'void', or '!void'"),
                 }
             }
         };
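
The threadMain change above makes the Windows entry point switch on startFn's return type at compile time, so a plain void-returning function such as the event loop's workerRun can now be spawned on Windows (previously a DWORD return was assumed). A small sketch of a call site this enables; worker, example, and counter are illustrative names, not code from this commit:

    const std = @import("std");

    fn worker(counter: *u32) void {
        // A void return now maps to thread exit code 0 inside threadMain.
        counter.* += 1;
    }

    fn example() !void {
        var counter: u32 = 0;
        const thread = try std.os.spawnThread(&counter, worker);
        thread.wait();
        std.debug.assert(counter == 1);
    }
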
std/event.zig
@@ -4,6 +4,7 @@ const assert = std.debug.assert;
 const event = this;
 const mem = std.mem;
 const posix = std.os.posix;
+const windows = std.os.windows;
 const AtomicRmwOp = builtin.AtomicRmwOp;
 const AtomicOrder = builtin.AtomicOrder;
 
@@ -113,10 +114,10 @@ pub const Loop = struct {
     allocator: *mem.Allocator,
     next_tick_queue: std.atomic.QueueMpsc(promise),
     os_data: OsData,
+    final_resume_node: ResumeNode,
     dispatch_lock: u8, // TODO make this a bool
     pending_event_count: usize,
     extra_threads: []*std.os.Thread,
-    final_resume_node: ResumeNode,
 
     // pre-allocated eventfds. all permanently active.
     // this is how we send promises to be resumed on other threads.
@@ -144,6 +145,7 @@ pub const Loop = struct {
             },
             builtin.Os.windows => struct {
                 base: ResumeNode,
+                completion_key: usize,
             },
             else => @compileError("unsupported OS"),
         };
@@ -181,12 +183,12 @@ pub const Loop = struct {
             .next_tick_queue = std.atomic.QueueMpsc(promise).init(),
             .dispatch_lock = 1, // start locked so threads go directly into epoll wait
             .extra_threads = undefined,
+            .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
+            .eventfd_resume_nodes = undefined,
             .final_resume_node = ResumeNode{
                 .id = ResumeNode.Id.Stop,
                 .handle = undefined,
             },
-            .available_eventfd_resume_nodes = std.atomic.Stack(ResumeNode.EventFd).init(),
-            .eventfd_resume_nodes = undefined,
         };
         const extra_thread_count = thread_count - 1;
         self.eventfd_resume_nodes = try self.allocator.alloc(
@@ -209,7 +211,8 @@ pub const Loop = struct {
     }
 
     const InitOsDataError = std.os.LinuxEpollCreateError || mem.Allocator.Error || std.os.LinuxEventFdError ||
-        std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError;
+        std.os.SpawnThreadError || std.os.LinuxEpollCtlError || std.os.BsdKEventError ||
+        std.os.WindowsCreateIoCompletionPortError;
 
     const wakeup_bytes = []u8{0x1} ** 8;
 
@@ -335,6 +338,51 @@ pub const Loop = struct {
                     self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
                 }
             },
+            builtin.Os.windows => {
+                self.os_data.extra_thread_count = extra_thread_count;
+
+                self.os_data.io_port = try std.os.windowsCreateIoCompletionPort(
+                    windows.INVALID_HANDLE_VALUE,
+                    null,
+                    undefined,
+                    undefined,
+                );
+                errdefer std.os.close(self.os_data.io_port);
+
+                for (self.eventfd_resume_nodes) |*eventfd_node, i| {
+                    eventfd_node.* = std.atomic.Stack(ResumeNode.EventFd).Node{
+                        .data = ResumeNode.EventFd{
+                            .base = ResumeNode{
+                                .id = ResumeNode.Id.EventFd,
+                                .handle = undefined,
+                            },
+                            // this one is for sending events
+                            .completion_key = @ptrToInt(&eventfd_node.data.base),
+                        },
+                        .next = undefined,
+                    };
+                    self.available_eventfd_resume_nodes.push(eventfd_node);
+                }
+
+                var extra_thread_index: usize = 0;
+                errdefer {
+                    var i: usize = 0;
+                    while (i < extra_thread_index) : (i += 1) {
+                        while (true) {
+                            const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                            std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+                            break;
+                        }
+                    }
+                    while (extra_thread_index != 0) {
+                        extra_thread_index -= 1;
+                        self.extra_threads[extra_thread_index].wait();
+                    }
+                }
+                while (extra_thread_index < extra_thread_count) : (extra_thread_index += 1) {
+                    self.extra_threads[extra_thread_index] = try std.os.spawnThread(self, workerRun);
+                }
+            },
             else => {},
         }
     }
@@ -349,6 +397,10 @@ pub const Loop = struct {
             },
             builtin.Os.macosx => {
                 self.allocator.free(self.os_data.kevents);
+                std.os.close(self.os_data.kqfd);
+            },
+            builtin.Os.windows => {
+                std.os.close(self.os_data.io_port);
             },
             else => {},
         }
@@ -434,7 +486,7 @@ pub const Loop = struct {
                             builtin.Os.macosx => {
                                 const kevent_array = (*[1]posix.Kevent)(&eventfd_node.kevent);
                                 const eventlist = ([*]posix.Kevent)(undefined)[0..0];
-                                _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch |_| {
+                                _ = std.os.bsdKEvent(self.os_data.kqfd, kevent_array, eventlist, null) catch {
                                     // fine, we didn't need it anyway
                                     _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                                     self.available_eventfd_resume_nodes.push(resume_stack_node);
@@ -446,7 +498,21 @@ pub const Loop = struct {
                             builtin.Os.linux => {
                                 // the pending count is already accounted for
                                 const epoll_events = posix.EPOLLONESHOT | std.os.linux.EPOLLIN | std.os.linux.EPOLLOUT | std.os.linux.EPOLLET;
-                                self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch |_| {
+                                self.modFd(eventfd_node.eventfd, eventfd_node.epoll_op, epoll_events, &eventfd_node.base) catch {
+                                    // fine, we didn't need it anyway
+                                    _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
+                                    self.available_eventfd_resume_nodes.push(resume_stack_node);
+                                    resume handle;
+                                    _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                                    continue :start_over;
+                                };
+                            },
+                            builtin.Os.windows => {
+                                // this value is never dereferenced but we need it to be non-null so that
+                                // the consumer code can decide whether to read the completion key.
+                                // it has to do this for normal I/O, so we match that behavior here.
+                                const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                                std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, eventfd_node.completion_key, overlapped) catch {
                                     // fine, we didn't need it anyway
                                     _ = @atomicRmw(u8, &self.dispatch_lock, AtomicRmwOp.Xchg, 0, AtomicOrder.SeqCst);
                                     self.available_eventfd_resume_nodes.push(resume_stack_node);
@@ -482,6 +548,17 @@ pub const Loop = struct {
                             _ = std.os.bsdKEvent(self.os_data.kqfd, final_kevent, eventlist, null) catch unreachable;
                             return;
                         },
+                        builtin.Os.windows => {
+                            var i: usize = 0;
+                            while (i < self.os_data.extra_thread_count) : (i += 1) {
+                                while (true) {
+                                    const overlapped = @intToPtr(?*windows.OVERLAPPED, 0x1);
+                                    std.os.windowsPostQueuedCompletionStatus(self.os_data.io_port, undefined, @ptrToInt(&self.final_resume_node), overlapped) catch continue;
+                                    break;
+                                }
+                            }
+                            return;
+                        },
                         else => @compileError("unsupported OS"),
                     }
                 }
@@ -536,6 +613,35 @@ pub const Loop = struct {
                         }
                     }
                 },
+                builtin.Os.windows => {
+                    var completion_key: usize = undefined;
+                    while (true) {
+                        var nbytes: windows.DWORD = undefined;
+                        var overlapped: ?*windows.OVERLAPPED = undefined;
+                        switch (std.os.windowsGetQueuedCompletionStatus(self.os_data.io_port, &nbytes, &completion_key, 
+                            &overlapped, windows.INFINITE)) {
+                            std.os.WindowsWaitResult.Aborted => return,
+                            std.os.WindowsWaitResult.Normal => {},
+                        }
+                        if (overlapped != null) break;
+                    }
+                    const resume_node = @intToPtr(*ResumeNode, completion_key);
+                    const handle = resume_node.handle;
+                    const resume_node_id = resume_node.id;
+                    switch (resume_node_id) {
+                        ResumeNode.Id.Basic => {},
+                        ResumeNode.Id.Stop => return,
+                        ResumeNode.Id.EventFd => {
+                            const event_fd_node = @fieldParentPtr(ResumeNode.EventFd, "base", resume_node);
+                            const stack_node = @fieldParentPtr(std.atomic.Stack(ResumeNode.EventFd).Node, "data", event_fd_node);
+                            self.available_eventfd_resume_nodes.push(stack_node);
+                        },
+                    }
+                    resume handle;
+                    if (resume_node_id == ResumeNode.Id.EventFd) {
+                        _ = @atomicRmw(usize, &self.pending_event_count, AtomicRmwOp.Sub, 1, AtomicOrder.SeqCst);
+                    }
+                },
                 else => @compileError("unsupported OS"),
             }
         }
@@ -548,6 +654,10 @@ pub const Loop = struct {
             final_eventfd_event: std.os.linux.epoll_event,
         },
         builtin.Os.macosx => MacOsData,
+        builtin.Os.windows => struct {
+            io_port: windows.HANDLE,
+            extra_thread_count: usize,
+        },
         else => struct {},
     };
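
On Windows the loop smuggles a *ResumeNode through the IOCP completion key as a usize and then, for EventFd nodes, recovers the surrounding stack node with @fieldParentPtr, mirroring what the epoll and kqueue paths do with their event user data. A reduced sketch of just that pointer round trip, independent of the event loop; Node and its fields are illustrative:

    const std = @import("std");

    const Node = struct {
        base: u32,
        payload: u64,
    };

    test "recover a parent struct from a completion key" {
        var node = Node{ .base = 0, .payload = 123 };

        // What the loop stores as the IOCP completion key: a field address as usize.
        const completion_key = @ptrToInt(&node.base);

        // What workerRun does with it: back to the field, then to its parent.
        const base_ptr = @intToPtr(*u32, completion_key);
        const parent = @fieldParentPtr(Node, "base", base_ptr);
        std.debug.assert(parent.payload == 123);
    }
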
 
std/heap.zig
@@ -98,7 +98,7 @@ pub const DirectAllocator = struct {
                 const amt = n + alignment + @sizeOf(usize);
                 const optional_heap_handle = @atomicLoad(?HeapHandle, &self.heap_handle, builtin.AtomicOrder.SeqCst);
                 const heap_handle = optional_heap_handle orelse blk: {
-                    const hh = os.windows.HeapCreate(os.windows.HEAP_NO_SERIALIZE, amt, 0) orelse return error.OutOfMemory;
+                    const hh = os.windows.HeapCreate(0, amt, 0) orelse return error.OutOfMemory;
                     const other_hh = @cmpxchgStrong(?HeapHandle, &self.heap_handle, null, hh, builtin.AtomicOrder.SeqCst, builtin.AtomicOrder.SeqCst) orelse break :blk hh;
                     _ = os.windows.HeapDestroy(hh);
                     break :blk other_hh.?; // can't be null because of the cmpxchg
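
For context on the std/heap.zig hunk: HEAP_NO_SERIALIZE turns off the Win32 heap's internal locking, which is only safe while a single thread touches the heap handle. Now that the event loop allocates from worker threads, DirectAllocator asks for a serialized heap by passing 0. A hedged sketch of the distinction; createSharedHeap is an illustrative name, not part of std:

    const std = @import("std");
    const windows = std.os.windows;

    // flags = 0 keeps HeapAlloc/HeapFree serialized by the heap itself, so any
    // thread may allocate from the returned handle. Passing HEAP_NO_SERIALIZE
    // would skip that locking and is only valid under external synchronization.
    fn createSharedHeap(initial_size: windows.SIZE_T) !windows.HANDLE {
        return windows.HeapCreate(0, initial_size, 0) orelse return error.OutOfMemory;
    }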