Commit d00faa2407

Igor Anić <igor.anic@gmail.com>
2024-02-22 15:18:03
use BufferedTee in Fetch/git.zig
1 parent eb67fab
Changed files (1)
src
Package
Fetch
src/Package/Fetch/git.zig
@@ -1091,87 +1091,7 @@ pub fn indexPack(allocator: Allocator, pack: std.fs.File, index_writer: anytype)
     try index_writer.writeAll(&index_checksum);
 }
 
-/// A reader that stores read data in a growable internal buffer. The read
-/// position can be rewound to allow previously read data to be read again.
-fn AccumulatingReader(comptime ReaderType: type) type {
-    return struct {
-        child_reader: ReaderType,
-        buffer: std.ArrayListUnmanaged(u8) = .{},
-        /// The position in `buffer` from which reads should start, returning
-        /// buffered data. If this is `buffer.items.len`, data will be read from
-        /// `child_reader` instead.
-        read_start: usize = 0,
-        allocator: Allocator,
-
-        const Self = @This();
-
-        fn deinit(self: *Self) void {
-            self.buffer.deinit(self.allocator);
-            self.* = undefined;
-        }
-
-        const ReadError = ReaderType.Error || Allocator.Error;
-        const Reader = std.io.Reader(*Self, ReadError, read);
-
-        fn read(self: *Self, buf: []u8) ReadError!usize {
-            if (self.read_start < self.buffer.items.len) {
-                // Previously buffered data is available and should be used
-                // before reading more data from the underlying reader.
-                const available = self.buffer.items.len - self.read_start;
-                const count = @min(available, buf.len);
-                @memcpy(buf[0..count], self.buffer.items[self.read_start..][0..count]);
-                self.read_start += count;
-                return count;
-            }
-
-            try self.buffer.ensureUnusedCapacity(self.allocator, buf.len);
-            const read_buffer = self.buffer.unusedCapacitySlice();
-            const count = try self.child_reader.read(read_buffer[0..buf.len]);
-            @memcpy(buf[0..count], read_buffer[0..count]);
-            self.buffer.items.len += count;
-            self.read_start += count;
-            return count;
-        }
-
-        fn reader(self: *Self) Reader {
-            return .{ .context = self };
-        }
-
-        /// Returns a slice of the buffered data that has already been read,
-        /// except the last `count_before_end` bytes.
-        fn readDataExcept(self: Self, count_before_end: usize) []const u8 {
-            assert(count_before_end <= self.read_start);
-            return self.buffer.items[0 .. self.read_start - count_before_end];
-        }
-
-        /// Discards the first `count` bytes of buffered data.
-        fn discard(self: *Self, count: usize) void {
-            assert(count <= self.buffer.items.len);
-            const retain = self.buffer.items.len - count;
-            mem.copyForwards(
-                u8,
-                self.buffer.items[0..retain],
-                self.buffer.items[count..][0..retain],
-            );
-            self.buffer.items.len = retain;
-            self.read_start -= @min(self.read_start, count);
-        }
-
-        /// Rewinds the read position to the beginning of buffered data.
-        fn rewind(self: *Self) void {
-            self.read_start = 0;
-        }
-    };
-}
-
-fn accumulatingReader(
-    allocator: Allocator,
-    reader: anytype,
-) AccumulatingReader(@TypeOf(reader)) {
-    return .{ .child_reader = reader, .allocator = allocator };
-}
-
-/// Performs the first pass over the packfile data for index construction.
+// Performs the first pass over the packfile data for index construction.
 /// This will index all non-delta objects, queue delta objects for further
 /// processing, and return the pack checksum (which is part of the index
 /// format).
