Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions src/js_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,6 @@ void JSStream::OnReadImpl(ssize_t nread,
}


void* JSStream::Cast() {
return static_cast<void*>(this);
}


AsyncWrap* JSStream::GetAsyncWrap() {
return static_cast<AsyncWrap*>(this);
}
Expand Down Expand Up @@ -181,7 +176,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo<Value>& args) {
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As<Object>());

wrap->OnAfterWrite(w);
w->Done(0);
}


Expand Down
1 change: 0 additions & 1 deletion src/js_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ class JSStream : public AsyncWrap, public StreamBase {

~JSStream();

void* Cast() override;
bool IsAlive() override;
bool IsClosing() override;
int ReadStart() override;
Expand Down
5 changes: 1 addition & 4 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -987,9 +987,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) {

WriteWrap* Http2Session::AllocateSend() {
HandleScope scope(env()->isolate());
auto AfterWrite = [](WriteWrap* req, int status) {
req->Dispose();
};
Local<Object> obj =
env()->write_wrap_constructor_function()
->NewInstance(env()->context()).ToLocalChecked();
Expand All @@ -999,7 +996,7 @@ WriteWrap* Http2Session::AllocateSend() {
session(),
NGHTTP2_SETTINGS_MAX_FRAME_SIZE);
// Max frame size + 9 bytes for the header
return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9);
return WriteWrap::New(env(), obj, stream_, size + 9);
}

void Http2Session::Send(WriteWrap* req, char* buf, size_t length) {
Expand Down
3 changes: 1 addition & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -672,8 +672,7 @@ class Http2Stream : public AsyncWrap,
return false;
}

AsyncWrap* GetAsyncWrap() override { return static_cast<AsyncWrap*>(this); }
void* Cast() override { return reinterpret_cast<void*>(this); }
AsyncWrap* GetAsyncWrap() override { return this; }

int DoWrite(WriteWrap* w, uv_buf_t* bufs, size_t count,
uv_stream_t* send_handle) override;
Expand Down
12 changes: 10 additions & 2 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo<Value>& args) {
}


inline void ShutdownWrap::OnDone(int status) {
stream()->AfterShutdown(this, status);
}


WriteWrap* WriteWrap::New(Environment* env,
Local<Object> obj,
StreamBase* wrap,
DoneCb cb,
size_t extra) {
size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra;
char* storage = new char[storage_size];

return new(storage) WriteWrap(env, obj, wrap, cb, storage_size);
return new(storage) WriteWrap(env, obj, wrap, storage_size);
}


Expand All @@ -171,6 +175,10 @@ size_t WriteWrap::ExtraSize() const {
return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize);
}

inline void WriteWrap::OnDone(int status) {
stream()->AfterWrite(this, status);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
Expand Down
21 changes: 9 additions & 12 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
env->set_init_trigger_async_id(wrap->get_async_id());
ShutdownWrap* req_wrap = new ShutdownWrap(env,
req_wrap_obj,
this,
AfterShutdown);
this);

int err = DoShutdown(req_wrap);
if (err)
Expand All @@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

// The wrap and request objects should still be there.
Expand All @@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) {
Local<Object> req_wrap_obj = req_wrap->object();
Local<Value> argv[3] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj
};

Expand Down Expand Up @@ -158,7 +156,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
CHECK_NE(wrap, nullptr);
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

offset = 0;
if (!all_buffers) {
Expand Down Expand Up @@ -248,7 +246,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
// Allocate, or write rest
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite);
req_wrap = WriteWrap::New(env, req_wrap_obj, this);

err = DoWrite(req_wrap, bufs, count, nullptr);
req_wrap_obj->Set(env->async(), True(env->isolate()));
Expand Down Expand Up @@ -332,7 +330,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
wrap = GetAsyncWrap();
if (wrap != nullptr)
env->set_init_trigger_async_id(wrap->get_async_id());
req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, storage_size);
req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size);

data = req_wrap->Extra();

