Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
stream_base: dispatch reqs in the stream impl
Dispatch requests in the implementation of the stream, not in the code
creating these requests. The requests might be piled up and invoked
internally in the implementation, so it should know better when it is
the time to dispatch them.

In fact, TLS was doing exactly this thing which led us to...

Fix: #1512
  • Loading branch information
indutny committed Apr 29, 2015
commit 599218778dc4ca8581c4b7e7dae7db8ceff7e0f8
2 changes: 2 additions & 0 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
req_wrap->object()
};

req_wrap->Dispatched();
Local<Value> res =
MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv);

Expand All @@ -95,6 +96,7 @@ int JSStream::DoWrite(WriteWrap* w,
bufs_arr
};

w->Dispatched();
Local<Value> res =
MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv);

Expand Down
4 changes: 0 additions & 4 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
AfterShutdown);

int err = DoShutdown(req_wrap);
req_wrap->Dispatched();
if (err)
delete req_wrap;
return err;
Expand Down Expand Up @@ -181,7 +180,6 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (bufs != bufs_)
delete[] bufs;

req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));
req_wrap->object()->Set(env->bytes_string(),
Number::New(env->isolate(), bytes));
Expand Down Expand Up @@ -228,7 +226,6 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap->Dispatched();
req_wrap_obj->Set(env->async(), True(env->isolate()));

if (err)
Expand Down Expand Up @@ -347,7 +344,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
reinterpret_cast<uv_stream_t*>(send_handle));
}

req_wrap->Dispatched();
req_wrap->object()->Set(env->async(), True(env->isolate()));

if (err)
Expand Down
6 changes: 5 additions & 1 deletion src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,10 @@ void StreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {


int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) {
return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
int err;
err = uv_shutdown(&req_wrap->req_, stream(), AfterShutdown);
req_wrap->Dispatched();
return err;
}


Expand Down Expand Up @@ -353,6 +356,7 @@ int StreamWrap::DoWrite(WriteWrap* w,
}
}

w->Dispatched();
UpdateWriteQueueSize();

return r;
Expand Down
2 changes: 1 addition & 1 deletion src/tls_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,6 @@ void TLSWrap::EncOut() {
for (size_t i = 0; i < count; i++)
buf[i] = uv_buf_init(data[i], size[i]);
int err = stream_->DoWrite(write_req, buf, count, nullptr);
write_req->Dispatched();

// Ignore errors, this should be already handled in js
if (err) {
Expand Down Expand Up @@ -558,6 +557,7 @@ int TLSWrap::DoWrite(WriteWrap* w,

// Queue callback to execute it on next tick
write_item_queue_.PushBack(new WriteItem(w));
w->Dispatched();

// Write queued data
if (empty) {
Expand Down