-
-
Notifications
You must be signed in to change notification settings - Fork 35.4k
lib: remove queue implementation from JSStreamWrap #17918
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
The streams implementation generally ensures that only one write() call is active at a time. `JSStreamWrap` instances still kept queue of write reqeuests in spite of that; refactor it away. Also, fold `isAlive()` into a constant function on the native side.
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,9 @@ const uv = process.binding('uv'); | |
| const debug = util.debuglog('stream_wrap'); | ||
| const errors = require('internal/errors'); | ||
|
|
||
| const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); | ||
| const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); | ||
|
|
||
| /* This class serves as a wrapper for when the C++ side of Node wants access | ||
| * to a standard JS stream. For example, TLS or HTTP do not operate on network | ||
| * resources conceptually, although that is the common case and what we are | ||
|
|
@@ -27,12 +30,15 @@ class JSStreamWrap extends Socket { | |
| debug('close'); | ||
| this.doClose(cb); | ||
| }; | ||
| handle.isAlive = () => this.isAlive(); | ||
| handle.isClosing = () => this.isClosing(); | ||
| handle.onreadstart = () => this.readStart(); | ||
| handle.onreadstop = () => this.readStop(); | ||
| handle.onshutdown = (req) => this.doShutdown(req); | ||
| handle.onwrite = (req, bufs) => this.doWrite(req, bufs); | ||
| // Inside of the following functions, `this` refers to the handle | ||
| // and `this.owner` refers to this JSStreamWrap instance. | ||
| handle.isClosing = function() { return this.owner.isClosing(); }; | ||
| handle.onreadstart = function() { return this.owner.readStart(); }; | ||
| handle.onreadstop = function() { return this.owner.readStop(); }; | ||
| handle.onshutdown = function(req) { return this.owner.doShutdown(req); }; | ||
| handle.onwrite = function(req, bufs) { | ||
| return this.owner.doWrite(req, bufs); | ||
| }; | ||
|
|
||
| stream.pause(); | ||
| stream.on('error', (err) => this.emit('error', err)); | ||
|
|
@@ -60,7 +66,10 @@ class JSStreamWrap extends Socket { | |
|
|
||
| super({ handle, manualStart: true }); | ||
| this.stream = stream; | ||
| this._list = null; | ||
| this[kCurrentWriteRequest] = null; | ||
| this[kCurrentShutdownRequest] = null; | ||
|
|
||
| // Start reading | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Dot at end. Also, somewhat superfluous comment.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don’t think it’s superfluous – I could imagine there’s quite a few who wouldn’t know why you’d ever want to call |
||
| this.read(0); | ||
| } | ||
|
|
||
|
|
@@ -69,10 +78,6 @@ class JSStreamWrap extends Socket { | |
| return JSStreamWrap; | ||
| } | ||
|
|
||
| isAlive() { | ||
| return true; | ||
| } | ||
|
|
||
| isClosing() { | ||
| return !this.readable || !this.writable; | ||
| } | ||
|
|
@@ -88,33 +93,56 @@ class JSStreamWrap extends Socket { | |
| } | ||
|
|
||
| doShutdown(req) { | ||
| assert.strictEqual(this[kCurrentShutdownRequest], null); | ||
| this[kCurrentShutdownRequest] = req; | ||
|
|
||
| // TODO(addaleax): It might be nice if we could get into a state where | ||
| // DoShutdown() is not called on streams while a write is still pending. | ||
| // | ||
| // Currently, the only part of the code base where that happens is the | ||
| // TLS implementation, which calls both DoWrite() and DoShutdown() on the | ||
| // underlying network stream inside of its own DoShutdown() method. | ||
| // Working around that on the native side is not quite trivial (yet?), | ||
| // so for now that is supported here. | ||
|
|
||
| if (this[kCurrentWriteRequest] !== null) | ||
| return this.on('drain', () => this.doShutdown(req)); | ||
| assert.strictEqual(this[kCurrentWriteRequest], null); | ||
|
|
||
| const handle = this._handle; | ||
| const item = this._enqueue('shutdown', req); | ||
|
|
||
| this.stream.end(() => { | ||
| // Ensure that write was dispatched | ||
| setImmediate(() => { | ||
| if (!this._dequeue(item)) | ||
| return; | ||
|
|
||
| handle.finishShutdown(req, 0); | ||
| this.finishShutdown(handle, 0); | ||
| }); | ||
| }); | ||
| return 0; | ||
| } | ||
|
|
||
| // handle === this._handle except when called from doClose(). | ||
| finishShutdown(handle, errCode) { | ||
| // The shutdown request might already have been cancelled. | ||
| if (this[kCurrentShutdownRequest] === null) | ||
| return; | ||
| const req = this[kCurrentShutdownRequest]; | ||
| this[kCurrentShutdownRequest] = null; | ||
| handle.finishShutdown(req, errCode); | ||
| } | ||
|
|
||
| doWrite(req, bufs) { | ||
| const self = this; | ||
| const handle = this._handle; | ||
| assert.strictEqual(this[kCurrentWriteRequest], null); | ||
| assert.strictEqual(this[kCurrentShutdownRequest], null); | ||
| this[kCurrentWriteRequest] = req; | ||
|
|
||
| var pending = bufs.length; | ||
| const handle = this._handle; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unused?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The linter would complain about that. :)
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not seeing it, though. Same for the
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It’s passed to the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, indeed. Unfortunate fold in the diff. |
||
| const self = this; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it still needed ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @billouboq it’s used inside the |
||
|
|
||
| // Queue the request to be able to cancel it | ||
| const item = this._enqueue('write', req); | ||
| let pending = bufs.length; | ||
|
|
||
| this.stream.cork(); | ||
| for (var n = 0; n < bufs.length; n++) | ||
| this.stream.write(bufs[n], done); | ||
| for (var i = 0; i < bufs.length; i++) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you're making stylistic changes anyway:
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not allowed to use
|
||
| this.stream.write(bufs[i], done); | ||
| this.stream.uncork(); | ||
|
|
||
| function done(err) { | ||
|
|
@@ -126,93 +154,42 @@ class JSStreamWrap extends Socket { | |
|
|
||
| let errCode = 0; | ||
| if (err) { | ||
| const code = uv[`UV_${err.code}`]; | ||
| errCode = (err.code && code) ? code : uv.UV_EPIPE; | ||
| errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE; | ||
| } | ||
|
|
||
| // Ensure that write was dispatched | ||
| setImmediate(function() { | ||
| // Do not invoke callback twice | ||
| if (!self._dequeue(item)) | ||
| return; | ||
|
|
||
| handle.finishWrite(req, errCode); | ||
| setImmediate(() => { | ||
| self.finishWrite(handle, errCode); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can avoid this closure completely with |
||
| }); | ||
| } | ||
|
|
||
| return 0; | ||
| } | ||
|
|
||
| _enqueue(type, req) { | ||
| const item = new QueueItem(type, req); | ||
| if (this._list === null) { | ||
| this._list = item; | ||
| return item; | ||
| } | ||
|
|
||
| item.next = this._list.next; | ||
| item.prev = this._list; | ||
| item.next.prev = item; | ||
| item.prev.next = item; | ||
|
|
||
| return item; | ||
| } | ||
|
|
||
| _dequeue(item) { | ||
| assert(item instanceof QueueItem); | ||
|
|
||
| var next = item.next; | ||
| var prev = item.prev; | ||
|
|
||
| if (next === null && prev === null) | ||
| return false; | ||
|
|
||
| item.next = null; | ||
| item.prev = null; | ||
|
|
||
| if (next === item) { | ||
| prev = null; | ||
| next = null; | ||
| } else { | ||
| prev.next = next; | ||
| next.prev = prev; | ||
| } | ||
|
|
||
| if (this._list === item) | ||
| this._list = next; | ||
| // handle === this._handle except when called from doClose(). | ||
| finishWrite(handle, errCode) { | ||
| // The write request might already have been cancelled. | ||
| if (this[kCurrentWriteRequest] === null) | ||
| return; | ||
| const req = this[kCurrentWriteRequest]; | ||
| this[kCurrentWriteRequest] = null; | ||
|
|
||
| return true; | ||
| handle.finishWrite(req, errCode); | ||
| } | ||
|
|
||
| doClose(cb) { | ||
| const handle = this._handle; | ||
|
|
||
| setImmediate(() => { | ||
| while (this._list !== null) { | ||
| const item = this._list; | ||
| const req = item.req; | ||
| this._dequeue(item); | ||
|
|
||
| const errCode = uv.UV_ECANCELED; | ||
| if (item.type === 'write') { | ||
| handle.finishWrite(req, errCode); | ||
| } else if (item.type === 'shutdown') { | ||
| handle.finishShutdown(req, errCode); | ||
| } | ||
| } | ||
|
|
||
| // Should be already set by net.js | ||
| assert.strictEqual(this._handle, null); | ||
|
|
||
| this.finishWrite(handle, uv.UV_ECANCELED); | ||
| this.finishShutdown(handle, uv.UV_ECANCELED); | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you remove this closure as well? |
||
| cb(); | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| function QueueItem(type, req) { | ||
| this.type = type; | ||
| this.req = req; | ||
| this.prev = this; | ||
| this.next = this; | ||
| } | ||
|
|
||
| module.exports = JSStreamWrap; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would be slightly better to avoid the closures entirely and just make these top level functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jasnell done!