Commit 3dce41b61a

Andrew Kelley <andrew@ziglang.org>
2019-08-17 03:29:29
improvements to std lib for event-based I/O
1 parent bf7b6fb
Changed files (8)
std/event/fs.zig
@@ -23,6 +23,7 @@ pub const Request = struct {
     };
 
     pub const Msg = union(enum) {
+        WriteV: WriteV,
         PWriteV: PWriteV,
         PReadV: PReadV,
         Open: Open,
@@ -30,6 +31,14 @@ pub const Request = struct {
         WriteFile: WriteFile,
         End, // special - means the fs thread should exit
 
+        pub const WriteV = struct {
+            fd: fd_t,
+            iov: []const os.iovec_const,
+            result: Error!void,
+
+            pub const Error = os.WriteError;
+        };
+
         pub const PWriteV = struct {
             fd: fd_t,
             iov: []const os.iovec_const,
@@ -77,7 +86,7 @@ pub const Request = struct {
 pub const PWriteVError = error{OutOfMemory} || File.WriteError;
 
 /// data - just the inner references - must live until pwritev frame completes.
-pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
+pub fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) PWriteVError!void {
     switch (builtin.os) {
         .macosx,
         .linux,
@@ -94,31 +103,31 @@ pub async fn pwritev(loop: *Loop, fd: fd_t, data: []const []const u8, offset: us
                 };
             }
 
-            return await (async pwritevPosix(loop, fd, iovecs, offset) catch unreachable);
+            return pwritevPosix(loop, fd, iovecs, offset);
         },
         .windows => {
             const data_copy = try std.mem.dupe(loop.allocator, []const u8, data);
             defer loop.allocator.free(data_copy);
-            return await (async pwritevWindows(loop, fd, data, offset) catch unreachable);
+            return pwritevWindows(loop, fd, data, offset);
         },
         else => @compileError("Unsupported OS"),
     }
 }
 
 /// data must outlive the returned frame
-pub async fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
+pub fn pwritevWindows(loop: *Loop, fd: fd_t, data: []const []const u8, offset: usize) os.WindowsWriteError!void {
     if (data.len == 0) return;
-    if (data.len == 1) return await (async pwriteWindows(loop, fd, data[0], offset) catch unreachable);
+    if (data.len == 1) return pwriteWindows(loop, fd, data[0], offset);
 
     // TODO do these in parallel
     var off = offset;
     for (data) |buf| {
-        try await (async pwriteWindows(loop, fd, buf, off) catch unreachable);
+        try pwriteWindows(loop, fd, buf, off);
         off += buf.len;
     }
 }
 
-pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
+pub fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64) os.WindowsWriteError!void {
     var resume_node = Loop.ResumeNode.Basic{
         .base = Loop.ResumeNode{
             .id = Loop.ResumeNode.Id.Basic,
@@ -158,7 +167,7 @@ pub async fn pwriteWindows(loop: *Loop, fd: fd_t, data: []const u8, offset: u64)
 }
 
 /// iovecs must live until pwritev frame completes.
-pub async fn pwritevPosix(
+pub fn pwritevPosix(
     loop: *Loop,
     fd: fd_t,
     iovecs: []const os.iovec_const,
@@ -195,10 +204,44 @@ pub async fn pwritevPosix(
     return req_node.data.msg.PWriteV.result;
 }
 
+/// iovecs must live until pwritev frame completes.
+pub fn writevPosix(
+    loop: *Loop,
+    fd: fd_t,
+    iovecs: []const os.iovec_const,
+) os.WriteError!void {
+    var req_node = RequestNode{
+        .prev = null,
+        .next = null,
+        .data = Request{
+            .msg = Request.Msg{
+                .WriteV = Request.Msg.WriteV{
+                    .fd = fd,
+                    .iov = iovecs,
+                    .result = undefined,
+                },
+            },
+            .finish = Request.Finish{
+                .TickNode = Loop.NextTickNode{
+                    .prev = null,
+                    .next = null,
+                    .data = @frame(),
+                },
+            },
+        },
+    };
+
+    suspend {
+        loop.posixFsRequest(&req_node);
+    }
+
+    return req_node.data.msg.WriteV.result;
+}
+
 pub const PReadVError = error{OutOfMemory} || File.ReadError;
 
 /// data - just the inner references - must live until preadv frame completes.
-pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
+pub fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PReadVError!usize {
     assert(data.len != 0);
     switch (builtin.os) {
         .macosx,
@@ -216,21 +259,21 @@ pub async fn preadv(loop: *Loop, fd: fd_t, data: []const []u8, offset: usize) PR
                 };
             }
 
-            return await (async preadvPosix(loop, fd, iovecs, offset) catch unreachable);
+            return preadvPosix(loop, fd, iovecs, offset);
         },
         .windows => {
             const data_copy = try std.mem.dupe(loop.allocator, []u8, data);
             defer loop.allocator.free(data_copy);
-            return await (async preadvWindows(loop, fd, data_copy, offset) catch unreachable);
+            return preadvWindows(loop, fd, data_copy, offset);
         },
         else => @compileError("Unsupported OS"),
     }
 }
 
 /// data must outlive the returned frame
-pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
+pub fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u64) !usize {
     assert(data.len != 0);
-    if (data.len == 1) return await (async preadWindows(loop, fd, data[0], offset) catch unreachable);
+    if (data.len == 1) return preadWindows(loop, fd, data[0], offset);
 
     // TODO do these in parallel?
     var off: usize = 0;
@@ -238,7 +281,7 @@ pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u6
     var inner_off: usize = 0;
     while (true) {
         const v = data[iov_i];
-        const amt_read = try await (async preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off) catch unreachable);
+        const amt_read = try preadWindows(loop, fd, v[inner_off .. v.len - inner_off], offset + off);
         off += amt_read;
         inner_off += amt_read;
         if (inner_off == v.len) {
@@ -252,7 +295,7 @@ pub async fn preadvWindows(loop: *Loop, fd: fd_t, data: []const []u8, offset: u6
     }
 }
 
-pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
+pub fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize {
     var resume_node = Loop.ResumeNode.Basic{
         .base = Loop.ResumeNode{
             .id = Loop.ResumeNode.Id.Basic,
@@ -291,7 +334,7 @@ pub async fn preadWindows(loop: *Loop, fd: fd_t, data: []u8, offset: u64) !usize
 }
 
 /// iovecs must live until preadv frame completes
-pub async fn preadvPosix(
+pub fn preadvPosix(
     loop: *Loop,
     fd: fd_t,
     iovecs: []const os.iovec,
@@ -328,7 +371,7 @@ pub async fn preadvPosix(
     return req_node.data.msg.PReadV.result;
 }
 
-pub async fn openPosix(
+pub fn openPosix(
     loop: *Loop,
     path: []const u8,
     flags: u32,
@@ -367,11 +410,11 @@ pub async fn openPosix(
     return req_node.data.msg.Open.result;
 }
 
-pub async fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+pub fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
     switch (builtin.os) {
         .macosx, .linux, .freebsd, .netbsd => {
             const flags = os.O_LARGEFILE | os.O_RDONLY | os.O_CLOEXEC;
-            return await (async openPosix(loop, path, flags, File.default_mode) catch unreachable);
+            return openPosix(loop, path, flags, File.default_mode);
         },
 
         .windows => return windows.CreateFile(
@@ -390,12 +433,12 @@ pub async fn openRead(loop: *Loop, path: []const u8) File.OpenError!fd_t {
 
 /// Creates if does not exist. Truncates the file if it exists.
 /// Uses the default mode.
-pub async fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t {
-    return await (async openWriteMode(loop, path, File.default_mode) catch unreachable);
+pub fn openWrite(loop: *Loop, path: []const u8) File.OpenError!fd_t {
+    return openWriteMode(loop, path, File.default_mode);
 }
 
 /// Creates if does not exist. Truncates the file if it exists.
-pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t {
+pub fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.OpenError!fd_t {
     switch (builtin.os) {
         .macosx,
         .linux,
@@ -403,7 +446,7 @@ pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.
         .netbsd,
         => {
             const flags = os.O_LARGEFILE | os.O_WRONLY | os.O_CREAT | os.O_CLOEXEC | os.O_TRUNC;
-            return await (async openPosix(loop, path, flags, File.default_mode) catch unreachable);
+            return openPosix(loop, path, flags, File.default_mode);
         },
         .windows => return windows.CreateFile(
             path,
@@ -419,7 +462,7 @@ pub async fn openWriteMode(loop: *Loop, path: []const u8, mode: File.Mode) File.
 }
 
 /// Creates if does not exist. Does not truncate.
-pub async fn openReadWrite(
+pub fn openReadWrite(
     loop: *Loop,
     path: []const u8,
     mode: File.Mode,
@@ -427,7 +470,7 @@ pub async fn openReadWrite(
     switch (builtin.os) {
         .macosx, .linux, .freebsd, .netbsd => {
             const flags = os.O_LARGEFILE | os.O_RDWR | os.O_CREAT | os.O_CLOEXEC;
-            return await (async openPosix(loop, path, flags, mode) catch unreachable);
+            return openPosix(loop, path, flags, mode);
         },
 
         .windows => return windows.CreateFile(
@@ -576,24 +619,24 @@ pub const CloseOperation = struct {
 
 /// contents must remain alive until writeFile completes.
 /// TODO make this atomic or provide writeFileAtomic and rename this one to writeFileTruncate
-pub async fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
-    return await (async writeFileMode(loop, path, contents, File.default_mode) catch unreachable);
+pub fn writeFile(loop: *Loop, path: []const u8, contents: []const u8) !void {
+    return writeFileMode(loop, path, contents, File.default_mode);
 }
 
 /// contents must remain alive until writeFile completes.
-pub async fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+pub fn writeFileMode(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
     switch (builtin.os) {
         .linux,
         .macosx,
         .freebsd,
         .netbsd,
-        => return await (async writeFileModeThread(loop, path, contents, mode) catch unreachable),
-        .windows => return await (async writeFileWindows(loop, path, contents) catch unreachable),
+        => return writeFileModeThread(loop, path, contents, mode),
+        .windows => return writeFileWindows(loop, path, contents),
         else => @compileError("Unsupported OS"),
     }
 }
 
-async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
+fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !void {
     const handle = try windows.CreateFile(
         path,
         windows.GENERIC_WRITE,
@@ -605,10 +648,10 @@ async fn writeFileWindows(loop: *Loop, path: []const u8, contents: []const u8) !
     );
     defer os.close(handle);
 
-    try await (async pwriteWindows(loop, handle, contents, 0) catch unreachable);
+    try pwriteWindows(loop, handle, contents, 0);
 }
 
-async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
+fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8, mode: File.Mode) !void {
     const path_with_null = try std.cstr.addNullByte(loop.allocator, path);
     defer loop.allocator.free(path_with_null);
 
@@ -646,11 +689,11 @@ async fn writeFileModeThread(loop: *Loop, path: []const u8, contents: []const u8
 /// The frame resumes when the last data has been confirmed written, but before the file handle
 /// is closed.
 /// Caller owns returned memory.
-pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
+pub fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8 {
     var close_op = try CloseOperation.start(loop);
     defer close_op.finish();
 
-    const fd = try await (async openRead(loop, file_path) catch unreachable);
+    const fd = try openRead(loop, file_path);
     close_op.setHandle(fd);
 
     var list = std.ArrayList(u8).init(loop.allocator);
@@ -660,7 +703,7 @@ pub async fn readFile(loop: *Loop, file_path: []const u8, max_size: usize) ![]u8
         try list.ensureCapacity(list.len + mem.page_size);
         const buf = list.items[list.len..];
         const buf_array = [_][]u8{buf};
-        const amt = try await (async preadv(loop, fd, buf_array, list.len) catch unreachable);
+        const amt = try preadv(loop, fd, buf_array, list.len);
         list.len += amt;
         if (list.len > max_size) {
             return error.FileTooBig;
@@ -1273,11 +1316,11 @@ const test_tmp_dir = "std_event_fs_test";
 //    return result;
 //}
 
-async fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
-    result.* = await (async testFsWatch(loop) catch unreachable);
+fn testFsWatchCantFail(loop: *Loop, result: *(anyerror!void)) void {
+    result.* = testFsWatch(loop);
 }
 
-async fn testFsWatch(loop: *Loop) !void {
+fn testFsWatch(loop: *Loop) !void {
     const file_path = try std.fs.path.join(loop.allocator, [][]const u8{ test_tmp_dir, "file.txt" });
     defer loop.allocator.free(file_path);
 
@@ -1288,27 +1331,27 @@ async fn testFsWatch(loop: *Loop) !void {
     const line2_offset = 7;
 
     // first just write then read the file
-    try await try async writeFile(loop, file_path, contents);
+    try writeFile(loop, file_path, contents);
 
-    const read_contents = try await try async readFile(loop, file_path, 1024 * 1024);
+    const read_contents = try readFile(loop, file_path, 1024 * 1024);
     testing.expectEqualSlices(u8, contents, read_contents);
 
     // now watch the file
     var watch = try Watch(void).create(loop, 0);
     defer watch.destroy();
 
-    testing.expect((try await try async watch.addFile(file_path, {})) == null);
+    testing.expect((try watch.addFile(file_path, {})) == null);
 
-    const ev = try async watch.channel.get();
+    const ev = async watch.channel.get();
     var ev_consumed = false;
     defer if (!ev_consumed) await ev;
 
     // overwrite line 2
-    const fd = try await try async openReadWrite(loop, file_path, File.default_mode);
+    const fd = try await openReadWrite(loop, file_path, File.default_mode);
     {
         defer os.close(fd);
 
-        try await try async pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
+        try pwritev(loop, fd, []const []const u8{"lorem ipsum"}, line2_offset);
     }
 
     ev_consumed = true;
@@ -1316,7 +1359,7 @@ async fn testFsWatch(loop: *Loop) !void {
         WatchEventId.CloseWrite => {},
         WatchEventId.Delete => @panic("wrong event"),
     }
-    const contents_updated = try await try async readFile(loop, file_path, 1024 * 1024);
+    const contents_updated = try readFile(loop, file_path, 1024 * 1024);
     testing.expectEqualSlices(u8,
         \\line 1
         \\lorem ipsum
std/event/loop.zig
@@ -89,12 +89,15 @@ pub const Loop = struct {
     pub const IoMode = enum {
         blocking,
         evented,
+        mixed,
     };
     pub const io_mode: IoMode = if (@hasDecl(root, "io_mode")) root.io_mode else IoMode.blocking;
     var global_instance_state: Loop = undefined;
+    threadlocal var per_thread_instance: ?*Loop = null;
     const default_instance: ?*Loop = switch (io_mode) {
         .blocking => null,
         .evented => &global_instance_state,
+        .mixed => per_thread_instance,
     };
     pub const instance: ?*Loop = if (@hasDecl(root, "event_loop")) root.event_loop else default_instance;
 
@@ -796,6 +799,9 @@ pub const Loop = struct {
             while (self.os_data.fs_queue.get()) |node| {
                 switch (node.data.msg) {
                     .End => return,
+                    .WriteV => |*msg| {
+                        msg.result = os.writev(msg.fd, msg.iov);
+                    },
                     .PWriteV => |*msg| {
                         msg.result = os.pwritev(msg.fd, msg.iov, msg.offset);
                     },
std/fs/file.zig
@@ -302,6 +302,14 @@ pub const File = struct {
         return os.write(self.handle, bytes);
     }
 
+    pub fn writev_iovec(self: File, iovecs: []const os.iovec_const) WriteError!void {
+        if (std.event.Loop.instance) |loop| {
+            return std.event.fs.writevPosix(loop, self.handle, iovecs);
+        } else {
+            return os.writev(self.handle, iovecs);
+        }
+    }
+
     pub fn inStream(file: File) InStream {
         return InStream{
             .file = file,
std/os/bits/linux.zig
@@ -783,6 +783,7 @@ pub const socklen_t = u32;
 pub const sockaddr = extern union {
     in: sockaddr_in,
     in6: sockaddr_in6,
+    un: sockaddr_un,
 };
 
 pub const sockaddr_in = extern struct {
std/c.zig
@@ -64,6 +64,7 @@ pub extern "c" fn fstat(fd: fd_t, buf: *Stat) c_int;
 pub extern "c" fn @"fstat$INODE64"(fd: fd_t, buf: *Stat) c_int;
 pub extern "c" fn lseek(fd: fd_t, offset: isize, whence: c_int) isize;
 pub extern "c" fn open(path: [*]const u8, oflag: c_uint, ...) c_int;
+pub extern "c" fn openat(fd: c_int, path: [*]const u8, oflag: c_uint, ...) c_int;
 pub extern "c" fn raise(sig: c_int) c_int;
 pub extern "c" fn read(fd: fd_t, buf: [*]u8, nbyte: usize) isize;
 pub extern "c" fn pread(fd: fd_t, buf: [*]u8, nbyte: usize, offset: u64) isize;
@@ -112,7 +113,6 @@ pub extern "c" fn accept4(sockfd: fd_t, addr: *sockaddr, addrlen: *socklen_t, fl
 pub extern "c" fn getsockopt(sockfd: fd_t, level: c_int, optname: c_int, optval: *c_void, optlen: *socklen_t) c_int;
 pub extern "c" fn kill(pid: pid_t, sig: c_int) c_int;
 pub extern "c" fn getdirentries(fd: fd_t, buf_ptr: [*]u8, nbytes: usize, basep: *i64) isize;
-pub extern "c" fn openat(fd: c_int, path: [*]const u8, flags: c_int) c_int;
 pub extern "c" fn setgid(ruid: c_uint, euid: c_uint) c_int;
 pub extern "c" fn setuid(uid: c_uint) c_int;
 pub extern "c" fn clock_gettime(clk_id: c_int, tp: *timespec) c_int;
std/fs.zig
@@ -442,6 +442,8 @@ pub fn deleteTree(allocator: *Allocator, full_path: []const u8) DeleteTreeError!
     }
 }
 
+/// TODO: separate this API into the one that opens directory handles to then subsequently open
+/// files, and into the one that reads files from an open directory handle.
 pub const Dir = struct {
     handle: Handle,
     allocator: *Allocator,
@@ -564,6 +566,17 @@ pub const Dir = struct {
         }
     }
 
+    pub fn openRead(self: Dir, file_path: []const u8) os.OpenError!File {
+        const path_c = try os.toPosixPath(file_path);
+        return self.openReadC(&path_c);
+    }
+
+    pub fn openReadC(self: Dir, file_path: [*]const u8) OpenError!File {
+        const flags = os.O_LARGEFILE | os.O_RDONLY;
+        const fd = try os.openatC(self.handle.fd, file_path, flags, 0);
+        return File.openHandle(fd);
+    }
+
     fn nextDarwin(self: *Dir) !?Entry {
         start_over: while (true) {
             if (self.handle.index >= self.handle.end_index) {
std/net.zig
@@ -215,3 +215,33 @@ test "std.net.parseIp6" {
     assert(addr.addr[1] == 0x01);
     assert(addr.addr[2] == 0x00);
 }
+
+pub fn connectUnixSocket(path: []const u8) !std.fs.File {
+    const opt_non_block = if (std.event.Loop.instance != null) os.SOCK_NONBLOCK else 0;
+    const sockfd = try os.socket(
+        os.AF_UNIX,
+        os.SOCK_STREAM | os.SOCK_CLOEXEC | opt_non_block,
+        0,
+    );
+    errdefer os.close(sockfd);
+
+    var sock_addr = os.sockaddr{
+        .un = os.sockaddr_un{
+            .family = os.AF_UNIX,
+            .path = undefined,
+        },
+    };
+
+    if (path.len > @typeOf(sock_addr.un.path).len) return error.NameTooLong;
+    mem.copy(u8, sock_addr.un.path[0..], path);
+    const size = @intCast(u32, @sizeOf(os.sa_family_t) + path.len);
+    if (std.event.Loop.instance) |loop| {
+        try os.connect_async(sockfd, &sock_addr, size);
+        try loop.linuxWaitFd(sockfd, os.EPOLLIN | os.EPOLLOUT | os.EPOLLET);
+        try os.getsockoptError(sockfd);
+    } else {
+        try os.connect(sockfd, &sock_addr, size);
+    }
+
+    return std.fs.File.openHandle(sockfd);
+}
std/os.zig
@@ -440,6 +440,33 @@ pub fn write(fd: fd_t, bytes: []const u8) WriteError!void {
 
 /// Write multiple buffers to a file descriptor. Keeps trying if it gets interrupted.
 /// This function is for blocking file descriptors only. For non-blocking, see
+/// `writevAsync`.
+pub fn writev(fd: fd_t, iov: []const iovec_const) WriteError!void {
+    while (true) {
+        // TODO handle the case when iov_len is too large and get rid of this @intCast
+        const rc = system.writev(fd, iov.ptr, @intCast(u32, iov.len));
+        switch (errno(rc)) {
+            0 => return,
+            EINTR => continue,
+            EINVAL => unreachable,
+            EFAULT => unreachable,
+            EAGAIN => unreachable, // This function is for blocking writes.
+            EBADF => unreachable, // Always a race condition.
+            EDESTADDRREQ => unreachable, // `connect` was never called.
+            EDQUOT => return error.DiskQuota,
+            EFBIG => return error.FileTooBig,
+            EIO => return error.InputOutput,
+            ENOSPC => return error.NoSpaceLeft,
+            EPERM => return error.AccessDenied,
+            EPIPE => return error.BrokenPipe,
+            else => |err| return unexpectedErrno(err),
+        }
+    }
+}
+
+/// Write multiple buffers to a file descriptor, with a position offset.
+/// Keeps trying if it gets interrupted.
+/// This function is for blocking file descriptors only. For non-blocking, see
 /// `pwritevAsync`.
 pub fn pwritev(fd: fd_t, iov: []const iovec_const, offset: u64) WriteError!void {
     if (darwin.is_the_target) {
@@ -524,7 +551,6 @@ pub const OpenError = error{
 };
 
 /// Open and possibly create a file. Keeps trying if it gets interrupted.
-/// `file_path` needs to be copied in memory to add a null terminating byte.
 /// See also `openC`.
 pub fn open(file_path: []const u8, flags: u32, perm: usize) OpenError!fd_t {
     const file_path_c = try toPosixPath(file_path);
@@ -564,6 +590,47 @@ pub fn openC(file_path: [*]const u8, flags: u32, perm: usize) OpenError!fd_t {
     }
 }
 
+/// Open and possibly create a file. Keeps trying if it gets interrupted.
+/// `file_path` is relative to the open directory handle `dir_fd`.
+/// See also `openatC`.
+pub fn openat(dir_fd: fd_t, file_path: []const u8, flags: u32, mode: usize) OpenError!fd_t {
+    const file_path_c = try toPosixPath(file_path);
+    return openatC(dir_fd, &file_path_c, flags, mode);
+}
+
+/// Open and possibly create a file. Keeps trying if it gets interrupted.
+/// `file_path` is relative to the open directory handle `dir_fd`.
+/// See also `openat`.
+pub fn openatC(dir_fd: fd_t, file_path: [*]const u8, flags: u32, mode: usize) OpenError!fd_t {
+    while (true) {
+        const rc = system.openat(dir_fd, file_path, flags, mode);
+        switch (errno(rc)) {
+            0 => return @intCast(fd_t, rc),
+            EINTR => continue,
+
+            EFAULT => unreachable,
+            EINVAL => unreachable,
+            EACCES => return error.AccessDenied,
+            EFBIG => return error.FileTooBig,
+            EOVERFLOW => return error.FileTooBig,
+            EISDIR => return error.IsDir,
+            ELOOP => return error.SymLinkLoop,
+            EMFILE => return error.ProcessFdQuotaExceeded,
+            ENAMETOOLONG => return error.NameTooLong,
+            ENFILE => return error.SystemFdQuotaExceeded,
+            ENODEV => return error.NoDevice,
+            ENOENT => return error.FileNotFound,
+            ENOMEM => return error.SystemResources,
+            ENOSPC => return error.NoSpaceLeft,
+            ENOTDIR => return error.NotDir,
+            EPERM => return error.AccessDenied,
+            EEXIST => return error.PathAlreadyExists,
+            EBUSY => return error.DeviceBusy,
+            else => |err| return unexpectedErrno(err),
+        }
+    }
+}
+
 pub fn dup2(old_fd: fd_t, new_fd: fd_t) !void {
     while (true) {
         switch (errno(system.dup2(old_fd, new_fd))) {
@@ -1655,7 +1722,7 @@ pub const ConnectError = error{
 /// For non-blocking, see `connect_async`.
 pub fn connect(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!void {
     while (true) {
-        switch (errno(system.connect(sockfd, sock_addr, @sizeOf(sockaddr)))) {
+        switch (errno(system.connect(sockfd, sock_addr, len))) {
             0 => return,
             EACCES => return error.PermissionDenied,
             EPERM => return error.PermissionDenied,
@@ -1683,7 +1750,8 @@ pub fn connect(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!v
 /// It expects to receive EINPROGRESS`.
 pub fn connect_async(sockfd: i32, sock_addr: *sockaddr, len: socklen_t) ConnectError!void {
     while (true) {
-        switch (errno(system.connect(sockfd, sock_addr, @sizeOf(sockaddr)))) {
+        switch (errno(system.connect(sockfd, sock_addr, len))) {
+            EINVAL => unreachable,
             EINTR => continue,
             0, EINPROGRESS => return,
             EACCES => return error.PermissionDenied,