Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
quic: use async _construct for QuicStream
  • Loading branch information
jasnell committed Jul 23, 2020
commit c3a33beb2ca6c095254a54ff50079cc72a2ab402
5 changes: 3 additions & 2 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1047,8 +1047,9 @@ added: REPLACEME

Returns a `Promise` that resolves a new `QuicStream`.

The `Promise` will be rejected if the `QuicSession` has been destroyed or is in
the process of a graceful shutdown.
The `Promise` will be rejected if the `QuicSession` has been destroyed, is in
the process of a graceful shutdown, or the `QuicSession` is otherwise blocked
from opening a new stream.

#### `quicsession.ping()`
<!--YAML
Expand Down
117 changes: 83 additions & 34 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const {
QLogStream,
} = require('internal/quic/util');
const assert = require('internal/assert');
const EventEmitter = require('events');
const { EventEmitter, once } = require('events');
const fs = require('fs');
const fsPromisesInternal = require('internal/fs/promises');
const { Duplex } = require('stream');
Expand Down Expand Up @@ -226,6 +226,7 @@ const kMaybeBind = Symbol('kMaybeBind');
const kOnFileOpened = Symbol('kOnFileOpened');
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
const kReady = Symbol('kReady');
const kRemoveSession = Symbol('kRemove');
const kRemoveStream = Symbol('kRemoveStream');
const kServerBusy = Symbol('kServerBusy');
Expand Down Expand Up @@ -2167,30 +2168,15 @@ class QuicSession extends EventEmitter {
defaultEncoding,
} = validateQuicStreamOptions(options);

await this[kHandshakeComplete]();

