master
/// Returns a type that hashes a file in fixed-size chunks, dispatching one
/// `Hasher.hash` call per chunk to a thread pool.
///
/// `Hasher` must provide a `digest_length` declaration and a
/// `hash(input, out, options)` function that writes `digest_length` bytes to `out`.
pub fn ParallelHasher(comptime Hasher: type) type {
    const hash_size = Hasher.digest_length;

    return struct {
        allocator: Allocator,
        thread_pool: *ThreadPool,

        /// Hashes `file` chunk by chunk and stores the digest of chunk `i` in `out[i]`.
        ///
        /// Chunk `i` starts at file offset `i * opts.chunk_size` and is
        /// `opts.chunk_size` bytes long, except that it is truncated at the file
        /// size. The file size is `opts.max_file_size` when set, otherwise
        /// `file.getEndPos()`. Entries of `out` that lie entirely past the file
        /// size receive the digest of an empty input.
        ///
        /// A scratch buffer of `opts.chunk_size * out.len` bytes is allocated from
        /// `self.allocator` for the duration of the call.
        ///
        /// Returns `error.Overflow` when the file size, the chunk size, or the
        /// scratch buffer size does not fit in `usize`; otherwise propagates
        /// allocation failures, `getEndPos` failures, and the first read error
        /// recorded by a worker. When an error is returned the contents of `out`
        /// must not be relied upon.
        pub fn hash(self: Self, file: fs.File, out: [][hash_size]u8, opts: struct {
            chunk_size: u64 = 0x4000,
            max_file_size: ?u64 = null,
        }) !void {
            const tracy = trace(@src());
            defer tracy.end();

            var wg: WaitGroup = .{};

            const file_size = blk: {
                const file_size = opts.max_file_size orelse try file.getEndPos();
                break :blk std.math.cast(usize, file_size) orelse return error.Overflow;
            };
            const chunk_size = std.math.cast(usize, opts.chunk_size) orelse return error.Overflow;

            // Checked multiply: once this succeeds, every `i * chunk_size` and
            // `fstart + chunk_size` computed below is bounded by `buffer_len`.
            const buffer_len = try std.math.mul(usize, chunk_size, out.len);

            const buffer = try self.allocator.alloc(u8, buffer_len);
            defer self.allocator.free(buffer);

            const results = try self.allocator.alloc(fs.File.PReadError!usize, out.len);
            defer self.allocator.free(results);

            {
                wg.reset();
                // Block until every spawned worker has finished before `buffer`
                // and `results` are inspected or freed.
                defer wg.wait();

                for (out, results, 0..) |*out_buf, *result, i| {
                    const fstart = i * chunk_size;
                    // Saturating subtraction keeps chunks that start past the end
                    // of the file at length zero instead of underflowing.
                    const fsize = @min(chunk_size, file_size -| fstart);
                    self.thread_pool.spawnWg(&wg, worker, .{
                        file,
                        fstart,
                        buffer[fstart..][0..fsize],
                        out_buf,
                        result,
                    });
                }
            }
            for (results) |result| _ = try result;
        }

        /// Reads `buffer.len` bytes of `file` starting at offset `fstart` into
        /// `buffer`, stores the read result in `err`, and on a successful read
        /// writes the digest of `buffer` to `out`.
        fn worker(
            file: fs.File,
            fstart: usize,
            buffer: []u8,
            out: *[hash_size]u8,
            err: *fs.File.PReadError!usize,
        ) void {
            const tracy = trace(@src());
            defer tracy.end();
            err.* = file.preadAll(buffer, fstart);
            // A failed read leaves `buffer` without valid contents; `hash` reports
            // the stored error, so no digest is produced for this chunk.
            _ = err.* catch return;
            // NOTE(review): a short read (fewer than `buffer.len` bytes) still
            // hashes the whole slice, as before — confirm callers never pass a
            // `max_file_size` beyond the real end of the file.
            Hasher.hash(buffer, out, .{});
        }

        const Self = @This();
    };
}
67
68const assert = std.debug.assert;
69const fs = std.fs;
70const mem = std.mem;
71const std = @import("std");
72const trace = @import("../../tracy.zig").trace;
73
74const Allocator = mem.Allocator;
75const ThreadPool = std.Thread.Pool;
76const WaitGroup = std.Thread.WaitGroup;