Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixup
  • Loading branch information
ronag committed Jun 21, 2020
commit fbe7a31967c54c6180832c9d7d1c97e11c0e6bdd
19 changes: 12 additions & 7 deletions lib/_stream_writable.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ const {
module.exports = Writable;

const Stream = require('stream');
const { WritableBase } = require('internal/streams/base');
const {
WritableBase,
WritableStateBase,
errorOrDestroy
} = require('internal/streams/base');

const {
getHighWaterMark,
Expand All @@ -54,7 +58,7 @@ const {
function nop() {}

const kFlush = Symbol('kFlush');
class WritableState extends WritableBase.WritableState {
class WritableState extends WritableStateBase {
constructor(options, stream, isDuplex) {
super(options);

Expand Down Expand Up @@ -203,11 +207,12 @@ function Writable(options) {

WritableBase.call(this, {
...options,
start: (stream, state) => {
start: function() {
const state = this._writableState;
if (!state.writing) {
clearBuffer(stream, state);
clearBuffer(this, state);
}
finishMaybe(stream, state);
finishMaybe(this, state);
},
write: writeOrBuffer,
flush: function(state, cb) {
Expand Down Expand Up @@ -305,7 +310,7 @@ function onwriteError(stream, state, er, cb) {
// writes.
errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
// This can emit error, but error must always follow cb.
WritableBase.errorOrDestroy(stream, er);
errorOrDestroy(stream, er);
}

function onwrite(stream, er) {
Expand All @@ -314,7 +319,7 @@ function onwrite(stream, er) {
const cb = state.writecb;

if (typeof cb !== 'function') {
WritableBase.errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}

Expand Down
21 changes: 15 additions & 6 deletions lib/internal/streams/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const {
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING
Expand Down Expand Up @@ -74,6 +75,10 @@ class WritableStateBase {

// Indicates whether the stream has finished destroying.
this.closed = false;

// True if close has been emitted or would have been emitted
// depending on emitClose.
this.closeEmitted = false;
}
}

Expand All @@ -98,8 +103,6 @@ function WritableBase(options, isDuplex, State) {
destroyImpl.construct(this, options.start);
}
WritableBase.prototype = ObjectCreate(Stream.prototype);
WritableBase.errorOrDestroy = errorOrDestroy;
WritableBase.WritableState = WritableStateBase;
WritableBase.prototype.write = function(chunk, encoding, cb) {
const state = this._writableState;

Expand All @@ -108,6 +111,8 @@ WritableBase.prototype.write = function(chunk, encoding, cb) {
encoding = state.defaultEncoding;
} else if (!encoding) {
encoding = state.defaultEncoding;
} else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) {
throw new ERR_UNKNOWN_ENCODING(encoding);
}

if (chunk === null) {
Expand Down Expand Up @@ -178,8 +183,10 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
encoding = null;
}

if (chunk !== null && chunk !== undefined)
if (chunk !== null && chunk !== undefined) {
// TODO (ronag): Propagate callback error.
this.write(chunk, encoding);
}

// This is forgiving in terms of unnecessary calls to end() and can hide
// logic errors. However, usually such errors are harmless and causing a
Expand All @@ -191,7 +198,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
let called = false;
this[kFlush](state, (err) => {
if (called) {
// TODO(ronag): ERR_MULTIPLE_CALLBACK?
errorOrDestroy(this, new ERR_MULTIPLE_CALLBACK());
return;
}
called = true;
Expand Down Expand Up @@ -225,7 +232,7 @@ WritableBase.prototype.end = function(chunk, encoding, cb) {
function finish(stream, state) {
// TODO(ronag): state.closed, state.errored, state.destroyed?

if (state.errorEmitted)
if (state.errorEmitted || state.closeEmitted)
return;

// TODO(ronag): This could occur after 'close' is emitted.
Expand Down Expand Up @@ -329,5 +336,7 @@ ObjectDefineProperties(WritableBase.prototype, {
});

module.exports = {
WritableBase
WritableBase,
WritableStateBase,
errorOrDestroy
};
10 changes: 5 additions & 5 deletions test/parallel/test-stream-writable-write-error.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ function test(autoDestroy) {
{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
w.end();
expectError(w, ['asd'], 'ERR_STREAM_WRITE_AFTER_END');
Expand All @@ -40,23 +40,23 @@ function test(autoDestroy) {
{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
w.destroy();
}

{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
expectError(w, [null], 'ERR_STREAM_NULL_VALUES', true);
}

{
const w = new Writable({
autoDestroy,
_write() {}
write() {}
});
expectError(w, [{}], 'ERR_INVALID_ARG_TYPE', true);
}
Expand All @@ -65,7 +65,7 @@ function test(autoDestroy) {
const w = new Writable({
decodeStrings: false,
autoDestroy,
_write() {}
write() {}
});
expectError(w, ['asd', 'noencoding'], 'ERR_UNKNOWN_ENCODING', true);
}
Expand Down