Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
src: add interrupts to Environments/Workers
Allow doing what V8’s `v8::Isolate::RequestInterrupt()` does for V8.
This also works when there is no JS code currently executing.
  • Loading branch information
addaleax committed Jan 21, 2020
commit c01410a44bdf6c44acb6d66b7525d723f6714734
12 changes: 12 additions & 0 deletions src/env-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,18 @@ void Environment::SetImmediateThreadsafe(Fn&& cb) {
uv_async_send(&task_queues_async_);
}

template <typename Fn>
void Environment::RequestInterrupt(Fn&& cb) {
auto callback = std::make_unique<NativeImmediateCallbackImpl<Fn>>(
std::move(cb), false);
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
native_immediates_interrupts_.Push(std::move(callback));
}
uv_async_send(&task_queues_async_);
RequestInterruptFromV8();
}

Environment::NativeImmediateCallback::NativeImmediateCallback(bool refed)
: refed_(refed) {}

Expand Down
41 changes: 41 additions & 0 deletions src/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ Environment::Environment(IsolateData* isolate_data,
}

Environment::~Environment() {
if (interrupt_data_ != nullptr) *interrupt_data_ = nullptr;

isolate()->GetHeapProfiler()->RemoveBuildEmbedderGraphCallback(
BuildEmbedderGraph, this);

Expand Down Expand Up @@ -651,11 +653,29 @@ void Environment::AtExit(void (*cb)(void* arg), void* arg) {
at_exit_functions_.push_front(ExitCallback{cb, arg});
}

void Environment::RunAndClearInterrupts() {
while (native_immediates_interrupts_.size() > 0) {
NativeImmediateQueue queue;
{
Mutex::ScopedLock lock(native_immediates_threadsafe_mutex_);
queue.ConcatMove(std::move(native_immediates_interrupts_));
}
DebugSealHandleScope seal_handle_scope(isolate());

while (std::unique_ptr<NativeImmediateCallback> head = queue.Shift())
head->Call(this);
}
}

void Environment::RunAndClearNativeImmediates(bool only_refed) {
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunAndClearNativeImmediates", this);
size_t ref_count = 0;

// Handle interrupts first. These functions are not allowed to throw
// exceptions, so we do not need to handle that.
RunAndClearInterrupts();

// It is safe to check .size() first, because there is a causal relationship
// between pushes to the threadsafe and this function being called.
// For the common case, it's worth checking the size first before establishing
Expand Down Expand Up @@ -695,6 +715,27 @@ void Environment::RunAndClearNativeImmediates(bool only_refed) {
ToggleImmediateRef(false);
}

void Environment::RequestInterruptFromV8() {
if (interrupt_data_ != nullptr) return; // Already scheduled.

// The Isolate may outlive the Environment, so some logic to handle the
// situation in which the Environment is destroyed before the handler runs
// is required.
interrupt_data_ = new Environment*(this);

isolate()->RequestInterrupt([](Isolate* isolate, void* data) {
std::unique_ptr<Environment*> env_ptr { static_cast<Environment**>(data) };
Environment* env = *env_ptr;
if (env == nullptr) {
// The Environment has already been destroyed. That should be okay; any
// callback added before the Environment shuts down would have been
// handled during cleanup.
return;
}
env->interrupt_data_ = nullptr;
env->RunAndClearInterrupts();
}, interrupt_data_);
}

void Environment::ScheduleTimer(int64_t duration_ms) {
if (started_cleanup_) return;
Expand Down
10 changes: 10 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,12 @@ class Environment : public MemoryRetainer {
template <typename Fn>
// This behaves like SetImmediate() but can be called from any thread.
inline void SetImmediateThreadsafe(Fn&& cb);
// This behaves like V8's Isolate::RequestInterrupt(), but also accounts for
// the event loop (i.e. combines the V8 function with SetImmediate()).
// The passed callback may not throw exceptions.
// This function can be called from any thread.
template <typename Fn>
inline void RequestInterrupt(Fn&& cb);
// This needs to be available for the JS-land setImmediate().
void ToggleImmediateRef(bool ref);

Expand Down Expand Up @@ -1431,8 +1437,12 @@ class Environment : public MemoryRetainer {
NativeImmediateQueue native_immediates_;
Mutex native_immediates_threadsafe_mutex_;
NativeImmediateQueue native_immediates_threadsafe_;
NativeImmediateQueue native_immediates_interrupts_;

void RunAndClearNativeImmediates(bool only_refed = false);
void RunAndClearInterrupts();
Environment** interrupt_data_ = nullptr;
void RequestInterruptFromV8();
static void CheckImmediate(uv_check_t* handle);

// Use an unordered_set, so that we have efficient insertion and removal.
Expand Down
11 changes: 11 additions & 0 deletions src/node_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class Worker : public AsyncWrap {
tracker->TrackField("parent_port", parent_port_);
}

template <typename Fn>
inline bool RequestInterrupt(Fn&& cb);

SET_MEMORY_INFO_NAME(Worker)
SET_SELF_SIZE(Worker)

Expand Down Expand Up @@ -123,6 +126,14 @@ class Worker : public AsyncWrap {
friend class WorkerThreadData;
};

template <typename Fn>
bool Worker::RequestInterrupt(Fn&& cb) {
Mutex::ScopedLock lock(mutex_);
if (env_ == nullptr) return false;
env_->RequestInterrupt(std::move(cb));
return true;
}

} // namespace worker
} // namespace node

Expand Down