Skip to content

Commit a14e56b

Browse files
alexkozyblattersturm
authored andcommitted
src: implement v8::Platform::CallDelayedOnWorkerThread
This method is crucial for Runtime.evaluate protocol command with timeout flag. At least Chrome DevTools frontend uses this method for every execution in console. PR-URL: nodejs#22383 Fixes: nodejs#22157 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 588f957 commit a14e56b

3 files changed

Lines changed: 148 additions & 1 deletion

File tree

src/node_platform.cc

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include "env.h"
55
#include "env-inl.h"
6+
#include "debug_utils.h"
67
#include "util.h"
78
#include <algorithm>
89

@@ -34,7 +35,127 @@ static void PlatformWorkerThread(void* data) {
3435

3536
} // namespace
3637

38+
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
39+
public:
40+
explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
41+
: pending_worker_tasks_(tasks) {}
42+
43+
std::unique_ptr<uv_thread_t> Start() {
44+
auto start_thread = [](void* data) {
45+
static_cast<DelayedTaskScheduler*>(data)->Run();
46+
};
47+
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
48+
uv_sem_init(&ready_, 0);
49+
CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
50+
uv_sem_wait(&ready_);
51+
uv_sem_destroy(&ready_);
52+
return t;
53+
}
54+
55+
void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
56+
tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task),
57+
delay_in_seconds)));
58+
uv_async_send(&flush_tasks_);
59+
}
60+
61+
void Stop() {
62+
tasks_.Push(std::unique_ptr<Task>(new StopTask(this)));
63+
uv_async_send(&flush_tasks_);
64+
}
65+
66+
private:
67+
void Run() {
68+
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
69+
"WorkerThreadsTaskRunner::DelayedTaskScheduler");
70+
loop_.data = this;
71+
CHECK_EQ(0, uv_loop_init(&loop_));
72+
flush_tasks_.data = this;
73+
CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
74+
uv_sem_post(&ready_);
75+
76+
uv_run(&loop_, UV_RUN_DEFAULT);
77+
CheckedUvLoopClose(&loop_);
78+
}
79+
80+
static void FlushTasks(uv_async_t* flush_tasks) {
81+
DelayedTaskScheduler* scheduler =
82+
ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
83+
while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
84+
task->Run();
85+
}
86+
87+
class StopTask : public Task {
88+
public:
89+
explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
90+
91+
void Run() override {
92+
std::vector<uv_timer_t*> timers;
93+
for (uv_timer_t* timer : scheduler_->timers_)
94+
timers.push_back(timer);
95+
for (uv_timer_t* timer : timers)
96+
scheduler_->TakeTimerTask(timer);
97+
uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
98+
[](uv_handle_t* handle) {});
99+
}
100+
101+
private:
102+
DelayedTaskScheduler* scheduler_;
103+
};
104+
105+
class ScheduleTask : public Task {
106+
public:
107+
ScheduleTask(DelayedTaskScheduler* scheduler,
108+
std::unique_ptr<Task> task,
109+
double delay_in_seconds)
110+
: scheduler_(scheduler),
111+
task_(std::move(task)),
112+
delay_in_seconds_(delay_in_seconds) {}
113+
114+
void Run() override {
115+
uint64_t delay_millis =
116+
static_cast<uint64_t>(delay_in_seconds_ + 0.5) * 1000;
117+
std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
118+
CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
119+
timer->data = task_.release();
120+
CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
121+
scheduler_->timers_.insert(timer.release());
122+
}
123+
124+
private:
125+
DelayedTaskScheduler* scheduler_;
126+
std::unique_ptr<Task> task_;
127+
double delay_in_seconds_;
128+
};
129+
130+
static void RunTask(uv_timer_t* timer) {
131+
DelayedTaskScheduler* scheduler =
132+
ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
133+
scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
134+
}
135+
136+
std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
137+
std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
138+
uv_timer_stop(timer);
139+
uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
140+
delete reinterpret_cast<uv_timer_t*>(handle);
141+
});
142+
timers_.erase(timer);
143+
return task;
144+
}
145+
146+
uv_sem_t ready_;
147+
TaskQueue<v8::Task>* pending_worker_tasks_;
148+
149+
TaskQueue<v8::Task> tasks_;
150+
uv_loop_t loop_;
151+
uv_async_t flush_tasks_;
152+
std::unordered_set<uv_timer_t*> timers_;
153+
};
154+
37155
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
156+
delayed_task_scheduler_.reset(
157+
new DelayedTaskScheduler(&pending_worker_tasks_));
158+
threads_.push_back(delayed_task_scheduler_->Start());
38159
for (int i = 0; i < thread_pool_size; i++) {
39160
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
40161
if (uv_thread_create(t.get(), PlatformWorkerThread,
@@ -51,7 +172,7 @@ void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
51172

52173
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
53174
double delay_in_seconds) {
54-
UNREACHABLE();
175+
delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
55176
}
56177

57178
void WorkerThreadsTaskRunner::BlockingDrain() {
@@ -60,6 +181,7 @@ void WorkerThreadsTaskRunner::BlockingDrain() {
60181

61182
void WorkerThreadsTaskRunner::Shutdown() {
62183
pending_worker_tasks_.Stop();
184+
delayed_task_scheduler_->Stop();
63185
for (size_t i = 0; i < threads_.size(); i++) {
64186
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
65187
}

src/node_platform.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,10 @@ class WorkerThreadsTaskRunner {
109109

110110
private:
111111
TaskQueue<v8::Task> pending_worker_tasks_;
112+
113+
class DelayedTaskScheduler;
114+
std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;
115+
112116
std::vector<std::unique_ptr<uv_thread_t>> threads_;
113117
};
114118

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Flags: --expose-internals
2+
'use strict';
3+
4+
const common = require('../common');
5+
common.skipIfInspectorDisabled();
6+
7+
(async function test() {
8+
const { strictEqual } = require('assert');
9+
const { Session } = require('inspector');
10+
const { promisify } = require('util');
11+
12+
const session = new Session();
13+
session.connect();
14+
session.post = promisify(session.post);
15+
const result = await session.post('Runtime.evaluate', {
16+
expression: 'for(;;);',
17+
timeout: 0
18+
}).catch((e) => e);
19+
strictEqual(result.message, 'Execution was terminated');
20+
session.disconnect();
21+
})();

0 commit comments

Comments
 (0)