Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
src: use modern v8::Platform worker threads APIs
Precursor to removing deprecated APIs on the v8 side @
https://chromium-review.googlesource.com/c/v8/v8/+/1045310
  • Loading branch information
Gabriel Charette authored and targos committed Jul 25, 2018
commit befa6b912d81097008b663bca355a6bf818b33fd
2 changes: 1 addition & 1 deletion src/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ static struct {
}

void DrainVMTasks(Isolate* isolate) {
platform_->DrainBackgroundTasks(isolate);
platform_->DrainTasks(isolate);
}

void CancelVMTasks(Isolate* isolate) {
Expand Down
2 changes: 1 addition & 1 deletion src/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class MultiIsolatePlatform : public v8::Platform {
// posted during flushing of the queue are postponed until the next
// flushing.
virtual bool FlushForegroundTasks(v8::Isolate* isolate) = 0;
virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0;
virtual void DrainTasks(v8::Isolate* isolate) = 0;
virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0;

// These will be called by the `IsolateData` creation/destruction functions.
Expand Down
76 changes: 38 additions & 38 deletions src/node_platform.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,52 @@ using v8::Platform;
using v8::Task;
using v8::TracingController;

static void BackgroundRunner(void* data) {
namespace {

static void WorkerThreadMain(void* data) {
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
"BackgroundTaskRunner");
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have to change this? @nodejs/trace-events

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense to change this to match the concept name actually used by V8, that is, 'WorkerThread'. However, things going to get really confusing since we also have worker_threads from #20876.

One of these concepts needs to be renamed for sanity's sake. /cc @addaleax

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One option would be to use something like 'PlatformWorkerThread' here.

TaskQueue<Task> *background_tasks = static_cast<TaskQueue<Task> *>(data);
while (std::unique_ptr<Task> task = background_tasks->BlockingPop()) {
TaskQueue<Task>* pending_worker_tasks = static_cast<TaskQueue<Task>*>(data);
while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
task->Run();
background_tasks->NotifyOfCompletion();
pending_worker_tasks->NotifyOfCompletion();
}
}

BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) {
} // namespace

WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0)
if (uv_thread_create(t.get(), WorkerThreadMain,
&pending_worker_tasks_) != 0) {
break;
}
threads_.push_back(std::move(t));
}
}

void BackgroundTaskRunner::PostTask(std::unique_ptr<Task> task) {
background_tasks_.Push(std::move(task));
}

void BackgroundTaskRunner::PostIdleTask(std::unique_ptr<v8::IdleTask> task) {
UNREACHABLE();
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
pending_worker_tasks_.Push(std::move(task));
}

void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
UNREACHABLE();
}

void BackgroundTaskRunner::BlockingDrain() {
background_tasks_.BlockingDrain();
void WorkerThreadsTaskRunner::BlockingDrain() {
pending_worker_tasks_.BlockingDrain();
}

void BackgroundTaskRunner::Shutdown() {
background_tasks_.Stop();
void WorkerThreadsTaskRunner::Shutdown() {
pending_worker_tasks_.Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
}
}

size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const {
int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const {
return threads_.size();
}

Expand Down Expand Up @@ -131,8 +133,8 @@ NodePlatform::NodePlatform(int thread_pool_size,
TracingController* controller = new TracingController();
tracing_controller_.reset(controller);
}
background_task_runner_ =
std::make_shared<BackgroundTaskRunner>(thread_pool_size);
worker_thread_task_runner_ =
std::make_shared<WorkerThreadsTaskRunner>(thread_pool_size);
}

void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) {
Expand Down Expand Up @@ -160,16 +162,16 @@ void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) {
}

void NodePlatform::Shutdown() {
background_task_runner_->Shutdown();
worker_thread_task_runner_->Shutdown();

{
Mutex::ScopedLock lock(per_isolate_mutex_);
per_isolate_.clear();
}
}