@@ -1181,102 +1101,62 @@ fn indexPackFirstPass(
     index_entries: *std.AutoHashMapUnmanaged(Oid, IndexEntry),
     pending_deltas: *std.ArrayListUnmanaged(IndexEntry),
 ) ![Sha1.digest_length]u8 {
-    var pack_buffered_reader = std.io.bufferedReader(pack.reader());
-    var pack_accumulating_reader = accumulatingReader(allocator, pack_buffered_reader.reader());
-    defer pack_accumulating_reader.deinit();
-    var pack_position: usize = 0;
-    var pack_hash = Sha1.init(.{});
-    const pack_reader = pack_accumulating_reader.reader();
+    var pack_counting_writer = std.io.countingWriter(std.io.null_writer);
+    var pack_hashed_writer = std.compress.hashedWriter(pack_counting_writer.writer(), Sha1.init(.{}));
+    var entry_crc32_writer = std.compress.hashedWriter(pack_hashed_writer.writer(), std.hash.Crc32.init());
+    var pack_buffered_reader = std.io.bufferedTee(4096, 8, pack.reader(), entry_crc32_writer.writer());
+    const pack_reader = pack_buffered_reader.reader();
 
     const pack_header = try PackHeader.read(pack_reader);
-    const pack_header_bytes = pack_accumulating_reader.readDataExcept(0);
-    pack_position += pack_header_bytes.len;
-    pack_hash.update(pack_header_bytes);
-    pack_accumulating_reader.discard(pack_header_bytes.len);
+    try pack_buffered_reader.flush();
 
     var current_entry: u32 = 0;
     while (current_entry < pack_header.total_objects) : (current_entry += 1) {
-        const entry_offset = pack_position;
-        var entry_crc32 = std.hash.Crc32.init();
-
+        const entry_offset = pack_counting_writer.bytes_written;
+        entry_crc32_writer.hasher = std.hash.Crc32.init(); // reset hasher
         const entry_header = try EntryHeader.read(pack_reader);
-        const entry_header_bytes = pack_accumulating_reader.readDataExcept(0);
-        pack_position += entry_header_bytes.len;
-        pack_hash.update(entry_header_bytes);
-        entry_crc32.update(entry_header_bytes);
-        pack_accumulating_reader.discard(entry_header_bytes.len);
 
         switch (entry_header) {
-            .commit, .tree, .blob, .tag => |object| {
+            inline .commit, .tree, .blob, .tag => |object, tag| {
                 var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
-                var entry_data_size: usize = 0;
+                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
                 var entry_hashed_writer = hashedWriter(std.io.null_writer, Sha1.init(.{}));
                 const entry_writer = entry_hashed_writer.writer();
-
                 // The object header is not included in the pack data but is
                 // part of the object's ID
-                try entry_writer.print("{s} {}\x00", .{ @tagName(entry_header), object.uncompressed_length });
-
-                while (try entry_decompress_stream.next()) |decompressed_data| {
-                    entry_data_size += decompressed_data.len;
-                    try entry_writer.writeAll(decompressed_data);
-
-                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                    pack_position += compressed_bytes.len;
-                    pack_hash.update(compressed_bytes);
-                    entry_crc32.update(compressed_bytes);
-                    pack_accumulating_reader.discard(compressed_bytes.len);
-                }
-                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                pack_position += footer_bytes.len;
-                pack_hash.update(footer_bytes);
-                entry_crc32.update(footer_bytes);
-                pack_accumulating_reader.discard(footer_bytes.len);
-                pack_accumulating_reader.rewind();
-
-                if (entry_data_size != object.uncompressed_length) {
+                try entry_writer.print("{s} {}\x00", .{ @tagName(tag), object.uncompressed_length });
+                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
+                try fifo.pump(entry_counting_reader.reader(), entry_writer);
+                if (entry_counting_reader.bytes_read != object.uncompressed_length) {
                     return error.InvalidObject;
                 }
-
                 const oid = entry_hashed_writer.hasher.finalResult();
+                pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes());
+                try pack_buffered_reader.flush();
                 try index_entries.put(allocator, oid, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32.final(),
+                    .crc32 = entry_crc32_writer.hasher.final(),
                 });
             },
             inline .ofs_delta, .ref_delta => |delta| {
                 var entry_decompress_stream = std.compress.zlib.decompressor(pack_reader);
-                var entry_data_size: usize = 0;
-
-                while (try entry_decompress_stream.next()) |decompressed_data| {
-                    entry_data_size += decompressed_data.len;
-
-                    const compressed_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                    pack_position += compressed_bytes.len;
-                    pack_hash.update(compressed_bytes);
-                    entry_crc32.update(compressed_bytes);
-                    pack_accumulating_reader.discard(compressed_bytes.len);
-                }
-                const footer_bytes = pack_accumulating_reader.readDataExcept(entry_decompress_stream.unreadBytes());
-                pack_position += footer_bytes.len;
-                pack_hash.update(footer_bytes);
-                entry_crc32.update(footer_bytes);
-                pack_accumulating_reader.discard(footer_bytes.len);
-                pack_accumulating_reader.rewind();
-
-                if (entry_data_size != delta.uncompressed_length) {
+                var entry_counting_reader = std.io.countingReader(entry_decompress_stream.reader());
+                var fifo = std.fifo.LinearFifo(u8, .{ .Static = 4096 }).init();
+                try fifo.pump(entry_counting_reader.reader(), std.io.null_writer);
+                if (entry_counting_reader.bytes_read != delta.uncompressed_length) {
                     return error.InvalidObject;
                 }
-
+                pack_buffered_reader.putBack(entry_decompress_stream.unreadBytes());
+                try pack_buffered_reader.flush();
                 try pending_deltas.append(allocator, .{
                     .offset = entry_offset,
-                    .crc32 = entry_crc32.final(),
+                    .crc32 = entry_crc32_writer.hasher.final(),
                 });
             },
         }
     }
 
-    const pack_checksum = pack_hash.finalResult();
+    const pack_checksum = pack_hashed_writer.hasher.finalResult();
     const recorded_checksum = try pack_reader.readBytesNoEof(Sha1.digest_length);
     if (!mem.eql(u8, &pack_checksum, &recorded_checksum)) {
         return error.CorruptedPack;