Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
src: split out callback queue implementation from Environment
This isn’t conceptually tied to anything Node.js-specific at all.
  • Loading branch information
addaleax committed May 6, 2020
commit a80476179eb0f4c47837ca5630235bf5e995fbfb
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,8 @@
'src/base_object.h',
'src/base_object-inl.h',
'src/base64.h',
'src/callback_queue.h',
'src/callback_queue-inl.h',
'src/connect_wrap.h',
'src/connection_wrap.h',
'src/debug_utils.h',
Expand Down
96 changes: 96 additions & 0 deletions src/callback_queue-inl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#ifndef SRC_CALLBACK_QUEUE_INL_H_
#define SRC_CALLBACK_QUEUE_INL_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "callback_queue.h"

namespace node {

template <typename R, typename... Args>
template <typename Fn>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::CreateCallback(Fn&& fn, bool refed) {
return std::make_unique<CallbackImpl<Fn>>(std::move(fn), refed);
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Shift() {
std::unique_ptr<Callback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Push(std::unique_ptr<Callback> cb) {
Callback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::ConcatMove(CallbackQueue<R, Args...>&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

template <typename R, typename... Args>
size_t CallbackQueue<R, Args...>::size() const {
return size_.load();
}

template <typename R, typename... Args>
CallbackQueue<R, Args...>::Callback::Callback(bool refed)
: refed_(refed) {}

template <typename R, typename... Args>
bool CallbackQueue<R, Args...>::Callback::is_refed() const {
return refed_;
}

template <typename R, typename... Args>
std::unique_ptr<typename CallbackQueue<R, Args...>::Callback>
CallbackQueue<R, Args...>::Callback::get_next() {
return std::move(next_);
}

template <typename R, typename... Args>
void CallbackQueue<R, Args...>::Callback::set_next(std::unique_ptr<Callback> next) {
next_ = std::move(next);
}

template <typename R, typename... Args>
template <typename Fn>
CallbackQueue<R, Args...>::CallbackImpl<Fn>::CallbackImpl(
Fn&& callback, bool refed)
: Callback(refed),
callback_(std::move(callback)) {}

template <typename R, typename... Args>
template <typename Fn>
R CallbackQueue<R, Args...>::CallbackImpl<Fn>::Call(Args&&... args) {
return callback_(std::forward<Args>(args)...);
}

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_INL_H_
64 changes: 64 additions & 0 deletions src/callback_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#ifndef SRC_CALLBACK_QUEUE_H_
#define SRC_CALLBACK_QUEUE_H_

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include <atomic>

namespace node {

template <typename R, typename... Args>
Comment thread
addaleax marked this conversation as resolved.
class CallbackQueue {
public:
class Callback {
public:
explicit inline Callback(bool refed);

virtual ~Callback() = default;
virtual R Call(Args&&... args) = 0;

inline bool is_refed() const;

private:
inline std::unique_ptr<Callback> get_next();
inline void set_next(std::unique_ptr<Callback> next);

bool refed_;
std::unique_ptr<Callback> next_;

friend class CallbackQueue;
};

template <typename Fn>
inline std::unique_ptr<Callback> CreateCallback(Fn&& fn, bool refed);

inline std::unique_ptr<Callback> Shift();
inline void Push(std::unique_ptr<Callback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(CallbackQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
template <typename Fn>
class CallbackImpl final : public Callback {
public:
CallbackImpl(Fn&& callback, bool refed);
R Call(Args&&... args) override;

private:
Fn callback_;
};

std::atomic<size_t> size_ {0};
std::unique_ptr<Callback> head_;
Callback* tail_ = nullptr;
};

} // namespace node

#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#endif // SRC_CALLBACK_QUEUE_H_
80 changes: 6 additions & 74 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "aliased_buffer.h"
#include "callback_queue-inl.h"
#include "env.h"
#include "node.h"
#include "util-inl.h"
Expand Down Expand Up @@ -705,50 +706,9 @@ inline void IsolateData::set_options(
options_ = std::move(options);
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateQueue::Shift() {
std::unique_ptr<Environment::NativeImmediateCallback> ret = std::move(head_);
if (ret) {
head_ = ret->get_next();
if (!head_)
tail_ = nullptr; // The queue is now empty.
}
size_--;
return ret;
}

void Environment::NativeImmediateQueue::Push(
std::unique_ptr<Environment::NativeImmediateCallback> cb) {
NativeImmediateCallback* prev_tail = tail_;

size_++;
tail_ = cb.get();
if (prev_tail != nullptr)
prev_tail->set_next(std::move(cb));
else
head_ = std::move(cb);
}

void Environment::NativeImmediateQueue::ConcatMove(
NativeImmediateQueue&& other) {
size_ += other.size_;
if (tail_ != nullptr)
tail_->set_next(std::move(other.head_));
else
head_ = std::move(other.head_);
tail_ = other.tail_;
other.tail_ = nullptr;
other.size_ = 0;
}

size_t Environment::NativeImmediateQueue::size() const {
return size_.load();
}

template <typename Fn>
void Environment::CreateImmediate(Fn&& cb, bool ref) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), ref);
auto callback = native_immediates_.CreateCallback(std::move(cb), ref);
native_immediates_.Push(std::move(callback));
}

Expand All @@ -768,8 +728,8 @@ void Environment::SetUnrefImmediate(Fn&& cb) {

template <typename Fn>
void Environment::SetImmediateThreadsafe(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_threadsafe_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_threadsafe_.Push(std::move(callback));
Expand All @@ -780,8 +740,8 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
auto callback =
native_immediates_interrupts_.CreateCallback(std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
Expand All @@ -791,34 +751,6 @@ void Environment::RequestInterrupt(Fn&& cb) {
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

bool Environment::NativeImmediateCallback::is_refed() const {
return refed_;
}

std::unique_ptr<Environment::NativeImmediateCallback>
Environment::NativeImmediateCallback::get_next() {
return std::move(next_);
}

void Environment::NativeImmediateCallback::set_next(
std::unique_ptr<NativeImmediateCallback> next) {
next_ = std::move(next);
}

template <typename Fn>
Environment::NativeImmediateCallbackImpl<Fn>::NativeImmediateCallbackImpl(
Fn&& callback, bool refed)
: NativeImmediateCallback(refed),
callback_(std::move(callback)) {}

template <typename Fn>
void Environment::NativeImmediateCallbackImpl<Fn>::Call(Environment* env) {
callback_(env);
}

inline bool Environment::can_call_into_js() const {
return can_call_into_js_ && !is_stopping();
}
Expand Down
5 changes: 2 additions & 3 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ void Environment::RunAndClearInterrupts() {
}
DebugSealHandleScope seal_handle_scope(isolate());

while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
while (auto head = queue.Shift())
head->Call(this);
}
}
Expand All @@ -755,8 +755,7 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
auto drain_list = [&]() {
TryCatchScope try_catch(this);
DebugSealHandleScope seal_handle_scope(isolate());
while (std::unique_ptr<NativeImmediateCallback> head =
native_immediates_.Shift()) {
while (auto head = native_immediates_.Shift()) {
if (head->is_refed())
ref_count++;

Expand Down
45 changes: 2 additions & 43 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "inspector_agent.h"
#include "inspector_profiler.h"
#endif
#include "callback_queue.h"
#include "debug_utils.h"
#include "handle_wrap.h"
#include "node.h"
Expand Down Expand Up @@ -1368,49 +1369,7 @@ class Environment : public MemoryRetainer {

std::list<ExitCallback> at_exit_functions_;

class NativeImmediateCallback {
public:
explicit inline NativeImmediateCallback(bool refed);

virtual ~NativeImmediateCallback() = default;
virtual void Call(Environment* env) = 0;

inline bool is_refed() const;
inline std::unique_ptr<NativeImmediateCallback> get_next();
inline void set_next(std::unique_ptr<NativeImmediateCallback> next);

private:
bool refed_;
std::unique_ptr<NativeImmediateCallback> next_;
};

template <typename Fn>
class NativeImmediateCallbackImpl final : public NativeImmediateCallback {
public:
NativeImmediateCallbackImpl(Fn&& callback, bool refed);
void Call(Environment* env) override;

private:
Fn callback_;
};

class NativeImmediateQueue {
public:
inline std::unique_ptr<NativeImmediateCallback> Shift();
inline void Push(std::unique_ptr<NativeImmediateCallback> cb);
// ConcatMove adds elements from 'other' to the end of this list, and clears
// 'other' afterwards.
inline void ConcatMove(NativeImmediateQueue&& other);

// size() is atomic and may be called from any thread.
inline size_t size() const;

private:
std::atomic<size_t> size_ {0};
std::unique_ptr<NativeImmediateCallback> head_;
NativeImmediateCallback* tail_ = nullptr;
};

typedef CallbackQueue<void, Environment*> NativeImmediateQueue;
NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
Expand Down