-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
stream: add auto-destroy mode #22795
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
ccc94a9
38077e6
294e10c
a6eb82e
c40028a
7eb4e64
713a808
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1521,6 +1521,8 @@ changes: | |
| [`stream._destroy()`][writable-_destroy] method. | ||
| * `final` {Function} Implementation for the | ||
| [`stream._final()`][stream-_final] method. | ||
| * `autoDestroy` {boolean} Whether this stream should automatically call | ||
| .destroy() on itself after ending. | ||
|
|
||
| ```js | ||
| const { Writable } = require('stream'); | ||
|
|
@@ -1770,6 +1772,8 @@ constructor and implement the `readable._read()` method. | |
| method. | ||
| * `destroy` {Function} Implementation for the | ||
| [`stream._destroy()`][readable-_destroy] method. | ||
| * `autoDestroy` {boolean} Whether this stream should automatically call | ||
| .destroy() on itself after ending. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto. |
||
|
|
||
| ```js | ||
| const { Readable } = require('stream'); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -117,6 +117,9 @@ function ReadableState(options, stream, isDuplex) { | |
| // Should close be emitted on destroy. Defaults to true. | ||
| this.emitClose = options.emitClose !== false; | ||
|
|
||
| // Should .destroy() be called after 'end' (and potentially 'finish') | ||
| this.autoDestroy = !!options.autoDestroy; | ||
|
|
||
| // has it been destroyed | ||
| this.destroyed = false; | ||
|
|
||
|
|
@@ -235,7 +238,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { | |
| if (!skipChunkCheck) | ||
| er = chunkInvalid(state, chunk); | ||
| if (er) { | ||
| stream.emit('error', er); | ||
| errorOrDestroy(stream, er); | ||
| } else if (state.objectMode || chunk && chunk.length > 0) { | ||
| if (typeof chunk !== 'string' && | ||
| !state.objectMode && | ||
|
|
@@ -245,11 +248,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { | |
|
|
||
| if (addToFront) { | ||
| if (state.endEmitted) | ||
| stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); | ||
| errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); | ||
| else | ||
| addChunk(stream, state, chunk, true); | ||
| } else if (state.ended) { | ||
| stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); | ||
| errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); | ||
| } else if (state.destroyed) { | ||
| return false; | ||
| } else { | ||
|
|
@@ -581,7 +584,7 @@ function maybeReadMore_(stream, state) { | |
| // for virtual (non-string, non-buffer) streams, "length" is somewhat | ||
| // arbitrary, and perhaps not very meaningful. | ||
| Readable.prototype._read = function(n) { | ||
| this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); | ||
| errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); | ||
| }; | ||
|
|
||
| Readable.prototype.pipe = function(dest, pipeOpts) { | ||
|
|
@@ -687,7 +690,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { | |
| unpipe(); | ||
| dest.removeListener('error', onerror); | ||
| if (EE.listenerCount(dest, 'error') === 0) | ||
| dest.emit('error', er); | ||
| errorOrDestroy(dest, er); | ||
| } | ||
|
|
||
| // Make sure our error handler is attached before userland ones. | ||
|
|
@@ -1084,6 +1087,28 @@ function endReadable(stream) { | |
| } | ||
| } | ||
|
|
||
| function writableAutoDestroy(wState) { | ||
| // In case of duplex streams we need a way to detect | ||
| // if the writable side is ready for autoDestroy as well | ||
| return !wState || (wState.autoDestroy && wState.finished); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this just be inlined? |
||
| } | ||
|
|
||
| function errorOrDestroy(stream, err) { | ||
| // We have tests that rely on errors being emitted | ||
| // in the same tick, so changing this is semver major. | ||
| // For now when you opt-in to autoDestroy we allow | ||
| // the error to be emitted nextTick. In a future | ||
| // semver major update we should change the default to this. | ||
|
|
||
| const rState = stream._readableState; | ||
| const wState = stream._writableState; | ||
|
|
||
| if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) | ||
| stream.destroy(err); | ||
| else | ||
| stream.emit('error', err); | ||
| } | ||
|
|
||
| function endReadableNT(state, stream) { | ||
| debug('endReadableNT', state.endEmitted, state.length); | ||
|
|
||
|
|
@@ -1092,5 +1117,9 @@ function endReadableNT(state, stream) { | |
| state.endEmitted = true; | ||
| stream.readable = false; | ||
| stream.emit('end'); | ||
|
|
||
| if (state.autoDestroy && writableAutoDestroy(stream._writableState)) { | ||
| stream.destroy(); | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| 'use strict'; | ||
| const common = require('../common'); | ||
| const stream = require('stream'); | ||
|
|
||
| { | ||
| const r = new stream.Readable({ | ||
| autoDestroy: true, | ||
| read() { | ||
| this.push('hello'); | ||
| this.push('world'); | ||
| this.push(null); | ||
| }, | ||
| destroy: common.mustCall((err, cb) => cb()) | ||
| }); | ||
|
|
||
| r.resume(); | ||
| r.on('end', common.mustCall()); | ||
| r.on('close', common.mustCall()); | ||
| } | ||
|
|
||
| { | ||
| const w = new stream.Writable({ | ||
| autoDestroy: true, | ||
| write(data, enc, cb) { | ||
| cb(null); | ||
| }, | ||
| destroy: common.mustCall((err, cb) => cb()) | ||
| }); | ||
|
|
||
| w.write('hello'); | ||
| w.write('world'); | ||
| w.end(); | ||
|
|
||
| w.on('finish', common.mustCall()); | ||
| w.on('close', common.mustCall()); | ||
| } | ||
|
|
||
| { | ||
| const t = new stream.Transform({ | ||
| autoDestroy: true, | ||
| transform(data, enc, cb) { | ||
| cb(null, data); | ||
| }, | ||
| destroy: common.mustCall((err, cb) => cb()) | ||
| }); | ||
|
|
||
| t.write('hello'); | ||
| t.write('world'); | ||
| t.end(); | ||
|
|
||
| t.resume(); | ||
| t.on('end', common.mustCall()); | ||
| t.on('finish', common.mustCall()); | ||
| t.on('close', common.mustCall()); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we maybe check the order of the events/destroy callback as well? |
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.destroy()->`.destroy()`?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you provide the default value explicitly here?