Commit ac1b957e79

dweiller <4678790+dweiller@users.noreply.github.com>
2024-02-15 06:19:55
std.compress.zstd: remove allocation from DecompressStream
1 parent 73f6d3a
Changed files (4)
lib
src
Package
lib/std/compress/zstandard/decompress.zig
@@ -409,7 +409,7 @@ pub const FrameContext = struct {
             .hasher_opt = if (should_compute_checksum) std.hash.XxHash64.init(0) else null,
             .window_size = window_size,
             .has_checksum = frame_header.descriptor.content_checksum_flag,
-            .block_size_max = @min(1 << 17, window_size),
+            .block_size_max = @min(types.block_size_max, window_size),
             .content_size = content_size,
         };
     }
lib/std/compress/zstandard/types.zig
@@ -1,3 +1,5 @@
+pub const block_size_max = 1 << 17;
+
 pub const frame = struct {
     pub const Kind = enum { zstandard, skippable };
 
lib/std/compress/zstandard.zig
@@ -1,5 +1,4 @@
 const std = @import("std");
-const Allocator = std.mem.Allocator;
 const RingBuffer = std.RingBuffer;
 
 const types = @import("zstandard/types.zig");
@@ -10,7 +9,9 @@ pub const decompress = @import("zstandard/decompress.zig");
 
 pub const DecompressStreamOptions = struct {
     verify_checksum: bool = true,
-    window_size_max: usize = 1 << 23, // 8MiB default maximum window size
+    window_size_max: usize = default_window_size_max,
+
+    pub const default_window_size_max = 1 << 23; // 8MiB default maximum window size
 };
 
 pub fn DecompressStream(
@@ -20,20 +21,29 @@ pub fn DecompressStream(
     return struct {
         const Self = @This();
 
-        allocator: Allocator,
+        pub const window_size_max = options.window_size_max;
+
+        const table_size_max = types.compressed_block.table_size_max;
+
         source: std.io.CountingReader(ReaderType),
         state: enum { NewFrame, InFrame, LastBlock },
         decode_state: decompress.block.DecodeState,
         frame_context: decompress.FrameContext,
-        buffer: RingBuffer,
-        literal_fse_buffer: []types.compressed_block.Table.Fse,
-        match_fse_buffer: []types.compressed_block.Table.Fse,
-        offset_fse_buffer: []types.compressed_block.Table.Fse,
-        literals_buffer: []u8,
-        sequence_buffer: []u8,
+        buffer: WindowBuffer,
+        literal_fse_buffer: [table_size_max.literal]types.compressed_block.Table.Fse,
+        match_fse_buffer: [table_size_max.match]types.compressed_block.Table.Fse,
+        offset_fse_buffer: [table_size_max.offset]types.compressed_block.Table.Fse,
+        literals_buffer: [types.block_size_max]u8,
+        sequence_buffer: [types.block_size_max]u8,
         checksum: if (options.verify_checksum) ?u32 else void,
         current_frame_decompressed_size: usize,
 
+        const WindowBuffer = struct {
+            data: *[options.window_size_max]u8 = undefined,
+            read_index: usize = 0,
+            write_index: usize = 0,
+        };
+
         pub const Error = ReaderType.Error || error{
             ChecksumFailure,
             DictionaryIdFlagUnsupported,
@@ -44,14 +54,13 @@ pub fn DecompressStream(
 
         pub const Reader = std.io.Reader(*Self, Error, read);
 
-        pub fn init(allocator: Allocator, source: ReaderType) Self {
+        pub fn init(source: ReaderType, window_buffer: *[options.window_size_max]u8) Self {
             return Self{
-                .allocator = allocator,
                 .source = std.io.countingReader(source),
                 .state = .NewFrame,
                 .decode_state = undefined,
                 .frame_context = undefined,
-                .buffer = undefined,
+                .buffer = .{ .data = window_buffer },
                 .literal_fse_buffer = undefined,
                 .match_fse_buffer = undefined,
                 .offset_fse_buffer = undefined,
@@ -76,44 +85,11 @@ pub fn DecompressStream(
                         options.verify_checksum,
                     );
 
-                    const literal_fse_buffer = try self.allocator.alloc(
-                        types.compressed_block.Table.Fse,
-                        types.compressed_block.table_size_max.literal,
-                    );
-                    errdefer self.allocator.free(literal_fse_buffer);
-
-                    const match_fse_buffer = try self.allocator.alloc(
-                        types.compressed_block.Table.Fse,
-                        types.compressed_block.table_size_max.match,
-                    );
-                    errdefer self.allocator.free(match_fse_buffer);
-
-                    const offset_fse_buffer = try self.allocator.alloc(
-                        types.compressed_block.Table.Fse,
-                        types.compressed_block.table_size_max.offset,
-                    );
-                    errdefer self.allocator.free(offset_fse_buffer);
-
                     const decode_state = decompress.block.DecodeState.init(
-                        literal_fse_buffer,
-                        match_fse_buffer,
-                        offset_fse_buffer,
+                        &self.literal_fse_buffer,
+                        &self.match_fse_buffer,
+                        &self.offset_fse_buffer,
                     );
-                    const buffer = try RingBuffer.init(self.allocator, frame_context.window_size);
-
-                    const literals_data = try self.allocator.alloc(u8, frame_context.block_size_max);
-                    errdefer self.allocator.free(literals_data);
-
-                    const sequence_data = try self.allocator.alloc(u8, frame_context.block_size_max);
-                    errdefer self.allocator.free(sequence_data);
-
-                    self.literal_fse_buffer = literal_fse_buffer;
-                    self.match_fse_buffer = match_fse_buffer;
-                    self.offset_fse_buffer = offset_fse_buffer;
-                    self.literals_buffer = literals_data;
-                    self.sequence_buffer = sequence_data;
-
-                    self.buffer = buffer;
 
                     self.decode_state = decode_state;
                     self.frame_context = frame_context;
@@ -126,16 +102,6 @@ pub fn DecompressStream(
             }
         }
 
-        pub fn deinit(self: *Self) void {
-            if (self.state == .NewFrame) return;
-            self.allocator.free(self.decode_state.literal_fse_buffer);
-            self.allocator.free(self.decode_state.match_fse_buffer);
-            self.allocator.free(self.decode_state.offset_fse_buffer);
-            self.allocator.free(self.literals_buffer);
-            self.allocator.free(self.sequence_buffer);
-            self.buffer.deinit(self.allocator);
-        }
-
         pub fn reader(self: *Self) Reader {
             return .{ .context = self };
         }
@@ -153,7 +119,6 @@ pub fn DecompressStream(
                             0
                         else
                             error.MalformedFrame,
-                        error.OutOfMemory => return error.OutOfMemory,
                         else => return error.MalformedFrame,
                     };
                 }
@@ -165,20 +130,30 @@ pub fn DecompressStream(
         fn readInner(self: *Self, buffer: []u8) Error!usize {
             std.debug.assert(self.state != .NewFrame);
 
+            var ring_buffer = RingBuffer{
+                .data = self.buffer.data,
+                .read_index = self.buffer.read_index,
+                .write_index = self.buffer.write_index,
+            };
+            defer {
+                self.buffer.read_index = ring_buffer.read_index;
+                self.buffer.write_index = ring_buffer.write_index;
+            }
+
             const source_reader = self.source.reader();
-            while (self.buffer.isEmpty() and self.state != .LastBlock) {
+            while (ring_buffer.isEmpty() and self.state != .LastBlock) {
                 const header_bytes = source_reader.readBytesNoEof(3) catch
                     return error.MalformedFrame;
                 const block_header = decompress.block.decodeBlockHeader(&header_bytes);
 
                 decompress.block.decodeBlockReader(
-                    &self.buffer,
+                    &ring_buffer,
                     source_reader,
                     block_header,
                     &self.decode_state,
                     self.frame_context.block_size_max,
-                    self.literals_buffer,
-                    self.sequence_buffer,
+                    &self.literals_buffer,
+                    &self.sequence_buffer,
                 ) catch
                     return error.MalformedBlock;
 
@@ -186,12 +161,12 @@ pub fn DecompressStream(
                     if (self.current_frame_decompressed_size > size) return error.MalformedFrame;
                 }
 
-                const size = self.buffer.len();
+                const size = ring_buffer.len();
                 self.current_frame_decompressed_size += size;
 
                 if (self.frame_context.hasher_opt) |*hasher| {
                     if (size > 0) {
-                        const written_slice = self.buffer.sliceLast(size);
+                        const written_slice = ring_buffer.sliceLast(size);
                         hasher.update(written_slice.first);
                         hasher.update(written_slice.second);
                     }
@@ -216,18 +191,12 @@ pub fn DecompressStream(
                 }
             }
 
-            const size = @min(self.buffer.len(), buffer.len);
+            const size = @min(ring_buffer.len(), buffer.len);
             if (size > 0) {
-                self.buffer.readFirstAssumeLength(buffer, size);
+                ring_buffer.readFirstAssumeLength(buffer, size);
             }
-            if (self.state == .LastBlock and self.buffer.len() == 0) {
+            if (self.state == .LastBlock and ring_buffer.len() == 0) {
                 self.state = .NewFrame;
-                self.allocator.free(self.literal_fse_buffer);
-                self.allocator.free(self.match_fse_buffer);
-                self.allocator.free(self.offset_fse_buffer);
-                self.allocator.free(self.literals_buffer);
-                self.allocator.free(self.sequence_buffer);
-                self.buffer.deinit(self.allocator);
             }
             return size;
         }
@@ -235,24 +204,24 @@ pub fn DecompressStream(
 }
 
 pub fn decompressStreamOptions(
-    allocator: Allocator,
     reader: anytype,
     comptime options: DecompressStreamOptions,
+    window_buffer: *[options.window_size_max]u8,
 ) DecompressStream(@TypeOf(reader), options) {
-    return DecompressStream(@TypeOf(reader), options).init(allocator, reader);
+    return DecompressStream(@TypeOf(reader), options).init(reader, window_buffer);
 }
 
 pub fn decompressStream(
-    allocator: Allocator,
     reader: anytype,
+    window_buffer: *[DecompressStreamOptions.default_window_size_max]u8,
 ) DecompressStream(@TypeOf(reader), .{}) {
-    return DecompressStream(@TypeOf(reader), .{}).init(allocator, reader);
+    return DecompressStream(@TypeOf(reader), .{}).init(reader, window_buffer);
 }
 
 fn testDecompress(data: []const u8) ![]u8 {
+    var window_buffer: [DecompressStreamOptions.default_window_size_max]u8 = undefined;
     var in_stream = std.io.fixedBufferStream(data);
-    var zstd_stream = decompressStream(std.testing.allocator, in_stream.reader());
-    defer zstd_stream.deinit();
+    var zstd_stream = decompressStream(in_stream.reader(), &window_buffer);
     const result = zstd_stream.reader().readAllAlloc(std.testing.allocator, std.math.maxInt(usize));
     return result;
 }
@@ -301,9 +270,9 @@ fn expectEqualDecoded(expected: []const u8, input: []const u8) !void {
     }
 
     {
+        var window_buffer: [DecompressStreamOptions.default_window_size_max]u8 = undefined;
         var in_stream = std.io.fixedBufferStream(input);
-        var stream = decompressStream(allocator, in_stream.reader());
-        defer stream.deinit();
+        var stream = decompressStream(in_stream.reader(), &window_buffer);
 
         const result = try stream.reader().readAllAlloc(allocator, std.math.maxInt(usize));
         defer allocator.free(result);
src/Package/Fetch.zig
@@ -1126,11 +1126,14 @@ fn unpackResource(
 // wrapped for generic use in unpackTarballCompressed: see github.com/ziglang/zig/issues/14739
 const ZstdWrapper = struct {
     fn DecompressType(comptime T: type) type {
-        return error{}!std.compress.zstd.DecompressStream(T, .{});
+        return Allocator.Error!std.compress.zstd.DecompressStream(T, .{});
     }
 
     fn decompress(allocator: Allocator, reader: anytype) DecompressType(@TypeOf(reader)) {
-        return std.compress.zstd.decompressStream(allocator, reader);
+        const window_size = std.compress.zstd.DecompressStreamOptions.default_window_size_max;
+        const window_buffer = try allocator.create([window_size]u8);
+        defer allocator.destroy(window_buffer);
+        return std.compress.zstd.decompressStream(reader, window_buffer);
     }
 };
 
@@ -1138,7 +1141,7 @@ fn unpackTarballCompressed(
     f: *Fetch,
     out_dir: fs.Dir,
     resource: *Resource,
-    comptime Compression: type,
+    Compression: anytype,
 ) RunError!void {
     const gpa = f.arena.child_allocator;
     const eb = &f.error_bundle;
@@ -1151,7 +1154,7 @@ fn unpackTarballCompressed(
             .{@errorName(err)},
         ));
     };
-    defer decompress.deinit();
+    defer if (@hasDecl(Compression, "deinit")) decompress.deinit();
 
     return unpackTarball(f, out_dir, decompress.reader());
 }