size_t NodePlatform::NumberOfAvailableBackgroundThreads() {
return background_task_runner_->NumberOfAvailableBackgroundThreads();
int NodePlatform::NumberOfWorkerThreads() {
return worker_thread_task_runner_->NumberOfWorkerThreads();
}

void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr<Task> task) {
Expand Down Expand Up @@ -201,15 +203,12 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() {
scheduled_delayed_tasks_.clear();
}

void NodePlatform::DrainBackgroundTasks(Isolate* isolate) {
void NodePlatform::DrainTasks(Isolate* isolate) {
std::shared_ptr<PerIsolatePlatformData> per_isolate = ForIsolate(isolate);

do {
// Right now, there is no way to drain only background tasks associated
// with a specific isolate, so this sometimes does more work than
// necessary. In the long run, that functionality is probably going to
// be available anyway, though.
background_task_runner_->BlockingDrain();
// Worker tasks aren't associated with an Isolate.
worker_thread_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal());
}

Expand Down Expand Up @@ -249,11 +248,17 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
return did_work;
}

void NodePlatform::CallOnBackgroundThread(Task* task,
ExpectedRuntime expected_runtime) {
background_task_runner_->PostTask(std::unique_ptr<Task>(task));
void NodePlatform::CallOnWorkerThread(std::unique_ptr<v8::Task> task) {
worker_thread_task_runner_->PostTask(std::move(task));
}

void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
worker_thread_task_runner_->PostDelayedTask(std::move(task),
delay_in_seconds);
}


std::shared_ptr<PerIsolatePlatformData>
NodePlatform::ForIsolate(Isolate* isolate) {
Mutex::ScopedLock lock(per_isolate_mutex_);
Expand Down Expand Up @@ -283,11 +288,6 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) {

bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; }

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) {
return background_task_runner_;
}

std::shared_ptr<v8::TaskRunner>
NodePlatform::GetForegroundTaskRunner(Isolate* isolate) {
return ForIsolate(isolate);
Expand Down
30 changes: 14 additions & 16 deletions src/node_platform.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,23 +93,22 @@ class PerIsolatePlatformData :
std::vector<DelayedTaskPointer> scheduled_delayed_tasks_;
};

// This acts as the single background task runner for all Isolates.
class BackgroundTaskRunner : public v8::TaskRunner {
// This acts as the single worker thread task runner for all Isolates.
class WorkerThreadsTaskRunner {
public:
explicit BackgroundTaskRunner(int thread_pool_size);
explicit WorkerThreadsTaskRunner(int thread_pool_size);

void PostTask(std::unique_ptr<v8::Task> task) override;
void PostIdleTask(std::unique_ptr<v8::IdleTask> task) override;
void PostTask(std::unique_ptr<v8::Task> task);
void PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
bool IdleTasksEnabled() override { return false; };
double delay_in_seconds);

void BlockingDrain();
void Shutdown();

size_t NumberOfAvailableBackgroundThreads() const;
int NumberOfWorkerThreads() const;

private:
TaskQueue<v8::Task> background_tasks_;
TaskQueue<v8::Task> pending_worker_tasks_;
std::vector<std::unique_ptr<uv_thread_t>> threads_;
};

Expand All @@ -118,14 +117,15 @@ class NodePlatform : public MultiIsolatePlatform {
NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller);
virtual ~NodePlatform() {}

void DrainBackgroundTasks(v8::Isolate* isolate) override;
void DrainTasks(v8::Isolate* isolate) override;
void CancelPendingDelayedTasks(v8::Isolate* isolate) override;
void Shutdown();

// v8::Platform implementation.
size_t NumberOfAvailableBackgroundThreads() override;
void CallOnBackgroundThread(v8::Task* task,
ExpectedRuntime expected_runtime) override;
int NumberOfWorkerThreads() override;
void CallOnWorkerThread(std::unique_ptr<v8::Task> task) override;
void CallDelayedOnWorkerThread(std::unique_ptr<v8::Task> task,
double delay_in_seconds) override;
void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override;
void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task,
double delay_in_seconds) override;
Expand All @@ -138,8 +138,6 @@ class NodePlatform : public MultiIsolatePlatform {
void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override;
void UnregisterIsolate(IsolateData* isolate_data) override;

std::shared_ptr<v8::TaskRunner> GetBackgroundTaskRunner(
v8::Isolate* isolate) override;
std::shared_ptr<v8::TaskRunner> GetForegroundTaskRunner(
v8::Isolate* isolate) override;

Expand All @@ -151,7 +149,7 @@ class NodePlatform : public MultiIsolatePlatform {
std::shared_ptr<PerIsolatePlatformData>> per_isolate_;

std::unique_ptr<v8::TracingController> tracing_controller_;
std::shared_ptr<BackgroundTaskRunner> background_task_runner_;
std::shared_ptr<WorkerThreadsTaskRunner> worker_thread_task_runner_;
};

} // namespace node
Expand Down
4 changes: 2 additions & 2 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void Worker::Run() {
uv_run(&loop_, UV_RUN_DEFAULT);
if (is_stopped()) break;

platform->DrainBackgroundTasks(isolate_);
platform->DrainTasks(isolate_);

more = uv_loop_alive(&loop_);
if (more && !is_stopped())
Expand Down Expand Up @@ -232,7 +232,7 @@ void Worker::Run() {
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
platform->DrainBackgroundTasks(isolate_);
platform->DrainTasks(isolate_);
}

env_.reset();
Expand Down