src: refactor thread stopping mechanism
- Follow style guide for naming, e.g. use lower_snake_case
  for simple setters/getters.
- For performance, use atomics instead of a mutex, and inline
  the corresponding getter/setter pair.
addaleax committed Mar 18, 2019
commit 47a0854aaa768ddfcfd653ab8e67baef35110d3c
10 changes: 9 additions & 1 deletion src/env-inl.h
@@ -709,7 +709,7 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
}

inline bool Environment::is_stopping() const {
return thread_stopper_.IsStopped();
return thread_stopper_.is_stopped();
}

inline performance::performance_state* Environment::performance_state() {
@@ -979,6 +979,14 @@ void Environment::ForEachBaseObject(T&& iterator) {
}
}

bool AsyncRequest::is_stopped() const {
return stopped_.load(std::memory_order_relaxed);
}

void AsyncRequest::set_stopped(bool flag) {
stopped_.store(flag, std::memory_order_relaxed);
Member

so: the heavy usage of mutexes around the worker code, where multi-threaded data access was expected, was almost always to ensure data consistency by flushing cache lines? (In other words, writes in one thread are made visible to other threads instantly.) If so, is the std::memory_order_relaxed constraint insufficient to ensure that? We might need at least memory_order_acquire?

Member

I guess I retract above question - when I tested with a small code, I see mfence or sync instruction being added with memory_order_relaxed itself; so pls ignore.

Member Author

So … my line of thinking was that the syscalls behind uv_async_send() would themselves present full memory barriers. I’ll check later and verify that that’s indeed correct.

Member

@addaleax - that may be true; but we do have set_stopped / is_stopped calls that the threads can make without involving syscalls. However:

void foo(bool flag) {
  stopped_.store(flag, std::memory_order_relaxed);
}

this is what I tested and this is what I see in the generated code:

(gdb) x/30i 0x40053e
   0x40053e <_ZNSt11atomic_bool5storeEbSt12memory_order>:	push   rbp
   0x40053f <_ZNSt11atomic_bool5storeEbSt12memory_order+1>:	mov    rbp,rsp
   0x400542 <_ZNSt11atomic_bool5storeEbSt12memory_order+4>:	sub    rsp,0x30
   0x400546 <_ZNSt11atomic_bool5storeEbSt12memory_order+8>:	mov    QWORD PTR [rbp-0x28],rdi
   0x40054a <_ZNSt11atomic_bool5storeEbSt12memory_order+12>:	mov    eax,esi
   0x40054c <_ZNSt11atomic_bool5storeEbSt12memory_order+14>:	mov    DWORD PTR [rbp-0x30],edx
   0x40054f <_ZNSt11atomic_bool5storeEbSt12memory_order+17>:	mov    BYTE PTR [rbp-0x2c],al
   0x400552 <_ZNSt11atomic_bool5storeEbSt12memory_order+20>:	movzx  eax,BYTE PTR [rbp-0x2c]
   0x400556 <_ZNSt11atomic_bool5storeEbSt12memory_order+24>:	mov    rdx,QWORD PTR [rbp-0x28]
   0x40055a <_ZNSt11atomic_bool5storeEbSt12memory_order+28>:	mov    QWORD PTR [rbp-0x8],rdx
   0x40055e <_ZNSt11atomic_bool5storeEbSt12memory_order+32>:	mov    BYTE PTR [rbp-0x9],al
   0x400561 <_ZNSt11atomic_bool5storeEbSt12memory_order+35>:	and    BYTE PTR [rbp-0x9],0x1
   0x400565 <_ZNSt11atomic_bool5storeEbSt12memory_order+39>:	mov    eax,DWORD PTR [rbp-0x30]
   0x400568 <_ZNSt11atomic_bool5storeEbSt12memory_order+42>:	mov    DWORD PTR [rbp-0x10],eax
   0x40056b <_ZNSt11atomic_bool5storeEbSt12memory_order+45>:	mov    eax,DWORD PTR [rbp-0x10]
   0x40056e <_ZNSt11atomic_bool5storeEbSt12memory_order+48>:	mov    esi,0xffff
   0x400573 <_ZNSt11atomic_bool5storeEbSt12memory_order+53>:	mov    edi,eax
   0x400575 <_ZNSt11atomic_bool5storeEbSt12memory_order+55>:	
    call   0x40052a <_ZStanSt12memory_orderSt23__memory_order_modifier>
   0x40057a <_ZNSt11atomic_bool5storeEbSt12memory_order+60>:	mov    DWORD PTR [rbp-0x14],eax
   0x40057d <_ZNSt11atomic_bool5storeEbSt12memory_order+63>:	movzx  edx,BYTE PTR [rbp-0x9]
   0x400581 <_ZNSt11atomic_bool5storeEbSt12memory_order+67>:	mov    rax,QWORD PTR [rbp-0x8]
   0x400585 <_ZNSt11atomic_bool5storeEbSt12memory_order+71>:	mov    BYTE PTR [rax],dl
   0x400587 <_ZNSt11atomic_bool5storeEbSt12memory_order+73>:	mfence 
   0x40058a <_ZNSt11atomic_bool5storeEbSt12memory_order+76>:	leave  
   0x40058b <_ZNSt11atomic_bool5storeEbSt12memory_order+77>:	ret    

please note the mfence at 0x400587; does that settle the matter?

Member Author

please note the mfence at 0x400587; does that settle the matter?

On x64 it does, yes – to be honest, I don’t know how the different memory order modes are implemented on different platforms? It seems like this disassembled implementation simply ignores the order argument?

Member

   0x400529 <_Z3foob+35>:	ret    
   0x40052a <_ZStanSt12memory_orderSt23__memory_order_modifier>:	push   rbp

@addaleax - if you look at the continuity of the instructions, it looks like these (the atomic* helpers) are not static APIs but compiler-generated code emitted on the fly; so it is possible that only the necessary code was generated, on a per-compilation-unit basis?

Member

my line of thinking was that the syscalls behind uv_async_send() would themselves present full memory barriers.

Libuv doesn't promise that. uv_async_send() can (at least in theory) elide the system call.

I'm kind of surprised the compiler emits an mfence. It's not needed on x64 (nor any other architecture, I think?) because aligned loads and stores are always atomic. It might just be a compiler bug; I wouldn't depend on it.

Member Author

@gireeshpunathil I wouldn’t think so, the linker should be able to elide multiple copies of that variable into a single one.

@bnoordhuis Yeah, thanks. I’ve removed the memory_order_relaxed bit.

}

