Commit 9e5048c3a5

Andrew Kelley <andrew@ziglang.org>
2025-08-05 04:31:03
fetch: update for new http API
it's not quite finished because I need to make it not copy the Resource
1 parent fef41c6
Changed files (3)
lib
std
src
Package
lib/std/http/Client.zig
@@ -682,7 +682,7 @@ pub const Response = struct {
     ///
     /// See also:
     /// * `readerDecompressing`
-    pub fn reader(response: *Response, buffer: []u8) *Reader {
+    pub fn reader(response: *const Response, buffer: []u8) *Reader {
         const req = response.request;
         if (!req.method.responseHasBody()) return .ending;
         const head = &response.head;
@@ -805,6 +805,11 @@ pub const Request = struct {
         unhandled = std.math.maxInt(u16),
         _,
 
+        pub fn init(n: u16) RedirectBehavior {
+            assert(n != std.math.maxInt(u16));
+            return @enumFromInt(n);
+        }
+
         pub fn subtractOne(rb: *RedirectBehavior) void {
             switch (rb.*) {
                 .not_allowed => unreachable,
@@ -855,6 +860,14 @@ pub const Request = struct {
         return result;
     }
 
+    /// Transfers the HTTP head and body over the connection and flushes.
+    pub fn sendBodyComplete(r: *Request, body: []u8) Writer.Error!void {
+        r.transfer_encoding = .{ .content_length = body.len };
+        var bw = try sendBodyUnflushed(r, body);
+        bw.writer.end = body.len;
+        try bw.end();
+    }
+
     /// Transfers the HTTP head over the connection, which is not flushed until
     /// `BodyWriter.flush` or `BodyWriter.end` is called.
     ///
@@ -1296,7 +1309,7 @@ pub const basic_authorization = struct {
     pub fn value(uri: Uri, out: []u8) []u8 {
         var bw: Writer = .fixed(out);
         write(uri, &bw) catch unreachable;
-        return bw.getWritten();
+        return bw.buffered();
     }
 
     pub fn write(uri: Uri, out: *Writer) Writer.Error!void {
src/Package/Fetch/git.zig
@@ -585,17 +585,17 @@ const ObjectCache = struct {
 /// [protocol-common](https://git-scm.com/docs/protocol-common). The special
 /// meanings of the delimiter and response-end packets are documented in
 /// [protocol-v2](https://git-scm.com/docs/protocol-v2).
-const Packet = union(enum) {
+pub const Packet = union(enum) {
     flush,
     delimiter,
     response_end,
     data: []const u8,
 
-    const max_data_length = 65516;
+    pub const max_data_length = 65516;
 
     /// Reads a packet in pkt-line format.
-    fn read(reader: anytype, buf: *[max_data_length]u8) !Packet {
-        const length = std.fmt.parseUnsigned(u16, &try reader.readBytesNoEof(4), 16) catch return error.InvalidPacket;
+    fn read(reader: *std.Io.Reader) !Packet {
+        const length = std.fmt.parseUnsigned(u16, try reader.take(4), 16) catch return error.InvalidPacket;
         switch (length) {
             0 => return .flush,
             1 => return .delimiter,
@@ -603,13 +603,11 @@ const Packet = union(enum) {
             3 => return error.InvalidPacket,
             else => if (length - 4 > max_data_length) return error.InvalidPacket,
         }
-        const data = buf[0 .. length - 4];
-        try reader.readNoEof(data);
-        return .{ .data = data };
+        return .{ .data = try reader.take(length - 4) };
     }
 
     /// Writes a packet in pkt-line format.
-    fn write(packet: Packet, writer: anytype) !void {
+    fn write(packet: Packet, writer: *std.Io.Writer) !void {
         switch (packet) {
             .flush => try writer.writeAll("0000"),
             .delimiter => try writer.writeAll("0001"),
@@ -657,8 +655,10 @@ pub const Session = struct {
         allocator: Allocator,
         transport: *std.http.Client,
         uri: std.Uri,
-        http_headers_buffer: []u8,
+        /// Asserted to be at least `Packet.max_data_length`
+        response_buffer: []u8,
     ) !Session {
+        assert(response_buffer.len >= Packet.max_data_length);
         var session: Session = .{
             .transport = transport,
             .location = try .init(allocator, uri),
@@ -668,7 +668,8 @@ pub const Session = struct {
             .allocator = allocator,
         };
         errdefer session.deinit();
-        var capability_iterator = try session.getCapabilities(http_headers_buffer);
+        var capability_iterator: CapabilityIterator = undefined;
+        try session.getCapabilities(&capability_iterator, response_buffer);
         defer capability_iterator.deinit();
         while (try capability_iterator.next()) |capability| {
             if (mem.eql(u8, capability.key, "agent")) {
@@ -743,7 +744,8 @@ pub const Session = struct {
     ///
     /// The `session.location` is updated if the server returns a redirect, so
     /// that subsequent session functions do not need to handle redirects.
-    fn getCapabilities(session: *Session, http_headers_buffer: []u8) !CapabilityIterator {
+    fn getCapabilities(session: *Session, it: *CapabilityIterator, response_buffer: []u8) !void {
+        assert(response_buffer.len >= Packet.max_data_length);
         var info_refs_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -757,19 +759,22 @@ pub const Session = struct {
         info_refs_uri.fragment = null;
 
         const max_redirects = 3;
-        var request = try session.transport.open(.GET, info_refs_uri, .{
-            .redirect_behavior = @enumFromInt(max_redirects),
-            .server_header_buffer = http_headers_buffer,
-            .extra_headers = &.{
-                .{ .name = "Git-Protocol", .value = "version=2" },
-            },
-        });
-        errdefer request.deinit();
-        try request.send();
-        try request.finish();
+        it.* = .{
+            .request = try session.transport.request(.GET, info_refs_uri, .{
+                .redirect_behavior = .init(max_redirects),
+                .extra_headers = &.{
+                    .{ .name = "Git-Protocol", .value = "version=2" },
+                },
+            }),
+            .reader = undefined,
+        };
+        errdefer it.deinit();
+        const request = &it.request;
+        try request.sendBodiless();
 
-        try request.wait();
-        if (request.response.status != .ok) return error.ProtocolError;
+        var redirect_buffer: [1024]u8 = undefined;
+        const response = try request.receiveHead(&redirect_buffer);
+        if (response.head.status != .ok) return error.ProtocolError;
         const any_redirects_occurred = request.redirect_behavior.remaining() < max_redirects;
         if (any_redirects_occurred) {
             const request_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -784,8 +789,7 @@ pub const Session = struct {
             session.location = new_location;
         }
 
-        const reader = request.reader();
-        var buf: [Packet.max_data_length]u8 = undefined;
+        it.reader = response.reader(response_buffer);
         var state: enum { response_start, response_content } = .response_start;
         while (true) {
             // Some Git servers (at least GitHub) include an additional
@@ -795,15 +799,15 @@ pub const Session = struct {
             // Thus, we need to skip any such useless additional responses
             // before we get the one we're actually looking for. The responses
             // will be delimited by flush packets.
-            const packet = Packet.read(reader, &buf) catch |e| switch (e) {
+            const packet = Packet.read(it.reader) catch |err| switch (err) {
                 error.EndOfStream => return error.UnsupportedProtocol, // 'version 2' packet not found
-                else => |other| return other,
+                else => |e| return e,
             };
             switch (packet) {
                 .flush => state = .response_start,
                 .data => |data| switch (state) {
                     .response_start => if (mem.eql(u8, Packet.normalizeText(data), "version 2")) {
-                        return .{ .request = request };
+                        return;
                     } else {
                         state = .response_content;
                     },
@@ -816,7 +820,7 @@ pub const Session = struct {
 
     const CapabilityIterator = struct {
         request: std.http.Client.Request,
-        buf: [Packet.max_data_length]u8 = undefined,
+        reader: *std.Io.Reader,
 
         const Capability = struct {
             key: []const u8,
@@ -830,13 +834,13 @@ pub const Session = struct {
             }
         };
 
-        fn deinit(iterator: *CapabilityIterator) void {
-            iterator.request.deinit();
-            iterator.* = undefined;
+        fn deinit(it: *CapabilityIterator) void {
+            it.request.deinit();
+            it.* = undefined;
         }
 
-        fn next(iterator: *CapabilityIterator) !?Capability {
-            switch (try Packet.read(iterator.request.reader(), &iterator.buf)) {
+        fn next(it: *CapabilityIterator) !?Capability {
+            switch (try Packet.read(it.reader)) {
                 .flush => return null,
                 .data => |data| return Capability.parse(Packet.normalizeText(data)),
                 else => return error.UnexpectedPacket,
@@ -854,11 +858,13 @@ pub const Session = struct {
         include_symrefs: bool = false,
         /// Whether to include the peeled object ID for returned tag refs.
         include_peeled: bool = false,
-        server_header_buffer: []u8,
+        /// Asserted to be at least `Packet.max_data_length`.
+        buffer: []u8,
     };
 
     /// Returns an iterator over refs known to the server.
-    pub fn listRefs(session: Session, options: ListRefsOptions) !RefIterator {
+    pub fn listRefs(session: Session, it: *RefIterator, options: ListRefsOptions) !void {
+        assert(options.buffer.len >= Packet.max_data_length);
         var upload_pack_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -871,59 +877,56 @@ pub const Session = struct {
         upload_pack_uri.query = null;
         upload_pack_uri.fragment = null;
 
-        var body: std.ArrayListUnmanaged(u8) = .empty;
-        defer body.deinit(session.allocator);
-        const body_writer = body.writer(session.allocator);
-        try Packet.write(.{ .data = "command=ls-refs\n" }, body_writer);
+        var body: std.Io.Writer = .fixed(options.buffer);
+        try Packet.write(.{ .data = "command=ls-refs\n" }, &body);
         if (session.supports_agent) {
-            try Packet.write(.{ .data = agent_capability }, body_writer);
+            try Packet.write(.{ .data = agent_capability }, &body);
         }
         {
-            const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={s}\n", .{@tagName(session.object_format)});
+            const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={t}\n", .{
+                session.object_format,
+            });
             defer session.allocator.free(object_format_packet);
-            try Packet.write(.{ .data = object_format_packet }, body_writer);
+            try Packet.write(.{ .data = object_format_packet }, &body);
         }
-        try Packet.write(.delimiter, body_writer);
+        try Packet.write(.delimiter, &body);
         for (options.ref_prefixes) |ref_prefix| {
             const ref_prefix_packet = try std.fmt.allocPrint(session.allocator, "ref-prefix {s}\n", .{ref_prefix});
             defer session.allocator.free(ref_prefix_packet);
-            try Packet.write(.{ .data = ref_prefix_packet }, body_writer);
+            try Packet.write(.{ .data = ref_prefix_packet }, &body);
         }
         if (options.include_symrefs) {
-            try Packet.write(.{ .data = "symrefs\n" }, body_writer);
+            try Packet.write(.{ .data = "symrefs\n" }, &body);
         }
         if (options.include_peeled) {
-            try Packet.write(.{ .data = "peel\n" }, body_writer);
+            try Packet.write(.{ .data = "peel\n" }, &body);
         }
-        try Packet.write(.flush, body_writer);
-
-        var request = try session.transport.open(.POST, upload_pack_uri, .{
-            .redirect_behavior = .unhandled,
-            .server_header_buffer = options.server_header_buffer,
-            .extra_headers = &.{
-                .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
-                .{ .name = "Git-Protocol", .value = "version=2" },
-            },
-        });
-        errdefer request.deinit();
-        request.transfer_encoding = .{ .content_length = body.items.len };
-        try request.send();
-        try request.writeAll(body.items);
-        try request.finish();
-
-        try request.wait();
-        if (request.response.status != .ok) return error.ProtocolError;
-
-        return .{
+        try Packet.write(.flush, &body);
+
+        it.* = .{
+            .request = try session.transport.request(.POST, upload_pack_uri, .{
+                .redirect_behavior = .unhandled,
+                .extra_headers = &.{
+                    .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
+                    .{ .name = "Git-Protocol", .value = "version=2" },
+                },
+            }),
+            .reader = undefined,
             .format = session.object_format,
-            .request = request,
         };
+        const request = &it.request;
+        errdefer request.deinit();
+        try request.sendBodyComplete(body.buffered());
+
+        const response = try request.receiveHead(options.buffer);
+        if (response.head.status != .ok) return error.ProtocolError;
+        it.reader = response.reader(options.buffer);
     }
 
     pub const RefIterator = struct {
         format: Oid.Format,
         request: std.http.Client.Request,
-        buf: [Packet.max_data_length]u8 = undefined,
+        reader: *std.Io.Reader,
 
         pub const Ref = struct {
             oid: Oid,
@@ -937,13 +940,13 @@ pub const Session = struct {
             iterator.* = undefined;
         }
 
-        pub fn next(iterator: *RefIterator) !?Ref {
-            switch (try Packet.read(iterator.request.reader(), &iterator.buf)) {
+        pub fn next(it: *RefIterator) !?Ref {
+            switch (try Packet.read(it.reader)) {
                 .flush => return null,
                 .data => |data| {
                     const ref_data = Packet.normalizeText(data);
                     const oid_sep_pos = mem.indexOfScalar(u8, ref_data, ' ') orelse return error.InvalidRefPacket;
-                    const oid = Oid.parse(iterator.format, data[0..oid_sep_pos]) catch return error.InvalidRefPacket;
+                    const oid = Oid.parse(it.format, data[0..oid_sep_pos]) catch return error.InvalidRefPacket;
 
                     const name_sep_pos = mem.indexOfScalarPos(u8, ref_data, oid_sep_pos + 1, ' ') orelse ref_data.len;
                     const name = ref_data[oid_sep_pos + 1 .. name_sep_pos];
@@ -957,7 +960,7 @@ pub const Session = struct {
                         if (mem.startsWith(u8, attribute, "symref-target:")) {
                             symref_target = attribute["symref-target:".len..];
                         } else if (mem.startsWith(u8, attribute, "peeled:")) {
-                            peeled = Oid.parse(iterator.format, attribute["peeled:".len..]) catch return error.InvalidRefPacket;
+                            peeled = Oid.parse(it.format, attribute["peeled:".len..]) catch return error.InvalidRefPacket;
                         }
                         last_sep_pos = next_sep_pos;
                     }
@@ -973,9 +976,12 @@ pub const Session = struct {
     /// performed if the server supports it.
     pub fn fetch(
         session: Session,
+        fs: *FetchStream,
         wants: []const []const u8,
-        http_headers_buffer: []u8,
-    ) !FetchStream {
+        /// Asserted to be at least `Packet.max_data_length`.
+        response_buffer: []u8,
+    ) !void {
+        assert(response_buffer.len >= Packet.max_data_length);
         var upload_pack_uri = session.location.uri;
         {
             const session_uri_path = try std.fmt.allocPrint(session.allocator, "{f}", .{
@@ -988,63 +994,71 @@ pub const Session = struct {
         upload_pack_uri.query = null;
         upload_pack_uri.fragment = null;
 
-        var body: std.ArrayListUnmanaged(u8) = .empty;
-        defer body.deinit(session.allocator);
-        const body_writer = body.writer(session.allocator);
-        try Packet.write(.{ .data = "command=fetch\n" }, body_writer);
+        var body: std.Io.Writer = .fixed(response_buffer);
+        try Packet.write(.{ .data = "command=fetch\n" }, &body);
         if (session.supports_agent) {
-            try Packet.write(.{ .data = agent_capability }, body_writer);
+            try Packet.write(.{ .data = agent_capability }, &body);
         }
         {
             const object_format_packet = try std.fmt.allocPrint(session.allocator, "object-format={s}\n", .{@tagName(session.object_format)});
             defer session.allocator.free(object_format_packet);
-            try Packet.write(.{ .data = object_format_packet }, body_writer);
+            try Packet.write(.{ .data = object_format_packet }, &body);
         }
-        try Packet.write(.delimiter, body_writer);
+        try Packet.write(.delimiter, &body);
         // Our packfile parser supports the OFS_DELTA object type
-        try Packet.write(.{ .data = "ofs-delta\n" }, body_writer);
+        try Packet.write(.{ .data = "ofs-delta\n" }, &body);
         // We do not currently convey server progress information to the user
-        try Packet.write(.{ .data = "no-progress\n" }, body_writer);
+        try Packet.write(.{ .data = "no-progress\n" }, &body);
         if (session.supports_shallow) {
-            try Packet.write(.{ .data = "deepen 1\n" }, body_writer);
+            try Packet.write(.{ .data = "deepen 1\n" }, &body);
         }
         for (wants) |want| {
             var buf: [Packet.max_data_length]u8 = undefined;
             const arg = std.fmt.bufPrint(&buf, "want {s}\n", .{want}) catch unreachable;
-            try Packet.write(.{ .data = arg }, body_writer);
+            try Packet.write(.{ .data = arg }, &body);
         }
-        try Packet.write(.{ .data = "done\n" }, body_writer);
-        try Packet.write(.flush, body_writer);
-
-        var request = try session.transport.open(.POST, upload_pack_uri, .{
-            .redirect_behavior = .not_allowed,
-            .server_header_buffer = http_headers_buffer,
-            .extra_headers = &.{
-                .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
-                .{ .name = "Git-Protocol", .value = "version=2" },
-            },
-        });
+        try Packet.write(.{ .data = "done\n" }, &body);
+        try Packet.write(.flush, &body);
+
+        fs.* = .{
+            .request = try session.transport.request(.POST, upload_pack_uri, .{
+                .redirect_behavior = .not_allowed,
+                .extra_headers = &.{
+                    .{ .name = "Content-Type", .value = "application/x-git-upload-pack-request" },
+                    .{ .name = "Git-Protocol", .value = "version=2" },
+                },
+            }),
+            .input = undefined,
+            .reader = undefined,
+            .remaining_len = undefined,
+        };
+        const request = &fs.request;
         errdefer request.deinit();
-        request.transfer_encoding = .{ .content_length = body.items.len };
-        try request.send();
-        try request.writeAll(body.items);
-        try request.finish();
 
-        try request.wait();
-        if (request.response.status != .ok) return error.ProtocolError;
+        try request.sendBodyComplete(body.buffered());
+
+        const response = try request.receiveHead(&.{});
+        if (response.head.status != .ok) return error.ProtocolError;
 
-        const reader = request.reader();
+        const reader = response.reader(response_buffer);
         // We are not interested in any of the sections of the returned fetch
         // data other than the packfile section, since we aren't doing anything
         // complex like ref negotiation (this is a fresh clone).
         var state: enum { section_start, section_content } = .section_start;
         while (true) {
-            var buf: [Packet.max_data_length]u8 = undefined;
-            const packet = try Packet.read(reader, &buf);
+            const packet = try Packet.read(reader);
             switch (state) {
                 .section_start => switch (packet) {
                     .data => |data| if (mem.eql(u8, Packet.normalizeText(data), "packfile")) {
-                        return .{ .request = request };
+                        fs.input = reader;
+                        fs.reader = .{
+                            .buffer = &.{},
+                            .vtable = &.{ .stream = FetchStream.stream },
+                            .seek = 0,
+                            .end = 0,
+                        };
+                        fs.remaining_len = 0;
+                        return;
                     } else {
                         state = .section_content;
                     },
@@ -1061,20 +1075,23 @@ pub const Session = struct {
 
     pub const FetchStream = struct {
         request: std.http.Client.Request,
-        buf: [Packet.max_data_length]u8 = undefined,
-        pos: usize = 0,
-        len: usize = 0,
+        input: *std.Io.Reader,
+        reader: std.Io.Reader,
+        err: ?Error = null,
+        remaining_len: usize,
 
-        pub fn deinit(stream: *FetchStream) void {
-            stream.request.deinit();
+        pub fn deinit(fs: *FetchStream) void {
+            fs.request.deinit();
         }
 
-        pub const ReadError = std.http.Client.Request.ReadError || error{
+        pub const Error = error{
             InvalidPacket,
             ProtocolError,
             UnexpectedPacket,
+            WriteFailed,
+            ReadFailed,
+            EndOfStream,
         };
-        pub const Reader = std.io.GenericReader(*FetchStream, ReadError, read);
 
         const StreamCode = enum(u8) {
             pack_data = 1,
@@ -1083,33 +1100,41 @@ pub const Session = struct {
             _,
         };
 
-        pub fn reader(stream: *FetchStream) Reader {
-            return .{ .context = stream };
-        }
-
-        pub fn read(stream: *FetchStream, buf: []u8) !usize {
-            if (stream.pos == stream.len) {
+        pub fn stream(r: *std.Io.Reader, w: *std.Io.Writer, limit: std.Io.Limit) std.Io.Reader.StreamError!usize {
+            const fs: *FetchStream = @alignCast(@fieldParentPtr("reader", r));
+            const input = fs.input;
+            if (fs.remaining_len == 0) {
                 while (true) {
-                    switch (try Packet.read(stream.request.reader(), &stream.buf)) {
-                        .flush => return 0,
+                    switch (Packet.read(input) catch |err| {
+                        fs.err = err;
+                        return error.ReadFailed;
+                    }) {
+                        .flush => return error.EndOfStream,
                         .data => |data| if (data.len > 1) switch (@as(StreamCode, @enumFromInt(data[0]))) {
                             .pack_data => {
-                                stream.pos = 1;
-                                stream.len = data.len;
+                                input.toss(1);
+                                fs.remaining_len = data.len;
                                 break;
                             },
-                            .fatal_error => return error.ProtocolError,
+                            .fatal_error => {
+                                fs.err = error.ProtocolError;
+                                return error.ReadFailed;
+                            },
                             else => {},
                         },
-                        else => return error.UnexpectedPacket,
+                        else => {
+                            fs.err = error.UnexpectedPacket;
+                            return error.ReadFailed;
+                        },
                     }
                 }
             }
-
-            const size = @min(buf.len, stream.len - stream.pos);
-            @memcpy(buf[0..size], stream.buf[stream.pos .. stream.pos + size]);
-            stream.pos += size;
-            return size;
+            const buf = limit.slice(try w.writableSliceGreedy(1));
+            const n = @min(buf.len, fs.remaining_len);
+            @memcpy(buf[0..n], input.buffered()[0..n]);
+            input.toss(n);
+            fs.remaining_len -= n;
+            return n;
         }
     };
 };
src/Package/Fetch.zig
@@ -385,20 +385,21 @@ pub fn run(f: *Fetch) RunError!void {
                 var resource: Resource = .{ .dir = dir };
                 return f.runResource(path_or_url, &resource, null);
             } else |dir_err| {
+                var server_header_buffer: [init_resource_buffer_size]u8 = undefined;
+
                 const file_err = if (dir_err == error.NotDir) e: {
                     if (fs.cwd().openFile(path_or_url, .{})) |file| {
-                        var resource: Resource = .{ .file = file };
+                        var resource: Resource = .{ .file = file.reader(&server_header_buffer) };
                         return f.runResource(path_or_url, &resource, null);
                     } else |err| break :e err;
                 } else dir_err;
 
                 const uri = std.Uri.parse(path_or_url) catch |uri_err| {
                     return f.fail(0, try eb.printString(
-                        "'{s}' could not be recognized as a file path ({s}) or an URL ({s})",
-                        .{ path_or_url, @errorName(file_err), @errorName(uri_err) },
+                        "'{s}' could not be recognized as a file path ({t}) or an URL ({t})",
+                        .{ path_or_url, file_err, uri_err },
                     ));
                 };
-                var server_header_buffer: [header_buffer_size]u8 = undefined;
                 var resource = try f.initResource(uri, &server_header_buffer);
                 return f.runResource(try uri.path.toRawMaybeAlloc(arena), &resource, null);
             }
@@ -464,8 +465,8 @@ pub fn run(f: *Fetch) RunError!void {
         f.location_tok,
         try eb.printString("invalid URI: {s}", .{@errorName(err)}),
     );
-    var server_header_buffer: [header_buffer_size]u8 = undefined;
-    var resource = try f.initResource(uri, &server_header_buffer);
+    var buffer: [init_resource_buffer_size]u8 = undefined;
+    var resource = try f.initResource(uri, &buffer);
     return f.runResource(try uri.path.toRawMaybeAlloc(arena), &resource, remote.hash);
 }
 
@@ -866,8 +867,8 @@ fn fail(f: *Fetch, msg_tok: std.zig.Ast.TokenIndex, msg_str: u32) RunError {
 }
 
 const Resource = union(enum) {
-    file: fs.File,
-    http_request: std.http.Client.Request,
+    file: fs.File.Reader,
+    http_request: HttpRequest,
     git: Git,
     dir: fs.Dir,
 
@@ -877,10 +878,16 @@ const Resource = union(enum) {
         want_oid: git.Oid,
     };
 
+    const HttpRequest = struct {
+        request: std.http.Client.Request,
+        head: std.http.Client.Response.Head,
+        buffer: []u8,
+    };
+
     fn deinit(resource: *Resource) void {
         switch (resource.*) {
-            .file => |*file| file.close(),
-            .http_request => |*req| req.deinit(),
+            .file => |*file_reader| file_reader.file.close(),
+            .http_request => |*http_request| http_request.request.deinit(),
             .git => |*git_resource| {
                 git_resource.fetch_stream.deinit();
                 git_resource.session.deinit();
@@ -890,21 +897,19 @@ const Resource = union(enum) {
         resource.* = undefined;
     }
 
-    fn reader(resource: *Resource) std.io.AnyReader {
-        return .{
-            .context = resource,
-            .readFn = read,
-        };
-    }
-
-    fn read(context: *const anyopaque, buffer: []u8) anyerror!usize {
-        const resource: *Resource = @ptrCast(@alignCast(@constCast(context)));
-        switch (resource.*) {
-            .file => |*f| return f.read(buffer),
-            .http_request => |*r| return r.read(buffer),
-            .git => |*g| return g.fetch_stream.read(buffer),
+    fn reader(resource: *Resource) *std.Io.Reader {
+        return switch (resource.*) {
+            .file => |*file_reader| return &file_reader.interface,
+            .http_request => |*http_request| {
+                const response: std.http.Client.Response = .{
+                    .request = &http_request.request,
+                    .head = http_request.head,
+                };
+                return response.reader(http_request.buffer);
+            },
+            .git => |*g| return &g.fetch_stream.reader,
             .dir => unreachable,
-        }
+        };
     }
 };
 
@@ -967,20 +972,21 @@ const FileType = enum {
     }
 };
 
-const header_buffer_size = 16 * 1024;
+const init_resource_buffer_size = git.Packet.max_data_length;
 
-fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Resource {
+fn initResource(f: *Fetch, uri: std.Uri, reader_buffer: []u8) RunError!Resource {
     const gpa = f.arena.child_allocator;
     const arena = f.arena.allocator();
     const eb = &f.error_bundle;
 
     if (ascii.eqlIgnoreCase(uri.scheme, "file")) {
         const path = try uri.path.toRawMaybeAlloc(arena);
-        return .{ .file = f.parent_package_root.openFile(path, .{}) catch |err| {
-            return f.fail(f.location_tok, try eb.printString("unable to open '{f}{s}': {s}", .{
-                f.parent_package_root, path, @errorName(err),
+        const file = f.parent_package_root.openFile(path, .{}) catch |err| {
+            return f.fail(f.location_tok, try eb.printString("unable to open '{f}{s}': {t}", .{
+                f.parent_package_root, path, err,
             }));
-        } };
+        };
+        return .{ .file = file.reader(reader_buffer) };
     }
 
     const http_client = f.job_queue.http_client;
@@ -988,37 +994,27 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
     if (ascii.eqlIgnoreCase(uri.scheme, "http") or
         ascii.eqlIgnoreCase(uri.scheme, "https"))
     {
-        var req = http_client.open(.GET, uri, .{
-            .server_header_buffer = server_header_buffer,
-        }) catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "unable to connect to server: {s}",
-                .{@errorName(err)},
-            ));
-        };
-        errdefer req.deinit(); // releases more than memory
+        var request = http_client.request(.GET, uri, .{}) catch |err|
+            return f.fail(f.location_tok, try eb.printString("unable to connect to server: {t}", .{err}));
+        defer request.deinit();
 
-        req.send() catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "HTTP request failed: {s}",
-                .{@errorName(err)},
-            ));
-        };
-        req.wait() catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "invalid HTTP response: {s}",
-                .{@errorName(err)},
-            ));
-        };
+        request.sendBodiless() catch |err|
+            return f.fail(f.location_tok, try eb.printString("HTTP request failed: {t}", .{err}));
 
-        if (req.response.status != .ok) {
-            return f.fail(f.location_tok, try eb.printString(
-                "bad HTTP response code: '{d} {s}'",
-                .{ @intFromEnum(req.response.status), req.response.status.phrase() orelse "" },
-            ));
-        }
+        var redirect_buffer: [1024]u8 = undefined;
+        const response = request.receiveHead(&redirect_buffer) catch |err|
+            return f.fail(f.location_tok, try eb.printString("invalid HTTP response: {t}", .{err}));
+
+        if (response.head.status != .ok) return f.fail(f.location_tok, try eb.printString(
+            "bad HTTP response code: '{d} {s}'",
+            .{ response.head.status, response.head.status.phrase() orelse "" },
+        ));
 
-        return .{ .http_request = req };
+        return .{ .http_request = .{
+            .request = request,
+            .head = response.head,
+            .buffer = reader_buffer,
+        } };
     }
 
     if (ascii.eqlIgnoreCase(uri.scheme, "git+http") or
@@ -1026,7 +1022,7 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
     {
         var transport_uri = uri;
         transport_uri.scheme = uri.scheme["git+".len..];
-        var session = git.Session.init(gpa, http_client, transport_uri, server_header_buffer) catch |err| {
+        var session = git.Session.init(gpa, http_client, transport_uri, reader_buffer) catch |err| {
             return f.fail(f.location_tok, try eb.printString(
                 "unable to discover remote git server capabilities: {s}",
                 .{@errorName(err)},
@@ -1042,16 +1038,12 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
             const want_ref_head = try std.fmt.allocPrint(arena, "refs/heads/{s}", .{want_ref});
             const want_ref_tag = try std.fmt.allocPrint(arena, "refs/tags/{s}", .{want_ref});
 
-            var ref_iterator = session.listRefs(.{
+            var ref_iterator: git.Session.RefIterator = undefined;
+            session.listRefs(&ref_iterator, .{
                 .ref_prefixes = &.{ want_ref, want_ref_head, want_ref_tag },
                 .include_peeled = true,
-                .server_header_buffer = server_header_buffer,
-            }) catch |err| {
-                return f.fail(f.location_tok, try eb.printString(
-                    "unable to list refs: {s}",
-                    .{@errorName(err)},
-                ));
-            };
+                .buffer = reader_buffer,
+            }) catch |err| return f.fail(f.location_tok, try eb.printString("unable to list refs: {t}", .{err}));
             defer ref_iterator.deinit();
             while (ref_iterator.next() catch |err| {
                 return f.fail(f.location_tok, try eb.printString(
@@ -1089,14 +1081,14 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
 
         var want_oid_buf: [git.Oid.max_formatted_length]u8 = undefined;
         _ = std.fmt.bufPrint(&want_oid_buf, "{f}", .{want_oid}) catch unreachable;
-        var fetch_stream = session.fetch(&.{&want_oid_buf}, server_header_buffer) catch |err| {
-            return f.fail(f.location_tok, try eb.printString(
-                "unable to create fetch stream: {s}",
-                .{@errorName(err)},
-            ));
+        var fetch_stream: git.Session.FetchStream = undefined;
+        session.fetch(&fetch_stream, &.{&want_oid_buf}, reader_buffer) catch |err| {
+            return f.fail(f.location_tok, try eb.printString("unable to create fetch stream: {t}", .{err}));
         };
         errdefer fetch_stream.deinit();
 
+        if (true) @panic("TODO this moves fetch_stream, invalidating its reader");
+
         return .{ .git = .{
             .session = session,
             .fetch_stream = fetch_stream,
@@ -1104,10 +1096,7 @@ fn initResource(f: *Fetch, uri: std.Uri, server_header_buffer: []u8) RunError!Re
         } };
     }
 
-    return f.fail(f.location_tok, try eb.printString(
-        "unsupported URL scheme: {s}",
-        .{uri.scheme},
-    ));
+    return f.fail(f.location_tok, try eb.printString("unsupported URL scheme: {s}", .{uri.scheme}));
 }
 
 fn unpackResource(
@@ -1121,9 +1110,11 @@ fn unpackResource(
         .file => FileType.fromPath(uri_path) orelse
             return f.fail(f.location_tok, try eb.printString("unknown file type: '{s}'", .{uri_path})),
 
-        .http_request => |req| ft: {
+        .http_request => |*http_request| ft: {
+            const head = &http_request.head;
+
             // Content-Type takes first precedence.
-            const content_type = req.response.content_type orelse
+            const content_type = head.content_type orelse
                 return f.fail(f.location_tok, try eb.addString("missing 'Content-Type' header"));
 
             // Extract the MIME type, ignoring charset and boundary directives
@@ -1165,7 +1156,7 @@ fn unpackResource(
             }
 
             // Next, the filename from 'content-disposition: attachment' takes precedence.
-            if (req.response.content_disposition) |cd_header| {
+            if (head.content_disposition) |cd_header| {
                 break :ft FileType.fromContentDisposition(cd_header) orelse {
                     return f.fail(f.location_tok, try eb.printString(
                         "unsupported Content-Disposition header value: '{s}' for Content-Type=application/octet-stream",
@@ -1176,10 +1167,7 @@ fn unpackResource(
 
             // Finally, the path from the URI is used.
             break :ft FileType.fromPath(uri_path) orelse {
-                return f.fail(f.location_tok, try eb.printString(
-                    "unknown file type: '{s}'",
-                    .{uri_path},
-                ));
+                return f.fail(f.location_tok, try eb.printString("unknown file type: '{s}'", .{uri_path}));
             };
         },
 
@@ -1187,10 +1175,9 @@ fn unpackResource(
 
         .dir => |dir| {
             f.recursiveDirectoryCopy(dir, tmp_directory.handle) catch |err| {
-                return f.fail(f.location_tok, try eb.printString(
-                    "unable to copy directory '{s}': {s}",
-                    .{ uri_path, @errorName(err) },
-                ));
+                return f.fail(f.location_tok, try eb.printString("unable to copy directory '{s}': {t}", .{
+                    uri_path, err,
+                }));
             };
             return .{};
         },
@@ -1198,15 +1185,11 @@ fn unpackResource(
 
     switch (file_type) {
         .tar => {
-            var adapter_buffer: [1024]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
-            return unpackTarball(f, tmp_directory.handle, &adapter.new_interface);
+            return unpackTarball(f, tmp_directory.handle, resource.reader());
         },
         .@"tar.gz" => {
-            var adapter_buffer: [std.crypto.tls.max_ciphertext_record_len]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
             var flate_buffer: [std.compress.flate.max_window_len]u8 = undefined;
-            var decompress: std.compress.flate.Decompress = .init(&adapter.new_interface, .gzip, &flate_buffer);
+            var decompress: std.compress.flate.Decompress = .init(resource.reader(), .gzip, &flate_buffer);
             return try unpackTarball(f, tmp_directory.handle, &decompress.reader);
         },
         .@"tar.xz" => {
@@ -1227,9 +1210,7 @@ fn unpackResource(
         .@"tar.zst" => {
             const window_size = std.compress.zstd.default_window_len;
             const window_buffer = try f.arena.allocator().create([window_size]u8);
-            var adapter_buffer: [std.crypto.tls.max_ciphertext_record_len]u8 = undefined;
-            var adapter = resource.reader().adaptToNewApi(&adapter_buffer);
-            var decompress: std.compress.zstd.Decompress = .init(&adapter.new_interface, window_buffer, .{
+            var decompress: std.compress.zstd.Decompress = .init(resource.reader(), window_buffer, .{
                 .verify_checksum = false,
             });
             return try unpackTarball(f, tmp_directory.handle, &decompress.reader);
@@ -1237,12 +1218,15 @@ fn unpackResource(
         .git_pack => return unpackGitPack(f, tmp_directory.handle, &resource.git) catch |err| switch (err) {
             error.FetchFailed => return error.FetchFailed,
             error.OutOfMemory => return error.OutOfMemory,
-            else => |e| return f.fail(f.location_tok, try eb.printString(
-                "unable to unpack git files: {s}",
-                .{@errorName(e)},
+            else => |e| return f.fail(f.location_tok, try eb.printString("unable to unpack git files: {t}", .{e})),
+        },
+        .zip => return unzip(f, tmp_directory.handle, resource.reader()) catch |err| switch (err) {
+            error.ReadFailed => return f.fail(f.location_tok, try eb.printString(
+                "failed reading resource: {t}",
+                .{err},
             )),
+            else => |e| return e,
         },
-        .zip => return try unzip(f, tmp_directory.handle, resource.reader()),
     }
 }
 
@@ -1277,99 +1261,69 @@ fn unpackTarball(f: *Fetch, out_dir: fs.Dir, reader: *std.Io.Reader) RunError!Un
     return res;
 }
 
-fn unzip(f: *Fetch, out_dir: fs.Dir, reader: anytype) RunError!UnpackResult {
+fn unzip(f: *Fetch, out_dir: fs.Dir, reader: *std.Io.Reader) error{ ReadFailed, OutOfMemory, FetchFailed }!UnpackResult {
     // We write the entire contents to a file first because zip files
     // must be processed back to front and they could be too large to
     // load into memory.
 
     const cache_root = f.job_queue.global_cache;
-
-    // TODO: the downside of this solution is if we get a failure/crash/oom/power out
-    //       during this process, we leave behind a zip file that would be
-    //       difficult to know if/when it can be cleaned up.
-    //       Might be worth it to use a mechanism that enables other processes
-    //       to see if the owning process of a file is still alive (on linux this
-    //       can be done with file locks).
-    //       Coupled with this mechansism, we could also use slots (i.e. zig-cache/tmp/0,
-    //       zig-cache/tmp/1, etc) which would mean that subsequent runs would
-    //       automatically clean up old dead files.
-    //       This could all be done with a simple TmpFile abstraction.
     const prefix = "tmp/";
     const suffix = ".zip";
-
-    const random_bytes_count = 20;
-    const random_path_len = comptime std.fs.base64_encoder.calcSize(random_bytes_count);
-    var zip_path: [prefix.len + random_path_len + suffix.len]u8 = undefined;
-    @memcpy(zip_path[0..prefix.len], prefix);
-    @memcpy(zip_path[prefix.len + random_path_len ..], suffix);
-    {
-        var random_bytes: [random_bytes_count]u8 = undefined;
-        std.crypto.random.bytes(&random_bytes);
-        _ = std.fs.base64_encoder.encode(
-            zip_path[prefix.len..][0..random_path_len],
-            &random_bytes,
-        );
-    }
-
-    defer cache_root.handle.deleteFile(&zip_path) catch {};
-
     const eb = &f.error_bundle;
-
-    {
-        var zip_file = cache_root.handle.createFile(
-            &zip_path,
-            .{},
-        ) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "failed to create tmp zip file: {s}",
-            .{@errorName(err)},
-        ));
-        defer zip_file.close();
-        var buf: [4096]u8 = undefined;
-        while (true) {
-            const len = reader.readAll(&buf) catch |err| return f.fail(f.location_tok, try eb.printString(
-                "read zip stream failed: {s}",
-                .{@errorName(err)},
-            ));
-            if (len == 0) break;
-            zip_file.deprecatedWriter().writeAll(buf[0..len]) catch |err| return f.fail(f.location_tok, try eb.printString(
-                "write temporary zip file failed: {s}",
-                .{@errorName(err)},
-            ));
-        }
-    }
+    const random_len = @sizeOf(u64) * 2;
+
+    var zip_path: [prefix.len + random_len + suffix.len]u8 = undefined;
+    zip_path[0..prefix.len].* = prefix.*;
+    zip_path[prefix.len + random_len ..].* = suffix.*;
+
+    var zip_file = while (true) {
+        const random_integer = std.crypto.random.int(u64);
+        zip_path[prefix.len..][0..random_len].* = std.fmt.hex(random_integer);
+
+        break cache_root.handle.createFile(&zip_path, .{
+            .exclusive = true,
+            .read = true,
+        }) catch |err| switch (err) {
+            error.PathAlreadyExists => continue,
+            else => |e| return f.fail(
+                f.location_tok,
+                try eb.printString("failed to create temporary zip file: {t}", .{e}),
+            ),
+        };
+    };
+    defer zip_file.close();
+    var zip_file_buffer: [4096]u8 = undefined;
+    var zip_file_reader = b: {
+        var zip_file_writer = zip_file.writer(&zip_file_buffer);
+
+        _ = reader.streamRemaining(&zip_file_writer.interface) catch |err| switch (err) {
+            error.ReadFailed => return error.ReadFailed,
+            error.WriteFailed => return f.fail(
+                f.location_tok,
+                try eb.printString("failed writing temporary zip file: {t}", .{err}),
+            ),
+        };
+        zip_file_writer.interface.flush() catch |err| return f.fail(
+            f.location_tok,
+            try eb.printString("failed writing temporary zip file: {t}", .{err}),
+        );
+        break :b zip_file_writer.moveToReader();
+    };
 
     var diagnostics: std.zip.Diagnostics = .{ .allocator = f.arena.allocator() };
     // no need to deinit since we are using an arena allocator
 
-    {
-        var zip_file = cache_root.handle.openFile(
-            &zip_path,
-            .{},
-        ) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "failed to open temporary zip file: {s}",
-            .{@errorName(err)},
-        ));
-        defer zip_file.close();
-
-        var zip_file_buffer: [1024]u8 = undefined;
-        var zip_file_reader = zip_file.reader(&zip_file_buffer);
-
-        std.zip.extract(out_dir, &zip_file_reader, .{
-            .allow_backslashes = true,
-            .diagnostics = &diagnostics,
-        }) catch |err| return f.fail(f.location_tok, try eb.printString(
-            "zip extract failed: {s}",
-            .{@errorName(err)},
-        ));
-    }
+    zip_file_reader.seekTo(0) catch |err|
+        return f.fail(f.location_tok, try eb.printString("failed to seek temporary zip file: {t}", .{err}));
+    std.zip.extract(out_dir, &zip_file_reader, .{
+        .allow_backslashes = true,
+        .diagnostics = &diagnostics,
+    }) catch |err| return f.fail(f.location_tok, try eb.printString("zip extract failed: {t}", .{err}));
 
-    cache_root.handle.deleteFile(&zip_path) catch |err| return f.fail(f.location_tok, try eb.printString(
-        "delete temporary zip failed: {s}",
-        .{@errorName(err)},
-    ));
+    cache_root.handle.deleteFile(&zip_path) catch |err|
+        return f.fail(f.location_tok, try eb.printString("delete temporary zip failed: {t}", .{err}));
 
-    const res: UnpackResult = .{ .root_dir = diagnostics.root_dir };
-    return res;
+    return .{ .root_dir = diagnostics.root_dir };
 }
 
 fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!UnpackResult {
@@ -1387,10 +1341,13 @@ fn unpackGitPack(f: *Fetch, out_dir: fs.Dir, resource: *Resource.Git) anyerror!U
         var pack_file = try pack_dir.createFile("pkg.pack", .{ .read = true });
         defer pack_file.close();
         var pack_file_buffer: [4096]u8 = undefined;
-        var fifo = std.fifo.LinearFifo(u8, .{ .Slice = {} }).init(&pack_file_buffer);
-        try fifo.pump(resource.fetch_stream.reader(), pack_file.deprecatedWriter());
-
-        var pack_file_reader = pack_file.reader(&pack_file_buffer);
+        var pack_file_reader = b: {
+            var pack_file_writer = pack_file.writer(&pack_file_buffer);
+            const fetch_reader = &resource.fetch_stream.reader;
+            _ = try fetch_reader.streamRemaining(&pack_file_writer.interface);
+            try pack_file_writer.interface.flush();
+            break :b pack_file_writer.moveToReader();
+        };
 
         var index_file = try pack_dir.createFile("pkg.idx", .{ .read = true });
         defer index_file.close();