diff --git a/lib/internal/streams/iter/broadcast.js b/lib/internal/streams/iter/broadcast.js index b667857c478885..3592453036d303 100644 --- a/lib/internal/streams/iter/broadcast.js +++ b/lib/internal/streams/iter/broadcast.js @@ -44,6 +44,7 @@ const { } = require('internal/streams/iter/types'); const { + from, isAsyncIterable, isSyncIterable, } = require('internal/streams/iter/from'); @@ -799,7 +800,9 @@ const Broadcast = { return { __proto__: null, writer: { __proto__: null }, broadcast: bc }; } - if (!isAsyncIterable(input) && !isSyncIterable(input)) { + const source = from(input); + + if (!isAsyncIterable(source) && !isSyncIterable(source)) { throw new ERR_INVALID_ARG_TYPE( 'input', ['Broadcastable', 'AsyncIterable', 'Iterable'], input); } @@ -810,8 +813,8 @@ const Broadcast = { const pump = async () => { const w = result.writer; try { - if (isAsyncIterable(input)) { - for await (const chunks of input) { + if (isAsyncIterable(source)) { + for await (const chunks of source) { signal?.throwIfAborted(); if (ArrayIsArray(chunks)) { if (!w.writevSync(chunks)) { @@ -821,8 +824,8 @@ const Broadcast = { await w.write(chunks, signal ? { signal } : undefined); } } - } else if (isSyncIterable(input)) { - for (const chunks of input) { + } else if (isSyncIterable(source)) { + for (const chunks of source) { signal?.throwIfAborted(); if (ArrayIsArray(chunks)) { if (!w.writevSync(chunks)) { diff --git a/test/parallel/test-stream-iter-broadcast-from.js b/test/parallel/test-stream-iter-broadcast-from.js index 0203252f26229a..39d92c2aef49b3 100644 --- a/test/parallel/test-stream-iter-broadcast-from.js +++ b/test/parallel/test-stream-iter-broadcast-from.js @@ -43,6 +43,28 @@ async function testBroadcastFromStringChunks() { assert.strictEqual(data, 'foobar'); } +async function testBroadcastFromStringInput() { + const { broadcast: bc } = Broadcast.from('abc'); + const consumer = bc.push(); + const data = await text(consumer); + assert.strictEqual(data, 'abc'); +} + +async function testBroadcastFromUint8ArrayInput() { + const { broadcast: bc } = Broadcast.from(new Uint8Array([97])); + const consumer = bc.push(); + const data = await text(consumer); + assert.strictEqual(data, 'a'); +} + +async function testBroadcastFromDataViewInput() { + const view = new DataView(new Uint8Array([104, 105]).buffer); + const { broadcast: bc } = Broadcast.from(view); + const consumer = bc.push(); + const data = await text(consumer); + assert.strictEqual(data, 'hi'); +} + async function testBroadcastFromMultipleConsumers() { const source = from('shared-data'); const { broadcast: bc } = Broadcast.from(source); @@ -180,6 +202,9 @@ Promise.all([ testBroadcastFromAsyncIterable(), testBroadcastFromNonArrayChunks(), testBroadcastFromStringChunks(), + testBroadcastFromStringInput(), + testBroadcastFromUint8ArrayInput(), + testBroadcastFromDataViewInput(), testBroadcastFromMultipleConsumers(), testAbortSignal(), testAlreadyAbortedSignal(), diff --git a/test/parallel/test-stream-iter-validation.js b/test/parallel/test-stream-iter-validation.js index 19cde169d6f496..ecefaeb171ec41 100644 --- a/test/parallel/test-stream-iter-validation.js +++ b/test/parallel/test-stream-iter-validation.js @@ -157,9 +157,8 @@ assert.throws(() => broadcast({ backpressure: 'bad' }), { code: 'ERR_INVALID_ARG writer.endSync(); } -// Broadcast.from rejects non-iterable input +// Broadcast.from rejects non-streamable input assert.throws(() => Broadcast.from(42), { code: 'ERR_INVALID_ARG_TYPE' }); -assert.throws(() => Broadcast.from('bad'), { code: 'ERR_INVALID_ARG_TYPE' }); // ============================================================================= // share() / shareSync() validation