if (this.destroyed) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is already destroyed`);
}
if (this.closing) {
throw new ERR_INVALID_STATE(
`${this.constructor.name} is closing`);
}

const handle =
halfOpen ?
_openUnidirectionalStream(this[kHandle]) :
_openBidirectionalStream(this[kHandle]);

if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

return new QuicStream({
const stream = new QuicStream({
highWaterMark,
defaultEncoding,
readable: !halfOpen
}, this, handle);
}, this);

await once(stream, kReady);

return stream;
}

get duration() {
Expand Down Expand Up @@ -2532,6 +2518,7 @@ function streamOnPause() {
this[kHandle].readStop();
}
class QuicStream extends Duplex {
#count = 0;
[kInternalState] = {
closed: false,
closePromise: undefined,
Expand All @@ -2547,6 +2534,7 @@ class QuicStream extends Duplex {
dataRateHistogram: undefined,
dataSizeHistogram: undefined,
dataAckHistogram: undefined,
ready: false,
sharedState: undefined,
stats: undefined,
};
Expand Down Expand Up @@ -2578,7 +2566,45 @@ class QuicStream extends Duplex {
this._readableState.readingMore = true;
this.on('pause', streamOnPause);

this[kSetHandle](handle);
if (handle !== undefined)
this[kSetHandle](handle);
}

async _construct(callback) {
try {
if (this[kInternalState].ready)
return callback();

// Handle is already initialized
const unidirectional = !this.readable;

await this.session[kHandshakeComplete]();

if (this.destroyed) {
throw new ERR_INVALID_STATE('QuicStream was destroyed');
}
if (this.session.destroyed) {
throw new ERR_INVALID_STATE(
`${this.session.constructor.name} was destroyed`);
}
if (this.session.closing) {
throw new ERR_INVALID_STATE(
`${this.session.constructor.name} is closing`);
}

const handle =
unidirectional ?
_openUnidirectionalStream(this.session[kHandle]) :
_openBidirectionalStream(this.session[kHandle]);

if (handle === undefined)
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');

this[kSetHandle](handle);
callback();
} catch (error) {
callback(error);
}
}

// Set handle is called once the QuicSession has been able
Expand All @@ -2589,6 +2615,8 @@ class QuicStream extends Duplex {
// written will be buffered until kSetHandle is called.
[kSetHandle](handle) {
const state = this[kInternalState];
const current = this[kHandle];
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
Expand All @@ -2599,11 +2627,13 @@ class QuicStream extends Duplex {
state.dataAckHistogram = new Histogram(handle.ack);
state.sharedState = new QuicStreamSharedState(handle.state);
state.session[kAddStream](state.id, this);
state.ready = true;
this.emit(kReady);
} else {
if (this[kHandle] !== undefined) {
this[kHandle].stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
if (current !== undefined) {
current.stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
process.hrtime.bigint();
state.stats = new BigInt64Array(this[kHandle].stats);
state.stats = new BigInt64Array(current.stats);
}
state.sharedState = undefined;
if (state.dataRateHistogram)
Expand All @@ -2613,7 +2643,6 @@ class QuicStream extends Duplex {
if (state.dataAckHistogram)
state.dataAckHistogram[kDestroyHistogram]();
}
this[kHandle] = handle;
}

[kStreamReset](code) {
Expand Down Expand Up @@ -2643,6 +2672,8 @@ class QuicStream extends Duplex {
this.end();
}

// TODO(@jasnell): Investigate later if a Promise version
// of finished() can work here instead.
return promise;
Comment thread
jasnell marked this conversation as resolved.
Outdated
}

Expand All @@ -2663,6 +2694,7 @@ class QuicStream extends Duplex {
else if (typeof state.closePromiseResolve === 'function')
state.closePromiseResolve();

// TODO(@jasnell): Investigate how we can eliminate the nextTick here
process.nextTick(() => callback(error));
Comment thread
jasnell marked this conversation as resolved.
Outdated
}

Expand Down Expand Up @@ -2754,7 +2786,7 @@ class QuicStream extends Duplex {
}

[kWriteGeneric](writev, data, encoding, cb) {
if (this.destroyed)
if (this.destroyed || this.detached)
return; // TODO(addaleax): Can this happen?

this[kUpdateTimer]();
Expand Down Expand Up @@ -2829,6 +2861,8 @@ class QuicStream extends Duplex {
}

sendFile(path, options = {}) {
if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file');
fs.open(path, 'r', QuicStream[kOnFileOpened].bind(this, options));
}

Expand Down Expand Up @@ -2856,6 +2890,9 @@ class QuicStream extends Duplex {
if (this.destroyed || this[kInternalState].closed)
return;

if (this.detached)
throw new ERR_INVALID_STATE('Unable to send file descriptor');

validateInteger(offset, 'options.offset', /* min */ -1);
validateInteger(length, 'options.length', /* min */ -1);

Expand Down Expand Up @@ -2947,6 +2984,12 @@ class QuicStream extends Duplex {
if (this.destroyed)
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached) {
throw new ERR_INVALID_STATE(
'Push stream could not be opened on this QuicSession. ' +
'Push is either disabled or currently blocked.');
}

const state = this[kInternalState];
const {
highWaterMark = state.highWaterMark,
Expand Down Expand Up @@ -2995,9 +3038,11 @@ class QuicStream extends Duplex {
}

submitInformationalHeaders(headers = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

validateObject(headers, 'headers');

Expand Down Expand Up @@ -3025,9 +3070,11 @@ class QuicStream extends Duplex {
}

submitInitialHeaders(headers = {}, options = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

const { terminal } = { ...options };

Expand Down Expand Up @@ -3062,9 +3109,11 @@ class QuicStream extends Duplex {
}

submitTrailingHeaders(headers = {}) {
// TODO(@jasnell): Likely better to throw here instead of return false
if (this.destroyed)
return false;
throw new ERR_INVALID_STATE('QuicStream is already destroyed');

if (this.detached)
throw new ERR_INVALID_STATE('Unable to submit headers');

validateObject(headers, 'headers');

Expand Down
1 change: 1 addition & 0 deletions src/quic/node_quic_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ int QuicBuffer::DoPull(
size_t len = 0;
size_t numbytes = 0;
int status = bob::Status::STATUS_CONTINUE;

// There's no data to read.
if (!remaining() || head_ == nullptr) {
status = is_ended() ?
Expand Down
2 changes: 1 addition & 1 deletion src/quic/node_quic_session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1998,7 +1998,7 @@ bool QuicSession::ReceiveStreamData(
const uint8_t* data,
size_t datalen,
uint64_t offset) {
auto leave = OnScopeLeave([=]() {
auto leave = OnScopeLeave([&]() {
// Unconditionally extend the flow control window for the entire
// session but not for the individual Stream.
ExtendOffset(datalen);
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
Local<Object> req_wrap_obj = args[0].As<Object>();

return Shutdown(req_wrap_obj);
}

Expand Down