33
44#include " env.h"
55#include " env-inl.h"
6+ #include " debug_utils.h"
67#include " util.h"
78#include < algorithm>
89
@@ -34,7 +35,127 @@ static void PlatformWorkerThread(void* data) {
3435
3536} // namespace
3637
38+ class WorkerThreadsTaskRunner ::DelayedTaskScheduler {
39+ public:
40+ explicit DelayedTaskScheduler (TaskQueue<Task>* tasks)
41+ : pending_worker_tasks_(tasks) {}
42+
43+ std::unique_ptr<uv_thread_t > Start () {
44+ auto start_thread = [](void * data) {
45+ static_cast <DelayedTaskScheduler*>(data)->Run ();
46+ };
47+ std::unique_ptr<uv_thread_t > t { new uv_thread_t () };
48+ uv_sem_init (&ready_, 0 );
49+ CHECK_EQ (0 , uv_thread_create (t.get (), start_thread, this ));
50+ uv_sem_wait (&ready_);
51+ uv_sem_destroy (&ready_);
52+ return t;
53+ }
54+
55+ void PostDelayedTask (std::unique_ptr<Task> task, double delay_in_seconds) {
56+ tasks_.Push (std::unique_ptr<Task>(new ScheduleTask (this , std::move (task),
57+ delay_in_seconds)));
58+ uv_async_send (&flush_tasks_);
59+ }
60+
61+ void Stop () {
62+ tasks_.Push (std::unique_ptr<Task>(new StopTask (this )));
63+ uv_async_send (&flush_tasks_);
64+ }
65+
66+ private:
67+ void Run () {
68+ TRACE_EVENT_METADATA1 (" __metadata" , " thread_name" , " name" ,
69+ " WorkerThreadsTaskRunner::DelayedTaskScheduler" );
70+ loop_.data = this ;
71+ CHECK_EQ (0 , uv_loop_init (&loop_));
72+ flush_tasks_.data = this ;
73+ CHECK_EQ (0 , uv_async_init (&loop_, &flush_tasks_, FlushTasks));
74+ uv_sem_post (&ready_);
75+
76+ uv_run (&loop_, UV_RUN_DEFAULT);
77+ CheckedUvLoopClose (&loop_);
78+ }
79+
80+ static void FlushTasks (uv_async_t * flush_tasks) {
81+ DelayedTaskScheduler* scheduler =
82+ ContainerOf (&DelayedTaskScheduler::loop_, flush_tasks->loop );
83+ while (std::unique_ptr<Task> task = scheduler->tasks_ .Pop ())
84+ task->Run ();
85+ }
86+
87+ class StopTask : public Task {
88+ public:
89+ explicit StopTask (DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
90+
91+ void Run () override {
92+ std::vector<uv_timer_t *> timers;
93+ for (uv_timer_t * timer : scheduler_->timers_ )
94+ timers.push_back (timer);
95+ for (uv_timer_t * timer : timers)
96+ scheduler_->TakeTimerTask (timer);
97+ uv_close (reinterpret_cast <uv_handle_t *>(&scheduler_->flush_tasks_ ),
98+ [](uv_handle_t * handle) {});
99+ }
100+
101+ private:
102+ DelayedTaskScheduler* scheduler_;
103+ };
104+
105+ class ScheduleTask : public Task {
106+ public:
107+ ScheduleTask (DelayedTaskScheduler* scheduler,
108+ std::unique_ptr<Task> task,
109+ double delay_in_seconds)
110+ : scheduler_(scheduler),
111+ task_ (std::move(task)),
112+ delay_in_seconds_(delay_in_seconds) {}
113+
114+ void Run () override {
115+ uint64_t delay_millis =
116+ static_cast <uint64_t >(delay_in_seconds_ + 0.5 ) * 1000 ;
117+ std::unique_ptr<uv_timer_t > timer (new uv_timer_t ());
118+ CHECK_EQ (0 , uv_timer_init (&scheduler_->loop_ , timer.get ()));
119+ timer->data = task_.release ();
120+ CHECK_EQ (0 , uv_timer_start (timer.get (), RunTask, delay_millis, 0 ));
121+ scheduler_->timers_ .insert (timer.release ());
122+ }
123+
124+ private:
125+ DelayedTaskScheduler* scheduler_;
126+ std::unique_ptr<Task> task_;
127+ double delay_in_seconds_;
128+ };
129+
130+ static void RunTask (uv_timer_t * timer) {
131+ DelayedTaskScheduler* scheduler =
132+ ContainerOf (&DelayedTaskScheduler::loop_, timer->loop );
133+ scheduler->pending_worker_tasks_ ->Push (scheduler->TakeTimerTask (timer));
134+ }
135+
136+ std::unique_ptr<Task> TakeTimerTask (uv_timer_t * timer) {
137+ std::unique_ptr<Task> task (static_cast <Task*>(timer->data ));
138+ uv_timer_stop (timer);
139+ uv_close (reinterpret_cast <uv_handle_t *>(timer), [](uv_handle_t * handle) {
140+ delete reinterpret_cast <uv_timer_t *>(handle);
141+ });
142+ timers_.erase (timer);
143+ return task;
144+ }
145+
146+ uv_sem_t ready_;
147+ TaskQueue<v8::Task>* pending_worker_tasks_;
148+
149+ TaskQueue<v8::Task> tasks_;
150+ uv_loop_t loop_;
151+ uv_async_t flush_tasks_;
152+ std::unordered_set<uv_timer_t *> timers_;
153+ };
154+
37155WorkerThreadsTaskRunner::WorkerThreadsTaskRunner (int thread_pool_size) {
156+ delayed_task_scheduler_.reset (
157+ new DelayedTaskScheduler (&pending_worker_tasks_));
158+ threads_.push_back (delayed_task_scheduler_->Start ());
38159 for (int i = 0 ; i < thread_pool_size; i++) {
39160 std::unique_ptr<uv_thread_t > t { new uv_thread_t () };
40161 if (uv_thread_create (t.get (), PlatformWorkerThread,
@@ -51,7 +172,7 @@ void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
51172
52173void WorkerThreadsTaskRunner::PostDelayedTask (std::unique_ptr<v8::Task> task,
53174 double delay_in_seconds) {
54- UNREACHABLE ( );
175+ delayed_task_scheduler_-> PostDelayedTask ( std::move (task), delay_in_seconds );
55176}
56177
57178void WorkerThreadsTaskRunner::BlockingDrain () {
@@ -60,6 +181,7 @@ void WorkerThreadsTaskRunner::BlockingDrain() {
60181
61182void WorkerThreadsTaskRunner::Shutdown () {
62183 pending_worker_tasks_.Stop ();
184+ delayed_task_scheduler_->Stop ();
63185 for (size_t i = 0 ; i < threads_.size (); i++) {
64186 CHECK_EQ (0 , uv_thread_join (threads_[i].get ()));
65187 }
0 commit comments