#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)
29 changes: 7 additions & 22 deletions src/env.cc
@@ -340,13 +340,13 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));

GetAsyncRequest()->Install(
thread_stopper()->Install(
this, static_cast<void*>(this), [](uv_async_t* handle) {
Environment* env = static_cast<Environment*>(handle->data);
uv_stop(env->event_loop());
});
GetAsyncRequest()->SetStopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));
thread_stopper()->set_stopped(false);
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));

// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
@@ -365,7 +365,7 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {

void Environment::ExitEnv() {
set_can_call_into_js(false);
GetAsyncRequest()->Stop();
thread_stopper()->Stop();
isolate_->TerminateExecution();
}

@@ -533,7 +533,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
GetAsyncRequest()->Uninstall();
thread_stopper()->Uninstall();
CleanupHandles();

while (!cleanup_hooks_.empty()) {
@@ -948,49 +948,34 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
}

void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
env_ = env;
async_ = new uv_async_t;
async_->data = data;
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
}

void AsyncRequest::Uninstall() {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) {
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
async_ = nullptr;
}
}

void AsyncRequest::Stop() {
Mutex::ScopedLock lock(mutex_);
stop_ = true;
set_stopped(true);
if (async_ != nullptr) uv_async_send(async_);
}

void AsyncRequest::SetStopped(bool flag) {
Mutex::ScopedLock lock(mutex_);
stop_ = flag;
}

bool AsyncRequest::IsStopped() const {
Mutex::ScopedLock lock(mutex_);
return stop_;
}

uv_async_t* AsyncRequest::GetHandle() {
Mutex::ScopedLock lock(mutex_);
return async_;
}

void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
Mutex::ScopedLock lock(mutex_);
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
}

AsyncRequest::~AsyncRequest() {
Mutex::ScopedLock lock(mutex_);
CHECK_NULL(async_);
}

12 changes: 7 additions & 5 deletions src/env.h
@@ -38,6 +38,7 @@
#include "uv.h"
#include "v8.h"

#include <atomic>
#include <cstdint>
#include <functional>
#include <list>
@@ -518,18 +519,19 @@ class AsyncRequest : public MemoryRetainer {
void Install(Environment* env, void* data, uv_async_cb target);
void Uninstall();
void Stop();
void SetStopped(bool flag);
bool IsStopped() const;
inline void set_stopped(bool flag);
inline bool is_stopped() const;
uv_async_t* GetHandle();
void MemoryInfo(MemoryTracker* tracker) const override;


SET_MEMORY_INFO_NAME(AsyncRequest)
SET_SELF_SIZE(AsyncRequest)

private:
Environment* env_;
uv_async_t* async_ = nullptr;
mutable Mutex mutex_;
bool stop_ = true;
std::atomic_bool stopped_ {true};
};

class Environment {
@@ -1048,7 +1050,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }

inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }

private:
inline void CreateImmediate(native_immediate_callback cb,
4 changes: 2 additions & 2 deletions src/node.cc
@@ -832,14 +832,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);

more = uv_loop_alive(env.event_loop());
if (more && !env.GetAsyncRequest()->IsStopped()) continue;
if (more && !env.is_stopping()) continue;

RunBeforeExit(&env);

// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
} while (more == true && !env.GetAsyncRequest()->IsStopped());
} while (more == true && !env.is_stopping());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
7 changes: 4 additions & 3 deletions src/node_worker.cc
@@ -104,7 +104,7 @@ Worker::Worker(Environment* env,
bool Worker::is_stopped() const {
Mutex::ScopedLock lock(mutex_);
if (env_ != nullptr)
return env_->GetAsyncRequest()->IsStopped();
return env_->is_stopping();
return stopped_;
}

@@ -222,7 +222,7 @@ void Worker::Run() {
stopped_ = true;
this->env_ = nullptr;
}
env_->GetAsyncRequest()->SetStopped(true);
env_->thread_stopper()->set_stopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -381,7 +381,8 @@ void Worker::OnThreadStopped() {
Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);

CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped());
CHECK(stopped_);
CHECK_NULL(env_);
Member

IIRC, there was a control flow that reaches the Worker destructor without nullifying env_; I am not able to figure that out now. Do you know?

Member Author

@gireeshpunathil I think that would be a bug – the child thread is not allowed to exist at this point (and the next CHECK verifies that the thread has been joined), and the child thread in turn owns the Environment.

CHECK(thread_joined_);

Debug(this, "Worker %llu destroyed", thread_id_);