Expand Down Expand Up @@ -393,7 +391,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {


void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
StreamBase* wrap = req_wrap->wrap();
Environment* env = req_wrap->env();

HandleScope handle_scope(env->isolate());
Expand All @@ -405,19 +402,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) {
// Unref handle property
Local<Object> req_wrap_obj = req_wrap->object();
req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust();
wrap->OnAfterWrite(req_wrap);
OnAfterWrite(req_wrap, status);

Local<Value> argv[] = {
Integer::New(env->isolate(), status),
wrap->GetObject(),
GetObject(),
req_wrap_obj,
Undefined(env->isolate())
};

const char* msg = wrap->Error();
const char* msg = Error();
if (msg != nullptr) {
argv[3] = OneByteString(env->isolate(), msg);
wrap->ClearError();
ClearError();
}

if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust())
Expand Down
70 changes: 27 additions & 43 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,39 +16,37 @@ namespace node {
// Forward declarations
class StreamBase;

template <class Req>
template<typename Base>
class StreamReq {
public:
typedef void (*DoneCb)(Req* req, int status);

explicit StreamReq(DoneCb cb) : cb_(cb) {
explicit StreamReq(StreamBase* stream) : stream_(stream) {
}

inline void Done(int status, const char* error_str = nullptr) {
Req* req = static_cast<Req*>(this);
Base* req = static_cast<Base*>(this);
Environment* env = req->env();
if (error_str != nullptr) {
req->object()->Set(env->error_string(),
OneByteString(env->isolate(), error_str));
}

cb_(req, status);
req->OnDone(status);
}

inline StreamBase* stream() const { return stream_; }

private:
DoneCb cb_;
StreamBase* const stream_;
};

class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
public StreamReq<ShutdownWrap> {
public:
ShutdownWrap(Environment* env,
v8::Local<v8::Object> req_wrap_obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP),
StreamReq<ShutdownWrap>(cb),
wrap_(wrap) {
StreamReq<ShutdownWrap>(stream) {
Wrap(req_wrap_obj, this);
}

Expand All @@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap<uv_shutdown_t>,
return ContainerOf(&ShutdownWrap::req_, req);
}

inline StreamBase* wrap() const { return wrap_; }
size_t self_size() const override { return sizeof(*this); }

private:
StreamBase* const wrap_;
inline void OnDone(int status); // Just calls stream()->AfterShutdown()
};

class WriteWrap: public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
class WriteWrap : public ReqWrap<uv_write_t>,
public StreamReq<WriteWrap> {
public:
static inline WriteWrap* New(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t extra = 0);
inline void Dispose();
inline char* Extra(size_t offset = 0);
inline size_t ExtraSize() const;

inline StreamBase* wrap() const { return wrap_; }

size_t self_size() const override { return storage_size_; }

static WriteWrap* from_req(uv_write_t* req) {
Expand All @@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap<uv_write_t>,

WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb)
StreamBase* stream)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(0) {
Wrap(obj, this);
}

inline void OnDone(int status); // Just calls stream()->AfterWrite()

protected:
WriteWrap(Environment* env,
v8::Local<v8::Object> obj,
StreamBase* wrap,
DoneCb cb,
StreamBase* stream,
size_t storage_size)
: ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP),
StreamReq<WriteWrap>(cb),
wrap_(wrap),
StreamReq<WriteWrap>(stream),
storage_size_(storage_size) {
Wrap(obj, this);
}
Expand All @@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap<uv_write_t>,
// WriteWrap. Ensure this never happens.
void operator delete(void* ptr) { UNREACHABLE(); }

StreamBase* const wrap_;
const size_t storage_size_;
};

Expand All @@ -151,7 +141,7 @@ class StreamResource {
void* ctx;
};

typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx);
typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx);
typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx);
typedef void (*ReadCb)(ssize_t nread,
const uv_buf_t* buf,
Expand All @@ -176,9 +166,9 @@ class StreamResource {
virtual void ClearError();

// Events
inline void OnAfterWrite(WriteWrap* w) {
inline void OnAfterWrite(WriteWrap* w, int status) {
if (!after_write_cb_.is_empty())
after_write_cb_.fn(w, after_write_cb_.ctx);
after_write_cb_.fn(w, status, after_write_cb_.ctx);
}

inline void OnAlloc(size_t size, uv_buf_t* buf) {
Expand Down Expand Up @@ -208,14 +198,12 @@ class StreamResource {
inline Callback<ReadCb> read_cb() { return read_cb_; }
inline Callback<DestructCb> destruct_cb() { return destruct_cb_; }

private:
protected:
Callback<AfterWriteCb> after_write_cb_;
Callback<AllocCb> alloc_cb_;
Callback<ReadCb> read_cb_;
Callback<DestructCb> destruct_cb_;
uint64_t bytes_read_;

friend class StreamBase;
};

class StreamBase : public StreamResource {
Expand All @@ -231,7 +219,6 @@ class StreamBase : public StreamResource {
v8::Local<v8::FunctionTemplate> target,
int flags = kFlagNone);

virtual void* Cast() = 0;
virtual bool IsAlive() = 0;
virtual bool IsClosing() = 0;
virtual bool IsIPCPipe();
Expand All @@ -250,13 +237,14 @@ class StreamBase : public StreamResource {
consumed_ = false;
}

template <class Outer>
inline Outer* Cast() { return static_cast<Outer*>(Cast()); }

void EmitData(ssize_t nread,
v8::Local<v8::Object> buf,
v8::Local<v8::Object> handle);

// These are called by the respective {Write,Shutdown}Wrap class.
virtual void AfterShutdown(ShutdownWrap* req, int status);
virtual void AfterWrite(WriteWrap* req, int status);

protected:
explicit StreamBase(Environment* env) : env_(env), consumed_(false) {
}
Expand All @@ -267,10 +255,6 @@ class StreamBase : public StreamResource {
virtual AsyncWrap* GetAsyncWrap() = 0;
virtual v8::Local<v8::Object> GetObject();

// Libuv callbacks
static void AfterShutdown(ShutdownWrap* req, int status);
static void AfterWrite(WriteWrap* req, int status);

// JS Methods
int ReadStart(const v8::FunctionCallbackInfo<v8::Value>& args);
int ReadStop(const v8::FunctionCallbackInfo<v8::Value>& args);
Expand Down
Loading