Commit e831313b10
Changed files (2)
lib
std
lib/std/io/buffered_tee.zig
@@ -1,379 +0,0 @@
-const std = @import("std");
-const io = std.io;
-const assert = std.debug.assert;
-const testing = std.testing;
-
-/// BufferedTee provides reader interface to the consumer. Data read by consumer
-/// is also written to the output. Output is hold lookahead_size bytes behind
-/// consumer. Allowing consumer to put back some bytes to be read again. On flush
-/// all consumed bytes are flushed to the output.
-///
-/// input -> tee -> consumer
-/// |
-/// output
-///
-/// input - underlying unbuffered reader
-/// output - writer, receives data read by consumer
-/// consumer - uses provided reader interface
-///
-/// If lookahead_size is zero output always has same bytes as consumer.
-///
-pub fn BufferedTee(
- comptime buffer_size: usize, // internal buffer size in bytes
- comptime lookahead_size: usize, // lookahead, number of bytes to hold output behind consumer
- comptime InputReaderType: type,
- comptime OutputWriterType: type,
-) type {
- comptime assert(buffer_size > lookahead_size);
-
- return struct {
- input: InputReaderType,
- output: OutputWriterType,
-
- buf: [buffer_size]u8 = undefined, // internal buffer
- tail: usize = 0, // buffer is filled up to this position with bytes from input
- rp: usize = 0, // reader pointer; consumer has read up to this position
- wp: usize = 0, // writer pointer; data is sent to the output up to this position
-
- pub const Error = InputReaderType.Error || OutputWriterType.Error;
- pub const Reader = io.Reader(*Self, Error, read);
-
- const Self = @This();
-
- pub fn read(self: *Self, dest: []u8) Error!usize {
- var dest_index: usize = 0;
-
- while (dest_index < dest.len) {
- const written = @min(dest.len - dest_index, self.tail - self.rp);
- if (written == 0) {
- try self.preserveLookahead();
- // fill upper part of the buf
- const n = try self.input.read(self.buf[self.tail..]);
- if (n == 0) {
- // reading from the unbuffered stream returned nothing
- // so we have nothing left to read.
- return dest_index;
- }
- self.tail += n;
- } else {
- @memcpy(dest[dest_index..][0..written], self.buf[self.rp..][0..written]);
- self.rp += written;
- dest_index += written;
- try self.flush_(lookahead_size);
- }
- }
- return dest.len;
- }
-
- /// Move lookahead_size bytes to the buffer start.
- fn preserveLookahead(self: *Self) !void {
- assert(self.tail == self.rp);
- if (lookahead_size == 0) {
- // Flush is called on each read so wp must follow rp when lookahead_size == 0.
- assert(self.wp == self.rp);
- // Nothing to preserve rewind pointer to the buffer start
- self.rp = 0;
- self.wp = 0;
- self.tail = 0;
- return;
- }
- if (self.tail <= lookahead_size) {
- // There is still palce in the buffer, append to buffer from tail position.
- return;
- }
- try self.flush_(lookahead_size);
- const head = self.tail - lookahead_size;
- // Preserve head..tail at the start of the buffer.
- std.mem.copyForwards(u8, self.buf[0..lookahead_size], self.buf[head..self.tail]);
- self.wp -= head;
- assert(self.wp <= lookahead_size);
- self.rp = lookahead_size;
- self.tail = lookahead_size;
- }
-
- /// Flush to the output all but lookahead size bytes.
- fn flush_(self: *Self, lookahead: usize) !void {
- if (self.rp <= self.wp + lookahead) return;
- const new_wp = self.rp - lookahead;
- try self.output.writeAll(self.buf[self.wp..new_wp]);
- self.wp = new_wp;
- }
-
- /// Flush to the output all consumed bytes.
- pub fn flush(self: *Self) !void {
- try self.flush_(0);
- }
-
- /// Put back some bytes to be consumed again. Usefull when we overshoot
- /// reading and want to return that overshoot bytes. Can return maximum
- /// of lookahead_size number of bytes.
- pub fn putBack(self: *Self, n: usize) void {
- assert(n <= lookahead_size and n <= self.rp);
- self.rp -= n;
- }
-
- pub fn reader(self: *Self) Reader {
- return .{ .context = self };
- }
- };
-}
-
-pub fn bufferedTee(
- comptime buffer_size: usize,
- comptime lookahead_size: usize,
- input: anytype,
- output: anytype,
-) BufferedTee(
- buffer_size,
- lookahead_size,
- @TypeOf(input),
- @TypeOf(output),
-) {
- return .{ .input = input, .output = output };
-}
-
-// Running test from std.io.BufferedReader on BufferedTee
-// It should act as BufferedReader for consumer.
-
-fn BufferedReader(comptime buffer_size: usize, comptime ReaderType: type) type {
- return BufferedTee(buffer_size, 0, ReaderType, @TypeOf(io.null_writer));
-}
-
-fn bufferedReader(reader: anytype) BufferedReader(4096, @TypeOf(reader)) {
- return .{
- .input = reader,
- .output = io.null_writer,
- };
-}
-
-test "OneByte" {
- const OneByteReadReader = struct {
- str: []const u8,
- curr: usize,
-
- const Error = error{NoError};
- const Self = @This();
- const Reader = io.Reader(*Self, Error, read);
-
- fn init(str: []const u8) Self {
- return Self{
- .str = str,
- .curr = 0,
- };
- }
-
- fn read(self: *Self, dest: []u8) Error!usize {
- if (self.str.len <= self.curr or dest.len == 0)
- return 0;
-
- dest[0] = self.str[self.curr];
- self.curr += 1;
- return 1;
- }
-
- fn reader(self: *Self) Reader {
- return .{ .context = self };
- }
- };
-
- const str = "This is a test";
- var one_byte_stream = OneByteReadReader.init(str);
- var buf_reader = bufferedReader(one_byte_stream.reader());
- const stream = buf_reader.reader();
-
- const res = try stream.readAllAlloc(testing.allocator, str.len + 1);
- defer testing.allocator.free(res);
- try testing.expectEqualSlices(u8, str, res);
-}
-
-test "Block" {
- const BlockReader = struct {
- block: []const u8,
- reads_allowed: usize,
- curr_read: usize,
-
- const Error = error{NoError};
- const Self = @This();
- const Reader = io.Reader(*Self, Error, read);
-
- fn init(block: []const u8, reads_allowed: usize) Self {
- return Self{
- .block = block,
- .reads_allowed = reads_allowed,
- .curr_read = 0,
- };
- }
-
- fn read(self: *Self, dest: []u8) Error!usize {
- if (self.curr_read >= self.reads_allowed) return 0;
- @memcpy(dest[0..self.block.len], self.block);
-
- self.curr_read += 1;
- return self.block.len;
- }
-
- fn reader(self: *Self) Reader {
- return .{ .context = self };
- }
- };
-
- const block = "0123";
-
- // len out == block
- {
- var test_buf_reader: BufferedReader(4, BlockReader) = .{
- .input = BlockReader.init(block, 2),
- .output = io.null_writer,
- };
- var out_buf: [4]u8 = undefined;
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, block);
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, block);
- try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
- }
-
- // len out < block
- {
- var test_buf_reader: BufferedReader(4, BlockReader) = .{
- .input = BlockReader.init(block, 2),
- .output = io.null_writer,
- };
- var out_buf: [3]u8 = undefined;
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, "012");
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, "301");
- const n = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, out_buf[0..n], "23");
- try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
- }
-
- // len out > block
- {
- var test_buf_reader: BufferedReader(4, BlockReader) = .{
- .input = BlockReader.init(block, 2),
- .output = io.null_writer,
- };
- var out_buf: [5]u8 = undefined;
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, "01230");
- const n = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, out_buf[0..n], "123");
- try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
- }
-
- // len out == 0
- {
- var test_buf_reader: BufferedReader(4, BlockReader) = .{
- .input = BlockReader.init(block, 2),
- .output = io.null_writer,
- };
- var out_buf: [0]u8 = undefined;
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, "");
- }
-
- // len bufreader buf > block
- {
- var test_buf_reader: BufferedReader(5, BlockReader) = .{
- .input = BlockReader.init(block, 2),
- .output = io.null_writer,
- };
- var out_buf: [4]u8 = undefined;
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, block);
- _ = try test_buf_reader.read(&out_buf);
- try testing.expectEqualSlices(u8, &out_buf, block);
- try testing.expectEqual(try test_buf_reader.read(&out_buf), 0);
- }
-}
-
-test "with zero lookahead" {
- // output has same bytes as consumer
- const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12;
- var in = io.fixedBufferStream(&data);
- var out = std.ArrayList(u8).init(testing.allocator);
- defer out.deinit();
-
- var bt = bufferedTee(8, 0, in.reader(), out.writer());
-
- var buf: [16]u8 = undefined;
- var read_len: usize = 0;
- for (0..buf.len) |i| {
- const n = try bt.read(buf[0..i]);
- try testing.expectEqual(i, n);
- read_len += i;
- try testing.expectEqual(read_len, out.items.len);
- }
-}
-
-test "with lookahead" {
- // output is lookahead bytes behind consumer
- inline for (1..8) |lookahead| {
- const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 12;
- var in = io.fixedBufferStream(&data);
- var out = std.ArrayList(u8).init(testing.allocator);
- defer out.deinit();
-
- var bt = bufferedTee(8, lookahead, in.reader(), out.writer());
- var buf: [16]u8 = undefined;
-
- var read_len: usize = 0;
- for (1..buf.len) |i| {
- const n = try bt.read(buf[0..i]);
- try testing.expectEqual(i, n);
- read_len += i;
- const out_len = if (read_len < lookahead) 0 else read_len - lookahead;
- try testing.expectEqual(out_len, out.items.len);
- }
- try testing.expectEqual(read_len, out.items.len + lookahead);
- try bt.flush();
- try testing.expectEqual(read_len, out.items.len);
- }
-}
-
-test "internal state" {
- const data = [_]u8{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 } ** 2;
- var in = io.fixedBufferStream(&data);
- var out = std.ArrayList(u8).init(testing.allocator);
- defer out.deinit();
-
- var bt = bufferedTee(8, 4, in.reader(), out.writer());
-
- var buf: [16]u8 = undefined;
- var n = try bt.read(buf[0..3]);
- try testing.expectEqual(3, n);
- try testing.expectEqualSlices(u8, data[0..3], buf[0..n]);
- try testing.expectEqual(8, bt.tail);
- try testing.expectEqual(3, bt.rp);
- try testing.expectEqual(0, out.items.len);
-
- n = try bt.read(buf[0..6]);
- try testing.expectEqual(6, n);
- try testing.expectEqualSlices(u8, data[3..9], buf[0..n]);
- try testing.expectEqual(8, bt.tail);
- try testing.expectEqual(5, bt.rp);
- try testing.expectEqualSlices(u8, data[4..12], &bt.buf);
- try testing.expectEqual(5, out.items.len);
-
- n = try bt.read(buf[0..9]);
- try testing.expectEqual(9, n);
- try testing.expectEqualSlices(u8, data[9..18], buf[0..n]);
- try testing.expectEqual(8, bt.tail);
- try testing.expectEqual(6, bt.rp);
- try testing.expectEqualSlices(u8, data[12..20], &bt.buf);
- try testing.expectEqual(14, out.items.len);
-
- try bt.flush();
- try testing.expectEqual(18, out.items.len);
-
- bt.putBack(4);
- n = try bt.read(buf[0..4]);
- try testing.expectEqual(4, n);
- try testing.expectEqualSlices(u8, data[14..18], buf[0..n]);
-
- try testing.expectEqual(18, out.items.len);
- try bt.flush();
- try testing.expectEqual(18, out.items.len);
-}
lib/std/io.zig
@@ -410,9 +410,6 @@ pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAt
pub const StreamSource = @import("io/stream_source.zig").StreamSource;
-pub const BufferedTee = @import("io/buffered_tee.zig").BufferedTee;
-pub const bufferedTee = @import("io/buffered_tee.zig").bufferedTee;
-
pub const tty = @import("io/tty.zig");
/// A Writer that doesn't write to anything.
@@ -709,5 +706,4 @@ test {
_ = @import("io/seekable_stream.zig");
_ = @import("io/stream_source.zig");
_ = @import("io/test.zig");
- _ = @import("io/buffered_tee.zig");
}