Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions lib/internal/streams/iter/broadcast.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class BroadcastImpl {
#options;
#writer = null;
#cachedMinCursor = 0;
#minCursorDirty = false;
#cachedMinCursorConsumers = 0;

constructor(options) {
this.#options = options;
Expand Down Expand Up @@ -150,13 +150,13 @@ class BroadcastImpl {
};

this.#consumers.add(state);
// New consumer starts at buffer start; recalculate min cursor
// since this consumer may now be the slowest.
if (this.#consumers.size === 1) {
this.#cachedMinCursor = state.cursor;
this.#minCursorDirty = false;
this.#cachedMinCursorConsumers = 1;
} else if (state.cursor === this.#cachedMinCursor) {
this.#cachedMinCursorConsumers++;
} else {
this.#minCursorDirty = true;
this.#recomputeMinCursor();
}
const self = this;

Expand All @@ -167,9 +167,9 @@ class BroadcastImpl {
state.detached = true;
state.resolve = null;
state.reject = null;
self.#consumers.delete(state);
self.#minCursorDirty = true;
self.#tryTrimBuffer();
if (self.#deleteConsumer(state)) {
self.#tryTrimBuffer();
}
}

return {
Expand All @@ -186,19 +186,19 @@ class BroadcastImpl {
const bufferIndex = state.cursor - self.#bufferStart;
if (bufferIndex < self.#buffer.length) {
const chunk = self.#buffer.get(bufferIndex);
// If this consumer was at the min cursor, mark dirty
if (state.cursor <= self.#cachedMinCursor) {
self.#minCursorDirty = true;
}
const cursor = state.cursor;
state.cursor++;
self.#tryTrimBuffer();
if (cursor === self.#cachedMinCursor &&
--self.#cachedMinCursorConsumers === 0) {
self.#tryTrimBuffer();
}
return PromiseResolve(
{ __proto__: null, done: false, value: chunk });
}

if (self.#error) {
state.detached = true;
self.#consumers.delete(state);
self.#deleteConsumer(state);
return PromiseReject(self.#error);
}

Expand Down Expand Up @@ -253,6 +253,7 @@ class BroadcastImpl {
consumer.detached = true;
}
this.#consumers.clear();
this.#cachedMinCursorConsumers = 0;
}

[SymbolDispose]() {
Expand All @@ -274,9 +275,11 @@ class BroadcastImpl {
this.#bufferStart++;
for (const consumer of this.#consumers) {
if (consumer.cursor < this.#bufferStart) {
this.#deleteConsumerFromMin(consumer);
consumer.cursor = this.#bufferStart;
}
}
this.#recomputeMinCursor();
break;
case 'drop-newest':
return true;
Expand All @@ -297,7 +300,12 @@ class BroadcastImpl {
const bufferIndex = consumer.cursor - this.#bufferStart;
if (bufferIndex < this.#buffer.length) {
const chunk = this.#buffer.get(bufferIndex);
const cursor = consumer.cursor;
consumer.cursor++;
if (cursor === this.#cachedMinCursor &&
--this.#cachedMinCursorConsumers === 0) {
this.#tryTrimBuffer();
}
consumer.resolve({ __proto__: null, done: false, value: chunk });
} else {
consumer.resolve({ __proto__: null, done: true, value: undefined });
Expand All @@ -323,6 +331,7 @@ class BroadcastImpl {
consumer.detached = true;
}
this.#consumers.clear();
this.#cachedMinCursorConsumers = 0;
}

[kGetDesiredSize]() {
Expand All @@ -343,14 +352,14 @@ class BroadcastImpl {
// Private methods

#recomputeMinCursor() {
const { minCursor } = getMinCursor(
const { minCursor, minCursorConsumers } = getMinCursor(
this.#consumers, this.#bufferStart + this.#buffer.length);
this.#cachedMinCursor = minCursor;
this.#minCursorDirty = false;
this.#cachedMinCursorConsumers = minCursorConsumers;
}

#tryTrimBuffer() {
if (this.#minCursorDirty) {
if (this.#cachedMinCursorConsumers === 0) {
this.#recomputeMinCursor();
}
const trimCount = this.#cachedMinCursor - this.#bufferStart;
Expand All @@ -377,10 +386,12 @@ class BroadcastImpl {
const bufferIndex = consumer.cursor - this.#bufferStart;
if (bufferIndex < this.#buffer.length) {
const chunk = this.#buffer.get(bufferIndex);
if (consumer.cursor <= this.#cachedMinCursor) {
this.#minCursorDirty = true;
}
const cursor = consumer.cursor;
consumer.cursor++;
if (cursor === this.#cachedMinCursor &&
--this.#cachedMinCursorConsumers === 0) {
this.#tryTrimBuffer();
}
const resolve = consumer.resolve;
consumer.resolve = null;
consumer.reject = null;
Expand All @@ -392,6 +403,21 @@ class BroadcastImpl {
}
}
}

#deleteConsumerFromMin(consumer) {
if (consumer.cursor === this.#cachedMinCursor) {
this.#cachedMinCursorConsumers--;
return this.#cachedMinCursorConsumers === 0;
}
return false;
}

#deleteConsumer(consumer) {
if (this.#consumers.delete(consumer)) {
return this.#deleteConsumerFromMin(consumer);
}
return false;
}
}

// =============================================================================
Expand Down
28 changes: 28 additions & 0 deletions test/parallel/test-stream-iter-broadcast-coverage.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,33 @@ async function testRingbufferGrow() {
}
}

// Multiple consumers at the minimum cursor should trim only after the last
// one advances or detaches.
async function testFanOutMinCursorTrimming() {
const { writer, broadcast: bc } = broadcast({ highWaterMark: 4 });
const iter1 = bc.push()[Symbol.asyncIterator]();
const iter2 = bc.push()[Symbol.asyncIterator]();

writer.writeSync(new Uint8Array([1]));
writer.writeSync(new Uint8Array([2]));
assert.strictEqual(bc.bufferSize, 2);

assert.strictEqual((await iter1.next()).done, false);
assert.strictEqual(bc.bufferSize, 2);

assert.strictEqual((await iter2.next()).done, false);
assert.strictEqual(bc.bufferSize, 1);

await iter1.return();
assert.strictEqual(bc.bufferSize, 1);

assert.strictEqual((await iter2.next()).done, false);
assert.strictEqual(bc.bufferSize, 0);

writer.endSync();
assert.strictEqual((await iter2.next()).done, true);
}

// Broadcast drainableProtocol after close returns null
async function testDrainableAfterClose() {
const { drainableProtocol } = require('stream/iter');
Expand All @@ -111,5 +138,6 @@ Promise.all([
testBroadcastFromSyncIterable(),
testBroadcastFromSyncIterableStrings(),
testRingbufferGrow(),
testFanOutMinCursorTrimming(),
testDrainableAfterClose(),
]).then(common.mustCall());
Loading