Commit 814de45bd2

Andrew Kelley <andrew@ziglang.org>
2023-02-28 06:06:18
add std.io.poll and implement it for POSIX
I think having inputs is problematic here, it should only be for outputs.
1 parent 5236842
Changed files (1)
lib
std
lib/std/io.zig
@@ -168,6 +168,139 @@ test "null_writer" {
     null_writer.writeAll("yay" ** 10) catch |err| switch (err) {};
 }
 
+pub fn poll(
+    allocator: std.mem.Allocator,
+    comptime StreamEnum: type,
+    files: PollFiles(StreamEnum),
+) Poller(StreamEnum) {
+    const enum_fields = @typeInfo(StreamEnum).Enum.fields;
+    var result: Poller(StreamEnum) = undefined;
+    inline for (0..enum_fields.len) |i| {
+        result.fifos[i] = .{
+            .allocator = allocator,
+            .buf = &.{},
+            .head = 0,
+            .count = 0,
+        };
+        result.poll_fds[i] = .{
+            .fd = @field(files, enum_fields[i].name).file.handle,
+            .events = switch (@field(files, enum_fields[i].name).direction) {
+                .in => os.POLL.IN,
+                .out => os.POLL.OUT,
+            },
+            .revents = undefined,
+        };
+    }
+    return result;
+}
+
+pub fn Poller(comptime StreamEnum: type) type {
+    return struct {
+        const enum_fields = @typeInfo(StreamEnum).Enum.fields;
+        const Fifo = std.fifo.LinearFifo(u8, .Dynamic);
+
+        fifos: [enum_fields.len]Fifo,
+        //directions: [enum_fields.len]PollFile.Direction,
+        //handles: [enum_fields.len]std.fs.File.Handle,
+        poll_fds: [enum_fields.len]std.os.pollfd,
+
+        const Self = @This();
+
+        pub fn poll(self: *Self) !void {
+            if (builtin.os.tag == .windows) {
+                return pollWindows(self);
+            } else {
+                return pollPosix(self);
+            }
+        }
+
+        pub inline fn fifo(self: *Self, comptime which: StreamEnum) *Fifo {
+            return &self.fifos[@enumToInt(which)];
+        }
+
+        pub fn done(self: Self) bool {
+            for (self.poll_fds) |poll_fd| {
+                if (poll_fd.fd != -1) return false;
+            } else return true;
+        }
+
+        fn pollWindows(self: *Self) !void {
+            _ = self;
+            @compileError("TODO");
+        }
+
+        fn pollPosix(self: *Self) !void {
+            // We ask for ensureUnusedCapacity with this much extra space. This
+            // has more of an effect on small reads because once the reads
+            // start to get larger the amount of space an ArrayList will
+            // allocate grows exponentially.
+            const bump_amt = 512;
+
+            const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP;
+
+            const events_len = try os.poll(&self.poll_fds, std.math.maxInt(i32));
+            if (events_len == 0) return;
+
+            inline for (0..enum_fields.len) |i| {
+                // Try reading whatever is available before checking the error
+                // conditions.
+                // It's still possible to read after a POLL.HUP is received,
+                // always check if there's some data waiting to be read first.
+                if (self.poll_fds[i].revents & os.POLL.IN != 0) {
+                    const q = &self.fifos[i];
+                    const buf = try q.writableWithSize(bump_amt);
+                    const amt = try os.read(self.poll_fds[i].fd, buf);
+                    q.update(amt);
+                    std.debug.print("read {d} bytes\n", .{amt});
+                    if (amt == 0) {
+                        // Remove the fd when the EOF condition is met.
+                        self.poll_fds[i].fd = -1;
+                    }
+                } else if (self.poll_fds[i].revents & err_mask != 0) {
+                    // Exclude the fds that signaled an error.
+                    self.poll_fds[i].fd = -1;
+                } else if (self.poll_fds[i].revents & os.POLL.OUT != 0) {
+                    const q = &self.fifos[i];
+                    const amt = try os.write(self.poll_fds[i].fd, q.readableSlice(0));
+                    q.discard(amt);
+                    if (amt == 0) {
+                        self.poll_fds[i].fd = -1;
+                    }
+                }
+            }
+        }
+    };
+}
+
+/// Given an enum, returns a struct with fields of that enum, each field
+/// representing an I/O stream for polling.
+pub fn PollFiles(comptime StreamEnum: type) type {
+    const enum_fields = @typeInfo(StreamEnum).Enum.fields;
+    var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined;
+    for (&struct_fields, enum_fields) |*struct_field, enum_field| {
+        struct_field.* = .{
+            .name = enum_field.name,
+            .type = PollFile,
+            .default_value = null,
+            .is_comptime = false,
+            .alignment = @alignOf(PollFile),
+        };
+    }
+    return @Type(.{ .Struct = .{
+        .layout = .Auto,
+        .fields = &struct_fields,
+        .decls = &.{},
+        .is_tuple = false,
+    } });
+}
+
+pub const PollFile = struct {
+    file: File,
+    direction: Direction,
+
+    pub const Direction = enum { in, out };
+};
+
 test {
     _ = @import("io/bit_reader.zig");
     _ = @import("io/bit_writer.zig");