Commit d000574380
Changed files (1)
lib
std
lib/std/Io.zig
@@ -1272,7 +1272,7 @@ pub const TypeErasedQueue = struct {
if (elements.len == 0) return 0;
try q.mutex.lock(io);
defer q.mutex.unlock(io);
- return putLocked(q, io, elements, min, false);
+ return q.putLocked(io, elements, min, false);
}
/// Same as `put` but cannot be canceled.
@@ -1281,7 +1281,7 @@ pub const TypeErasedQueue = struct {
if (elements.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
- return putLocked(q, io, elements, min, true) catch |err| switch (err) {
+ return q.putLocked(io, elements, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
};
}
@@ -1291,50 +1291,49 @@ pub const TypeErasedQueue = struct {
// queue is empty do we start populating the buffer.
var remaining = elements;
- while (true) {
- const getter: *Get = @alignCast(@fieldParentPtr("node", q.getters.popFirst() orelse break));
+ while (q.getters.popFirst()) |getter_node| {
+ const getter: *Get = @alignCast(@fieldParentPtr("node", getter_node));
const copy_len = @min(getter.remaining.len, remaining.len);
+ assert(copy_len > 0);
@memcpy(getter.remaining[0..copy_len], remaining[0..copy_len]);
remaining = remaining[copy_len..];
getter.remaining = getter.remaining[copy_len..];
if (getter.remaining.len == 0) {
getter.condition.signal(io);
- continue;
- }
- q.getters.prepend(&getter.node);
+ if (remaining.len > 0) continue;
+ } else q.getters.prepend(getter_node);
assert(remaining.len == 0);
return elements.len;
}
- while (true) {
- {
- const available = q.buffer[q.put_index..];
- const copy_len = @min(available.len, remaining.len);
- @memcpy(available[0..copy_len], remaining[0..copy_len]);
- remaining = remaining[copy_len..];
- q.put_index += copy_len;
- if (remaining.len == 0) return elements.len;
- }
- {
- const available = q.buffer[0..q.get_index];
- const copy_len = @min(available.len, remaining.len);
- @memcpy(available[0..copy_len], remaining[0..copy_len]);
- remaining = remaining[copy_len..];
- q.put_index = copy_len;
- if (remaining.len == 0) return elements.len;
- }
-
- const total_filled = elements.len - remaining.len;
- if (total_filled >= min) return total_filled;
-
- var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
- q.putters.append(&pending.node);
- if (uncancelable)
- pending.condition.waitUncancelable(io, &q.mutex)
- else
- try pending.condition.wait(io, &q.mutex);
- remaining = pending.remaining;
+ {
+ const available = q.buffer[q.put_index..];
+ const copy_len = @min(available.len, remaining.len);
+ @memcpy(available[0..copy_len], remaining[0..copy_len]);
+ remaining = remaining[copy_len..];
+ q.put_index += copy_len;
+ if (remaining.len == 0) return elements.len;
+ }
+ {
+ const available = q.buffer[0..q.get_index];
+ const copy_len = @min(available.len, remaining.len);
+ @memcpy(available[0..copy_len], remaining[0..copy_len]);
+ remaining = remaining[copy_len..];
+ q.put_index = copy_len;
+ if (remaining.len == 0) return elements.len;
}
+
+ const total_filled = elements.len - remaining.len;
+ if (total_filled >= min) return total_filled;
+
+ var pending: Put = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+ q.putters.append(&pending.node);
+ defer if (pending.remaining.len > 0) q.putters.remove(&pending.node);
+ while (pending.remaining.len > 0) if (uncancelable)
+ pending.condition.waitUncancelable(io, &q.mutex)
+ else
+ try pending.condition.wait(io, &q.mutex);
+ return elements.len;
}
pub fn get(q: *@This(), io: Io, buffer: []u8, min: usize) Cancelable!usize {
@@ -1342,7 +1341,7 @@ pub const TypeErasedQueue = struct {
if (buffer.len == 0) return 0;
try q.mutex.lock(io);
defer q.mutex.unlock(io);
- return getLocked(q, io, buffer, min, false);
+ return q.getLocked(io, buffer, min, false);
}
pub fn getUncancelable(q: *@This(), io: Io, buffer: []u8, min: usize) usize {
@@ -1350,99 +1349,113 @@ pub const TypeErasedQueue = struct {
if (buffer.len == 0) return 0;
q.mutex.lockUncancelable(io);
defer q.mutex.unlock(io);
- return getLocked(q, io, buffer, min, true) catch |err| switch (err) {
+ return q.getLocked(io, buffer, min, true) catch |err| switch (err) {
error.Canceled => unreachable,
};
}
- pub fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
+ fn getLocked(q: *@This(), io: Io, buffer: []u8, min: usize, uncancelable: bool) Cancelable!usize {
// The ring buffer gets first priority, then data should come from any
// queued putters, then finally the ring buffer should be filled with
// data from putters so they can be resumed.
var remaining = buffer;
- while (true) {
- if (q.get_index <= q.put_index) {
- const available = q.buffer[q.get_index..q.put_index];
+ if (q.get_index <= q.put_index) {
+ const available = q.buffer[q.get_index..q.put_index];
+ const copy_len = @min(available.len, remaining.len);
+ @memcpy(remaining[0..copy_len], available[0..copy_len]);
+ q.get_index += copy_len;
+ remaining = remaining[copy_len..];
+ if (remaining.len == 0) {
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
+ }
+ } else {
+ {
+ const available = q.buffer[q.get_index..];
const copy_len = @min(available.len, remaining.len);
@memcpy(remaining[0..copy_len], available[0..copy_len]);
q.get_index += copy_len;
remaining = remaining[copy_len..];
- if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
- } else {
- {
- const available = q.buffer[q.get_index..];
- const copy_len = @min(available.len, remaining.len);
- @memcpy(remaining[0..copy_len], available[0..copy_len]);
- q.get_index += copy_len;
- remaining = remaining[copy_len..];
- if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
- }
- {
- const available = q.buffer[0..q.put_index];
- const copy_len = @min(available.len, remaining.len);
- @memcpy(remaining[0..copy_len], available[0..copy_len]);
- q.get_index = copy_len;
- remaining = remaining[copy_len..];
- if (remaining.len == 0) return fillRingBufferFromPutters(q, io, buffer.len);
+ if (remaining.len == 0) {
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
}
}
- // Copy directly from putters into buffer.
- while (remaining.len > 0) {
- const putter: *Put = @alignCast(@fieldParentPtr("node", q.putters.popFirst() orelse break));
- const copy_len = @min(putter.remaining.len, remaining.len);
- @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
- putter.remaining = putter.remaining[copy_len..];
+ {
+ const available = q.buffer[0..q.put_index];
+ const copy_len = @min(available.len, remaining.len);
+ @memcpy(remaining[0..copy_len], available[0..copy_len]);
+ q.get_index = copy_len;
remaining = remaining[copy_len..];
- if (putter.remaining.len == 0) {
- putter.condition.signal(io);
- } else {
- assert(remaining.len == 0);
- q.putters.prepend(&putter.node);
- return fillRingBufferFromPutters(q, io, buffer.len);
+ if (remaining.len == 0) {
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
}
}
- // Both ring buffer and putters queue is empty.
- const total_filled = buffer.len - remaining.len;
- if (total_filled >= min) return total_filled;
-
- var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
- q.getters.append(&pending.node);
- if (uncancelable)
- pending.condition.waitUncancelable(io, &q.mutex)
- else
- try pending.condition.wait(io, &q.mutex);
- remaining = pending.remaining;
}
+ // Copy directly from putters into buffer.
+ while (q.putters.popFirst()) |putter_node| {
+ const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
+ const copy_len = @min(putter.remaining.len, remaining.len);
+ assert(copy_len > 0);
+ @memcpy(remaining[0..copy_len], putter.remaining[0..copy_len]);
+ putter.remaining = putter.remaining[copy_len..];
+ remaining = remaining[copy_len..];
+ if (putter.remaining.len == 0) {
+ putter.condition.signal(io);
+ if (remaining.len > 0) continue;
+ } else q.putters.prepend(putter_node);
+ assert(remaining.len == 0);
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
+ }
+ // Both ring buffer and putters queue is empty.
+ const total_filled = buffer.len - remaining.len;
+ if (total_filled >= min) return total_filled;
+
+ var pending: Get = .{ .remaining = remaining, .condition = .{}, .node = .{} };
+ q.getters.append(&pending.node);
+ defer if (pending.remaining.len > 0) q.getters.remove(&pending.node);
+ while (pending.remaining.len > 0) if (uncancelable)
+ pending.condition.waitUncancelable(io, &q.mutex)
+ else
+ try pending.condition.wait(io, &q.mutex);
+ q.fillRingBufferFromPutters(io);
+ return buffer.len;
}
/// Called when there is nonzero space available in the ring buffer and
/// potentially putters waiting. The mutex is already held and the task is
/// to copy putter data to the ring buffer and signal any putters whose
/// buffers been fully copied.
- fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io, len: usize) usize {
- while (true) {
- const putter: *Put = @alignCast(@fieldParentPtr("node", q.putters.popFirst() orelse return len));
- const available = q.buffer[q.put_index..];
- const copy_len = @min(available.len, putter.remaining.len);
- @memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
- putter.remaining = putter.remaining[copy_len..];
- q.put_index += copy_len;
- if (putter.remaining.len == 0) {
- putter.condition.signal(io);
- continue;
+ fn fillRingBufferFromPutters(q: *TypeErasedQueue, io: Io) void {
+ while (q.putters.popFirst()) |putter_node| {
+ const putter: *Put = @alignCast(@fieldParentPtr("node", putter_node));
+ {
+ const available = q.buffer[q.put_index..];
+ const copy_len = @min(available.len, putter.remaining.len);
+ @memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
+ putter.remaining = putter.remaining[copy_len..];
+ q.put_index += copy_len;
+ if (putter.remaining.len == 0) {
+ putter.condition.signal(io);
+ continue;
+ }
}
- const second_available = q.buffer[0..q.get_index];
- const second_copy_len = @min(second_available.len, putter.remaining.len);
- @memcpy(second_available[0..second_copy_len], putter.remaining[0..second_copy_len]);
- putter.remaining = putter.remaining[copy_len..];
- q.put_index = copy_len;
- if (putter.remaining.len == 0) {
- putter.condition.signal(io);
- continue;
+ {
+ const available = q.buffer[0..q.get_index];
+ const copy_len = @min(available.len, putter.remaining.len);
+ @memcpy(available[0..copy_len], putter.remaining[0..copy_len]);
+ putter.remaining = putter.remaining[copy_len..];
+ q.put_index = copy_len;
+ if (putter.remaining.len == 0) {
+ putter.condition.signal(io);
+ continue;
+ }
}
- q.putters.prepend(&putter.node);
- return len;
+ q.putters.prepend(putter_node);
+ break;
}
}
};