Commit ef6d58ed3b
Changed files (3)
lib/std/http/Client.zig
@@ -15,8 +15,6 @@ const proto = @import("protocol.zig");
pub const default_connection_pool_size = 32;
pub const connection_pool_size = std.options.http_connection_pool_size;
-/// Used for tcpConnectToHost and storing HTTP headers when an externally
-/// managed buffer is not provided.
allocator: Allocator,
ca_bundle: std.crypto.Certificate.Bundle = .{},
ca_bundle_mutex: std.Thread.Mutex = .{},
@@ -24,8 +22,10 @@ ca_bundle_mutex: std.Thread.Mutex = .{},
/// it will first rescan the system for root certificates.
next_https_rescan_certs: bool = true,
+/// The pool of connections that can be reused (and those currently in use).
connection_pool: ConnectionPool = .{},
+/// The last error that occurred on this client. This is not threadsafe; do not expect it to be completely accurate.
last_error: ?ExtraError = null,
pub const ExtraError = union(enum) {
@@ -68,7 +68,9 @@ pub const ExtraError = union(enum) {
decompress: DecompressError, // error.ReadFailed
};
+/// A set of linked lists of connections that can be reused.
pub const ConnectionPool = struct {
+ /// The criteria for a connection to be considered a match.
pub const Criteria = struct {
host: []const u8,
port: u16,
@@ -92,7 +94,9 @@ pub const ConnectionPool = struct {
pub const Node = Queue.Node;
mutex: std.Thread.Mutex = .{},
+ /// Open connections that are currently in use.
used: Queue = .{},
+ /// Open connections that are not currently in use.
free: Queue = .{},
free_len: usize = 0,
free_size: usize = connection_pool_size,
@@ -189,6 +193,7 @@ pub const ConnectionPool = struct {
}
};
+/// An interface to either a plain or TLS connection.
pub const Connection = struct {
stream: net.Stream,
/// undefined unless protocol is tls.
@@ -261,6 +266,7 @@ pub const Connection = struct {
}
};
+/// A buffered (and peekable) Connection.
pub const BufferedConnection = struct {
pub const buffer_size = 0x2000;
@@ -344,12 +350,14 @@ pub const BufferedConnection = struct {
}
};
+/// The mode of transport for requests.
pub const RequestTransfer = union(enum) {
content_length: u64,
chunked: void,
none: void,
};
+/// The decompressor for response messages.
pub const Compression = union(enum) {
pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Request.TransferReader);
pub const GzipDecompressor = std.compress.gzip.Decompress(Request.TransferReader);
@@ -361,6 +369,7 @@ pub const Compression = union(enum) {
none: void,
};
+/// An HTTP response originating from a server.
pub const Response = struct {
pub const Headers = struct {
status: http.Status,
@@ -501,14 +510,9 @@ pub const Response = struct {
skip: bool = false,
};
-/// A HTTP request.
+/// An HTTP request that has been sent.
///
-/// Order of operations:
-/// - request
-/// - write
-/// - finish
-/// - do
-/// - read
+/// Order of operations: request[ -> write -> finish] -> do -> read
pub const Request = struct {
pub const Headers = struct {
version: http.Version = .@"HTTP/1.1",
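For reference, a minimal sketch of that order of operations from the caller's side. Only `request`'s exact signature appears in this diff; the default `.{}` header/option values, `deinit`, and the precise shapes of `do` and `read` are assumptions based on the documented flow, and the URL is illustrative.

```zig
const std = @import("std");

pub fn main() !void {
    var client = std.http.Client{ .allocator = std.heap.page_allocator };
    defer client.deinit();

    const uri = try std.Uri.parse("http://127.0.0.1:8080/hello");

    // request(): form and send the request head (a plain GET here).
    var req = try client.request(uri, .{}, .{});
    defer req.deinit(); // assumed cleanup; not shown in this diff

    // The bracketed steps, write -> finish, only apply when the request
    // carries a body; a bare GET skips straight to do().
    try req.do();

    // read(): pull the response body once do() has parsed the response head.
    var buf: [4096]u8 = undefined;
    const n = try req.read(&buf);
    std.debug.print("{s}", .{buf[0..n]});
}
```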
@@ -862,6 +866,8 @@ pub const Request = struct {
}
};
+/// Release all resources associated with the client.
+/// TODO: currently leaks all request allocated data
pub fn deinit(client: *Client) void {
client.connection_pool.deinit(client);
@@ -871,6 +877,8 @@ pub fn deinit(client: *Client) void {
pub const ConnectError = Allocator.Error || error{ ConnectionFailed, TlsInitializationFailed };
+/// Connect to `host:port` using the specified protocol. This will reuse a connection if one is already open.
+/// This function is threadsafe.
pub fn connect(client: *Client, host: []const u8, port: u16, protocol: Connection.Protocol) ConnectError!*ConnectionPool.Node {
if (client.connection_pool.findConnection(.{
.host = host,
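The pool interaction can also be driven directly. A small sketch: the `connect` signature is taken from this hunk, while the assumption that the non-TLS variant of `Connection.Protocol` is named `.plain` (only `.tls` appears here) and the localhost address are illustrative.

```zig
const std = @import("std");

// Warm the pool: check out (or establish) a plain-TCP connection ahead of time.
fn preconnect(client: *std.http.Client) !void {
    const node = try client.connect("127.0.0.1", 8080, .plain);
    // The node remains owned by the client's connection pool, so a later
    // request() to the same host:port can reuse it instead of reconnecting.
    _ = node;
}
```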
@@ -955,6 +963,8 @@ pub const protocol_map = std.ComptimeStringMap(Connection.Protocol, .{
.{ "wss", .tls },
});
+/// Form and send an HTTP request to a server.
+/// This function is threadsafe.
pub fn request(client: *Client, uri: Uri, headers: Request.Headers, options: Options) RequestError!Request {
const protocol = protocol_map.get(uri.scheme) orelse return error.UnsupportedUrlScheme;
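Because `connect` and `request` are documented as threadsafe, a single `Client` (and its connection pool) can be shared across threads. A sketch; the worker shape, default `.{}` arguments, `deinit`, and `do` are assumptions, and the URLs are illustrative.

```zig
const std = @import("std");

fn fetch(client: *std.http.Client, url: []const u8) void {
    const uri = std.Uri.parse(url) catch return;
    var req = client.request(uri, .{}, .{}) catch return;
    defer req.deinit(); // assumed cleanup; not shown in this diff
    req.do() catch return;
    // ... read the response body here ...
}

pub fn main() !void {
    var client = std.http.Client{ .allocator = std.heap.page_allocator };
    defer client.deinit();

    // Two requests in flight at once, sharing one client and one pool.
    const t1 = try std.Thread.spawn(.{}, fetch, .{ &client, "http://127.0.0.1:8080/a" });
    const t2 = try std.Thread.spawn(.{}, fetch, .{ &client, "http://127.0.0.1:8080/b" });
    t1.join();
    t2.join();
}
```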
lib/std/http/protocol.zig
@@ -21,6 +21,7 @@ pub const State = enum {
chunk_data_suffix,
chunk_data_suffix_r,
+ /// Returns true if the parser is in a content state (i.e. not waiting for more headers).
pub fn isContent(self: State) bool {
return switch (self) {
.invalid, .start, .seen_n, .seen_r, .seen_rn, .seen_rnr => false,
@@ -31,7 +32,7 @@ pub const State = enum {
pub const HeadersParser = struct {
state: State = .start,
- /// Wether or not `header_bytes` is allocated or was provided as a fixed buffer.
+ /// Whether or not `header_bytes` is allocated or was provided as a fixed buffer.
header_bytes_owned: bool,
/// Either a fixed buffer of len `max_header_bytes` or a dynamic buffer that can grow up to `max_header_bytes`.
/// Pointers into this buffer are not stable until after a message is complete.
@@ -39,10 +40,11 @@ pub const HeadersParser = struct {
/// The maximum allowed size of `header_bytes`.
max_header_bytes: usize,
next_chunk_length: u64 = 0,
- /// Wether this parser is done parsing a complete message.
- /// A message is only done when the entire payload has been read
+ /// Whether this parser is done parsing a complete message.
+ /// A message is only done when the entire payload has been read.
done: bool = false,
+ /// Initializes the parser with a dynamically growing header buffer of up to `max` bytes.
pub fn initDynamic(max: usize) HeadersParser {
return .{
.header_bytes = .{},
@@ -51,6 +53,7 @@ pub const HeadersParser = struct {
};
}
+ /// Initializes the parser with a provided buffer `buf`.
pub fn initStatic(buf: []u8) HeadersParser {
return .{
.header_bytes = .{ .items = buf[0..0], .capacity = buf.len },
@@ -59,7 +62,11 @@ pub const HeadersParser = struct {
};
}
+ /// Completely resets the parser to its initial state.
+ /// This must be called after a message is complete.
pub fn reset(r: *HeadersParser) void {
+ assert(r.done); // The message must be completely read before reset, otherwise the parser is in an invalid state.
+
r.header_bytes.clearRetainingCapacity();
r.* = .{
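The two constructors differ only in who owns the header buffer. A sketch, assuming the parser is reached the way Client.zig reaches it (`@import("protocol.zig")` from inside `lib/std/http`):

```zig
const proto = @import("protocol.zig"); // assumption: caller lives inside lib/std/http

fn initExamples(header_buf: []u8) void {
    // Dynamic: header bytes are heap-allocated on demand, up to 16 KiB.
    var dynamic_parser = proto.HeadersParser.initDynamic(16 * 1024);
    _ = dynamic_parser;

    // Static: header bytes live in the caller-provided buffer; nothing is
    // allocated, and an oversized head fails with HttpHeadersExceededSizeLimit.
    var static_parser = proto.HeadersParser.initStatic(header_buf);
    _ = static_parser;

    // Once a message has been fully consumed (parser.done == true), reset()
    // must be called before parsing the next message on the connection.
}
```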
@@ -69,13 +76,14 @@ pub const HeadersParser = struct {
};
}
- /// Returns how many bytes are part of HTTP headers. Always less than or
- /// equal to bytes.len. If the amount returned is less than bytes.len, it
- /// means the headers ended and the first byte after the double \r\n\r\n is
- /// located at `bytes[result]`.
+ /// Returns the number of bytes consumed by headers. This is always less than or equal to `bytes.len`.
+ /// You should check `r.state.isContent()` after this call to determine whether the headers are done.
+ ///
+ /// If the amount returned is less than `bytes.len`, you may assume that the parser is in a content state and the
+ /// first byte of content is located at `bytes[result]`.
pub fn findHeadersEnd(r: *HeadersParser, bytes: []const u8) u32 {
- const vector_len = 16;
- const len = @truncate(u32, bytes.len);
+ const vector_len: comptime_int = comptime std.simd.suggestVectorSize(u8) orelse 8;
+ const len = @intCast(u32, bytes.len);
var index: u32 = 0;
while (true) {
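A sketch of the return contract described above; the diagnostics and the way the bytes are obtained are illustrative.

```zig
const std = @import("std");
const proto = @import("protocol.zig"); // assumption: caller lives inside lib/std/http

fn splitHead(p: *proto.HeadersParser, bytes: []const u8) void {
    const n = p.findHeadersEnd(bytes);
    if (p.state.isContent()) {
        // The terminating \r\n\r\n was seen: bytes[0..n] was head material and
        // bytes[n..] is the first slice of content.
        std.debug.print("head: {d} bytes, content starts with: {s}\n", .{ n, bytes[n..] });
    } else {
        // n == bytes.len: everything fed so far was headers; feed more bytes later.
        std.debug.print("need more header bytes\n", .{});
    }
}
```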
@@ -390,8 +398,13 @@ pub const HeadersParser = struct {
}
}
+ /// Returns the number of bytes consumed by the chunk size. This is always less than or equal to `bytes.len`.
+ /// You should check `r.state == .chunk_data` after this call to determine whether the chunk size has been fully parsed.
+ ///
+ /// If the amount returned is less than `bytes.len`, you may assume that the parser is in the `chunk_data` state
+ /// and that the first byte of the chunk is at `bytes[result]`.
pub fn findChunkedLen(r: *HeadersParser, bytes: []const u8) u32 {
- const len = @truncate(u32, bytes.len);
+ const len = @intCast(u32, bytes.len);
for (bytes[0..], 0..) |c, i| {
const index = @intCast(u32, i);
@@ -471,8 +484,10 @@ pub const HeadersParser = struct {
pub const CheckCompleteHeadError = mem.Allocator.Error || error{HttpHeadersExceededSizeLimit};
- /// Pumps `in` bytes into the parser. Returns the number of bytes consumed. This function will return 0 if the parser
- /// is not in a state to parse more headers.
+ /// Pushes `in` into the parser. Returns the number of bytes consumed by the header. Any header bytes are appended
+ /// to the `header_bytes` buffer.
+ ///
+ /// `allocator` is only used when `r.header_bytes_owned` is true; otherwise it may be `undefined`.
pub fn checkCompleteHead(r: *HeadersParser, allocator: std.mem.Allocator, in: []const u8) CheckCompleteHeadError!u32 {
if (r.state.isContent()) return 0;
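A sketch of how a caller might pump a stream through `checkCompleteHead`; the reader/buffer plumbing is illustrative, and it assumes (per the doc above) that each call consumes the entire header portion of the bytes it is given.

```zig
const std = @import("std");
const proto = @import("protocol.zig"); // assumption: caller lives inside lib/std/http

/// Feed bytes from `reader` into `p` until the full head has been seen.
/// Returns the leftover bytes in `buf` that already belong to the body.
fn readHead(p: *proto.HeadersParser, allocator: std.mem.Allocator, reader: anytype, buf: []u8) ![]const u8 {
    while (!p.state.isContent()) {
        const n = try reader.read(buf);
        if (n == 0) return error.EndOfStream;

        // Header bytes are appended to p.header_bytes; `allocator` is only
        // touched when the parser owns that buffer (initDynamic).
        const consumed = try p.checkCompleteHead(allocator, buf[0..n]);

        // Anything past `consumed` was read beyond the end of the head.
        if (p.state.isContent()) return buf[consumed..n];
    }
    return buf[0..0];
}
```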
@@ -493,8 +508,11 @@ pub const HeadersParser = struct {
HttpChunkInvalid,
};
- /// Reads the body of the message into `buffer`. If `skip` is true, the buffer will be unused and the body will be
- /// skipped. Returns the number of bytes placed in the buffer.
+ /// Reads the body of the message into `buffer`. Returns the number of bytes placed in the buffer.
+ ///
+ /// If `skip` is true, the buffer will be unused and the body will be skipped.
+ ///
+ /// See `std.http.Client.BufferedConnection` for an example of `bconn`.
pub fn read(r: *HeadersParser, bconn: anytype, buffer: []u8, skip: bool) !usize {
assert(r.state.isContent());
if (r.done) return 0;
lib/std/http/Server.zig
@@ -14,10 +14,7 @@ allocator: Allocator,
socket: net.StreamServer,
-pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Response.TransferReader);
-pub const GzipDecompressor = std.compress.gzip.Decompress(Response.TransferReader);
-pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Response.TransferReader, .{});
-
+/// An interface to either a plain or TLS connection.
pub const Connection = struct {
stream: net.Stream,
protocol: Protocol,
@@ -74,6 +71,7 @@ pub const Connection = struct {
}
};
+/// A buffered (and peekable) Connection.
pub const BufferedConnection = struct {
pub const buffer_size = 0x2000;
@@ -157,6 +155,7 @@ pub const BufferedConnection = struct {
}
};
+/// An HTTP request originating from a client.
pub const Request = struct {
pub const Headers = struct {
method: http.Method,
@@ -290,6 +289,11 @@ pub const Request = struct {
compression: Compression = .none,
};
+/// An HTTP response waiting to be sent.
+///
+///                                   [/ <----------------------------------- \]
+/// Order of operations: accept -> wait -> do [ -> write -> finish][ -> reset /]
+///                                       \ -> read /
pub const Response = struct {
pub const Headers = struct {
version: http.Version = .@"HTTP/1.1",
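A sketch of the loop that diagram describes, for a `*Response` already obtained from `accept()`. `wait`, `do`, and `reset` are taken from this diff; the early-return error handling and the choice to send head-only responses are illustrative.

```zig
const std = @import("std");

// Serve requests on one connection until the client goes away.
fn handle(res: *std.http.Server.Response) !void {
    while (true) {
        // wait(): read and parse a complete request head; bail out when the
        // client hangs up or sends something unparseable.
        res.wait() catch return;

        // (The request body, if any, could be read here via the read branch.)

        // do(): send the response head. write -> finish would follow if this
        // response carried a body; this sketch sends head-only responses.
        try res.do();

        // reset(): back to the initial state before the next request on
        // this connection.
        res.reset();
    }
}
```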
@@ -310,6 +314,7 @@ pub const Response = struct {
headers: Headers = .{},
request: Request,
+ /// Reset this response to its initial state. This must be called before handling a second request on the same connection.
pub fn reset(res: *Response) void {
switch (res.request.compression) {
.none => {},
@@ -336,7 +341,8 @@ pub const Response = struct {
}
}
- pub fn sendResponseHead(res: *Response) !void {
+ /// Send the response headers.
+ pub fn do(res: *Response) !void {
var buffered = std.io.bufferedWriter(res.connection.writer());
const w = buffered.writer();
@@ -402,7 +408,8 @@ pub const Response = struct {
pub const WaitForCompleteHeadError = BufferedConnection.ReadError || proto.HeadersParser.WaitForCompleteHeadError || Request.Headers.ParseError || error{ BadHeader, InvalidCompression, StreamTooLong, InvalidWindowSize } || error{CompressionNotSupported};
- pub fn waitForCompleteHead(res: *Response) !void {
+ /// Wait for the client to send a complete request head.
+ pub fn wait(res: *Response) !void {
while (true) {
try res.connection.fill();
@@ -451,7 +458,7 @@ pub const Response = struct {
}
}
- pub const ReadError = DeflateDecompressor.Error || GzipDecompressor.Error || ZstdDecompressor.Error || WaitForCompleteHeadError;
+ pub const ReadError = Compression.DeflateDecompressor.Error || Compression.GzipDecompressor.Error || Compression.ZstdDecompressor.Error || WaitForCompleteHeadError;
pub const Reader = std.io.Reader(*Response, ReadError, read);
@@ -517,13 +524,19 @@ pub const Response = struct {
}
};
+/// The mode of transport for responses.
pub const RequestTransfer = union(enum) {
content_length: u64,
chunked: void,
none: void,
};
+/// The decompressor for request messages.
pub const Compression = union(enum) {
+ pub const DeflateDecompressor = std.compress.zlib.ZlibStream(Response.TransferReader);
+ pub const GzipDecompressor = std.compress.gzip.Decompress(Response.TransferReader);
+ pub const ZstdDecompressor = std.compress.zstd.DecompressStream(Response.TransferReader, .{});
+
deflate: DeflateDecompressor,
gzip: GzipDecompressor,
zstd: ZstdDecompressor,
@@ -543,6 +556,7 @@ pub fn deinit(server: *Server) void {
pub const ListenError = std.os.SocketError || std.os.BindError || std.os.ListenError || std.os.SetSockOptError || std.os.GetSockNameError;
+/// Start the HTTP server listening on the given address.
pub fn listen(server: *Server, address: net.Address) !void {
try server.socket.listen(address);
}
@@ -562,6 +576,7 @@ pub const HeaderStrategy = union(enum) {
static: []u8,
};
+/// Accept a new connection and allocate a Response for it.
pub fn accept(server: *Server, options: HeaderStrategy) AcceptError!*Response {
const in = try server.socket.accept();
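Wiring the server entry points together: the `listen` and `accept` signatures come from this diff, while `Server.init`, its options, and `Response.deinit` are assumptions sketched from the surrounding API.

```zig
const std = @import("std");

pub fn main() !void {
    // Assumption: init takes an allocator plus StreamServer-style options.
    var server = std.http.Server.init(std.heap.page_allocator, .{ .reuse_address = true });
    defer server.deinit();

    try server.listen(try std.net.Address.parseIp("127.0.0.1", 8080));

    while (true) {
        // Headers for this connection are parsed into a caller-provided buffer,
        // matching the `static` HeaderStrategy shown above.
        var header_buf: [8 * 1024]u8 = undefined;
        const res = try server.accept(.{ .static = &header_buf });
        defer res.deinit(); // assumed cleanup; not shown in this diff

        // ... hand `res` to the wait -> do loop sketched earlier ...
    }
}
```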