Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
net: track bytesWritten in C++ land
Move tracking of `socket.bytesWritten` to C++ land.

This makes it easier to provide this functionality for all
`StreamBase` instances, and in particular should keep working
when they have been 'consumed' in C++ in some way (e.g. for
the network sockets that are underlying to TLS or HTTP2 streams).

Also, this parallels `socket.bytesRead` a lot more now.
  • Loading branch information
addaleax committed Mar 29, 2018
commit 35b7e2f08f565fd67718bacfbccd6f8c208f260a
23 changes: 15 additions & 8 deletions lib/net.js
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ function normalizeArgs(args) {
// called when creating new Socket, or when re-using a closed Socket
function initSocketHandle(self) {
self._undestroy();
self._bytesDispatched = 0;
self._sockname = null;

// Handle creation may be deferred to bind() or connect() time.
Expand All @@ -222,7 +221,8 @@ function initSocketHandle(self) {
}


const BYTES_READ = Symbol('bytesRead');
const kBytesRead = Symbol('kBytesRead');
const kBytesWritten = Symbol('kBytesWritten');


function Socket(options) {
Expand Down Expand Up @@ -316,7 +316,8 @@ function Socket(options) {
this._server = null;

// Used after `.destroy()`
this[BYTES_READ] = 0;
this[kBytesRead] = 0;
this[kBytesWritten] = 0;
}
util.inherits(Socket, stream.Duplex);

Expand Down Expand Up @@ -588,8 +589,9 @@ Socket.prototype._destroy = function(exception, cb) {
if (this !== process.stderr)
debug('close handle');
var isException = exception ? true : false;
// `bytesRead` should be accessible after `.destroy()`
this[BYTES_READ] = this._handle.bytesRead;
// `bytesRead` and `kBytesWritten` should be accessible after `.destroy()`
this[kBytesRead] = this._handle.bytesRead;
this[kBytesWritten] = this._handle.bytesWritten;

this._handle.close(() => {
debug('emit close');
Expand Down Expand Up @@ -689,7 +691,7 @@ function protoGetter(name, callback) {
}

protoGetter('bytesRead', function bytesRead() {
return this._handle ? this._handle.bytesRead : this[BYTES_READ];
return this._handle ? this._handle.bytesRead : this[kBytesRead];
});

protoGetter('remoteAddress', function remoteAddress() {
Expand Down Expand Up @@ -761,8 +763,6 @@ Socket.prototype._writeGeneric = function(writev, data, encoding, cb) {
// Bail out if handle.write* returned an error
if (ret) return ret;

this._bytesDispatched += req.bytes;

if (!req.async) {
cb();
return;
Expand All @@ -782,6 +782,13 @@ Socket.prototype._write = function(data, encoding, cb) {
this._writeGeneric(false, data, encoding, cb);
};


// Legacy alias. Having this is probably being overly cautious, but it doesn't
// really hurt anyone either. This can probably be removed safely if desired.
protoGetter('_bytesDispatched', function _bytesDispatched() {
return this._handle ? this._handle.bytesWritten : this[kBytesWritten];
});

protoGetter('bytesWritten', function bytesWritten() {
var bytes = this._bytesDispatched;
const state = this._writableState;
Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ struct PackageConfig {
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
V(cached_data_string, "cachedData") \
V(cached_data_produced_string, "cachedDataProduced") \
V(cached_data_rejected_string, "cachedDataRejected") \
Expand Down
28 changes: 27 additions & 1 deletion src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ inline StreamWriteResult StreamBase::Write(
v8::Local<v8::Object> req_wrap_obj) {
Environment* env = stream_env();
int err;

for (size_t i = 0; i < count; ++i)
bytes_written_ += bufs[i].len;

if (send_handle == nullptr) {
err = DoTryWrite(&bufs, &count);
if (err != 0 || count == 0) {
Expand Down Expand Up @@ -301,6 +305,12 @@ void StreamBase::AddMethods(Environment* env,
env->as_external(),
signature);

Local<FunctionTemplate> get_bytes_written_templ =
FunctionTemplate::New(env->isolate(),
GetBytesWritten<Base>,
env->as_external(),
signature);

t->PrototypeTemplate()->SetAccessorProperty(env->fd_string(),
get_fd_templ,
Local<FunctionTemplate>(),
Expand All @@ -316,6 +326,11 @@ void StreamBase::AddMethods(Environment* env,
Local<FunctionTemplate>(),
attributes);

t->PrototypeTemplate()->SetAccessorProperty(env->bytes_written_string(),
get_bytes_written_templ,
Local<FunctionTemplate>(),
attributes);

env->SetProtoMethod(t, "readStart", JSMethod<Base, &StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<Base, &StreamBase::ReadStopJS>);
if ((flags & kFlagNoShutdown) == 0)
Expand Down Expand Up @@ -357,7 +372,6 @@ void StreamBase::GetFD(const FunctionCallbackInfo<Value>& args) {

template <class Base>
void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
// The handle instance hasn't been set. So no bytes could have been read.
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
Expand All @@ -368,6 +382,18 @@ void StreamBase::GetBytesRead(const FunctionCallbackInfo<Value>& args) {
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_read_));
}

template <class Base>
void StreamBase::GetBytesWritten(const FunctionCallbackInfo<Value>& args) {
Base* handle;
ASSIGN_OR_RETURN_UNWRAP(&handle,
args.This(),
args.GetReturnValue().Set(0));

StreamBase* wrap = static_cast<StreamBase*>(handle);
// uint64_t -> double. 53bits is enough for all real cases.
args.GetReturnValue().Set(static_cast<double>(wrap->bytes_written_));
}

template <class Base>
void StreamBase::GetExternal(const FunctionCallbackInfo<Value>& args) {
Base* handle;
Expand Down
1 change: 1 addition & 0 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
uv_buf_t* bufs = &buf;
size_t count = 1;
err = DoTryWrite(&bufs, &count);
bytes_written_ += data_size;

// Immediate failure or success
if (err != 0 || count == 0) {
Expand Down
4 changes: 4 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ class StreamResource {

StreamListener* listener_ = nullptr;
uint64_t bytes_read_ = 0;
uint64_t bytes_written_ = 0;

friend class StreamListener;
};
Expand Down Expand Up @@ -324,6 +325,9 @@ class StreamBase : public StreamResource {
template <class Base>
static void GetBytesRead(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base>
static void GetBytesWritten(const v8::FunctionCallbackInfo<v8::Value>& args);

template <class Base,
int (StreamBase::*Method)(
const v8::FunctionCallbackInfo<v8::Value>& args)>
Expand Down