readable.find()/some()/every() don't destroy the stream on match when the source hasn't ended #64261

Description

@perrin4869

Version

v24.18.0 (reproduces on this version; likely affects any version with the stream iterator helpers, i.e. Readable.find/some/every)

Platform

Linux (reproduces regardless of platform - pure JS logic issue)

Subsystem

No response

What steps will reproduce the bug?

import { PassThrough } from "node:stream";

const pt = new PassThrough();

const matchPromise = pt.find((chunk) => chunk.toString().includes("MATCH"));

pt.write("first line\n");
pt.write("MATCH this one\n");

const result = await matchPromise;
console.log("find() resolved with:", JSON.stringify(result.toString()));
console.log("pt.destroyed immediately after resolution:", pt.destroyed);

await new Promise((r) => setTimeout(r, 100));
console.log("pt.destroyed 100ms later:", pt.destroyed);

How often does it reproduce? Is there a required condition?

100% reproducible. Condition: the source stream must still be open (not .end()-ed) and must not receive another chunk immediately after the matching one. This is the common case for any long-lived or never-ending stream (e.g. tailing a child process's stdout/stderr, or any other indefinite source), which is exactly the kind of stream .find()/.some()/.every() are attractive for, as a way to do for await-style line/event scanning.

What is the expected behavior? Why is that the expected behavior?

Per the documentation for readable.find(fn[, options]) (analogous wording for .some() / .every()):

Once an fn call's awaited return value is truthy, the stream is destroyed and the promise is fulfilled with value for which fn returned a truthy value.

So pt.destroyed should become true once find()'s promise resolves (or shortly after, since .destroy() is not fully synchronous).

What do you see instead?

find() resolved with: "first line\nMATCH this one\n"
pt.destroyed immediately after resolution: false
pt.destroyed 100ms later: false

pt is never destroyed, no matter how long you wait, as long as nothing writes another chunk to it. In this repro nothing ever writes again, so it is never destroyed at all.

Additional information

The root cause, traced in lib/internal/streams/operators.js:

find is a thin wrapper around filter, which is a thin wrapper around map:

async function find(fn, options) {
  for await (const result of filter.call(this, fn, options)) {
    return result;
  }
  return undefined;
}

The return result triggers the implicit .return() call on the filter/map async generator (standard for await...of early-exit semantics) - this correctly unwinds the OUTER generator's own finally block (setting done = true and resolving pending resume/next promises inside map).

However, map's implementation spawns a separate, fire-and-forget pump() async function that independently iterates the original source stream:

async function pump() {
  try {
    for await (let val of stream) {   // <-- this is a DIFFERENT for-await-of loop, over the original source
      if (done) {
        return;
      }
      ...

pump()'s for await (let val of stream) is a different iterator/loop than the one find()'s for await unwinds. Setting done = true from the outer generator's cleanup does not itself unblock this inner loop - it's only checked after stream's iterator yields a new value. If the source never produces another chunk after the matching one (because it's still open and simply idle), pump() stays permanently suspended awaiting the next chunk from stream, and nothing ever calls .destroy() (or .return()) on stream itself.
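The disconnect between the two loops can be modelled without streams at all. The sketch below is a simplified stand-in, not the code in operators.js: idleSource, mapLike, demo and the state flags are hypothetical names chosen for illustration. It suggests that an early exit from the outer generator runs its finally block but never reaches the source iterator, because the detached pump is still parked on the source's pending next().

```javascript
// Hypothetical source: yields one value, then stays idle (next() never settles).
function idleSource(state) {
  let sent = false;
  return {
    [Symbol.asyncIterator]() {
      return this;
    },
    next() {
      if (!sent) {
        sent = true;
        return Promise.resolve({ value: "MATCH", done: false });
      }
      return new Promise(() => {}); // idle forever, like an open but quiet stream
    },
    return() {
      // A real stream iterator would destroy the stream here.
      state.sourceReturned = true;
      return Promise.resolve({ value: undefined, done: true });
    },
  };
}

// Hypothetical outer generator with a fire-and-forget pump,
// loosely mirroring the shape described above for map().
async function* mapLike(source, state) {
  const queue = [];
  let wake = () => {};
  let done = false;

  async function pump() {
    for await (const val of source) {
      if (done) return; // only reached when the source yields again
      queue.push(val);
      wake();
    }
  }
  pump(); // intentionally not awaited

  try {
    while (true) {
      while (queue.length > 0) yield queue.shift();
      await new Promise((resolve) => {
        wake = resolve;
      });
    }
  } finally {
    done = true;
    state.outerFinallyRan = true;
  }
}

async function demo() {
  const state = { sourceReturned: false, outerFinallyRan: false };
  for await (const val of mapLike(idleSource(state), state)) {
    break; // early exit, like `return result` in find()
  }
  // Give the pump time to notice `done`; it cannot, because the source is idle.
  await new Promise((resolve) => setTimeout(resolve, 50));
  return state;
}

demo().then((state) => console.log(state));
// logs { sourceReturned: false, outerFinallyRan: true }
```

In this model the only way to release the pump is for the source to yield again or for something to call return()/destroy() on the source directly.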

There is no explicit stream.destroy() call anywhere in map/filter/find's implementation in lib/internal/streams/operators.js. The "stream is destroyed" behavior described in the docs appears to rely entirely on the source stream reaching its natural end and auto-destroying on its own. For finite/short-lived sources that only coincidentally aligns with "matched"; it does not happen because find/some/every themselves ever destroy the stream.

(I confirmed this distinction experimentally: a finite Readable.from([...]) source does eventually show destroyed: true after a match, but that's because the array-backed stream naturally runs to completion regardless of the match and gets auto-destroyed via autoDestroy once it truly ends - not because find() destroyed it. A still-open, indefinite source like the PassThrough above never gets destroyed at all.)

Anyone relying on the documented "the stream is destroyed" behavior to avoid manual cleanup after find()/some()/every() on a long-lived stream (e.g. scanning a child process's stdout for a marker line) will leak that stream indefinitely. It stays alive and un-destroyed, holding onto internal buffers/decoders, until whatever holds a reference to it is garbage collected, and no explicit teardown ever occurs.
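Until the operators destroy the source themselves, one possible workaround is to tear the stream down explicitly once the helper settles. This is a minimal sketch, assuming a Node.js version that ships readable.find(); findAndDestroy is a hypothetical helper name, not part of Node's API, and it is not a proposed fix for operators.js.

```javascript
const { PassThrough } = require("node:stream");

// Hypothetical helper: run find() and always destroy the source afterwards,
// instead of relying on find() to do it.
async function findAndDestroy(stream, predicate) {
  try {
    return await stream.find(predicate);
  } finally {
    // Calling destroy() on an already destroyed stream is a no-op.
    stream.destroy();
  }
}

async function main() {
  const pt = new PassThrough();

  const matchPromise = findAndDestroy(pt, (chunk) =>
    chunk.toString().includes("MATCH"),
  );

  pt.write("first line\n");
  pt.write("MATCH this one\n");

  const result = await matchPromise;
  console.log("matched:", JSON.stringify(result.toString()));
  console.log("pt.destroyed after findAndDestroy():", pt.destroyed);
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
```

The same wrapper shape should work for .some() and .every(); the trade-off is that the caller takes over the teardown that the docs attribute to the helper.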

Labels

stream: Issues and PRs related to the stream subsystem.