Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
simplify
  • Loading branch information
indutny committed May 2, 2021
commit 9835c84521b8376fe72c70172823057158dbd987
42 changes: 11 additions & 31 deletions src/node_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
return napi_closing;
}
} else {
if (Send() != 0) {
return napi_generic_failure;
}
queue.push(data);
Send();
Copy link
Copy Markdown
Member Author

@indutny indutny May 2, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've swapped the order since uv_async_send() error is treated as a hard failure and it is more logical to push before the notification.

return napi_ok;
}
}
Expand Down Expand Up @@ -213,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
if (is_closing && max_queue_size > 0) {
cond->Signal(lock);
}
if (Send() != 0) {
return napi_generic_failure;
}
Send();
}
}

Expand All @@ -240,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
cond = std::make_unique<node::ConditionVariable>();
}
if (max_queue_size == 0 || cond) {
CHECK_EQ(0, uv_idle_init(loop, &idle));
return napi_ok;
}

Expand All @@ -265,14 +260,12 @@ class ThreadSafeFunction : public node::AsyncResource {

napi_status Unref() {
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}

napi_status Ref() {
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));

return napi_ok;
}
Expand All @@ -297,6 +290,10 @@ class ThreadSafeFunction : public node::AsyncResource {
has_more = true;
}
}

if (has_more) {
Send();
}
}

bool DispatchOne() {
Expand Down Expand Up @@ -327,8 +324,6 @@ class ThreadSafeFunction : public node::AsyncResource {
cond->Signal(lock);
}
CloseHandlesAndMaybeDelete();
} else {
CHECK_EQ(0, uv_idle_stop(&idle));
}
} else {
has_more = true;
Expand Down Expand Up @@ -383,26 +378,18 @@ class ThreadSafeFunction : public node::AsyncResource {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async,
reinterpret_cast<uv_async_t*>(handle));
v8::HandleScope scope(ts_fn->env->isolate);
ts_fn->env->node_env()->CloseHandle(
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
[](uv_handle_t* handle) -> void {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle,
reinterpret_cast<uv_idle_t*>(handle));
ts_fn->Finalize();
});
ts_fn->Finalize();
});
}

int Send() {
void Send() {
// Ask currently running Dispatch() to make one more iteration
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
if ((current_state & kDispatchRunning) == kDispatchRunning) {
return 0;
return;
}

return uv_async_send(&async);
CHECK_EQ(0, uv_async_send(&async));
}

// Default way of calling into JavaScript. Used when ThreadSafeFunction is
Expand All @@ -428,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
}
}

static void IdleCb(uv_idle_t* idle) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::idle, idle);
ts_fn->Dispatch();
}

static void AsyncCb(uv_async_t* async) {
ThreadSafeFunction* ts_fn =
node::ContainerOf(&ThreadSafeFunction::async, async);
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
ts_fn->Dispatch();
}

static void Cleanup(void* data) {
Expand All @@ -457,7 +438,6 @@ class ThreadSafeFunction : public node::AsyncResource {
std::unique_ptr<node::ConditionVariable> cond;
std::queue<void*> queue;
uv_async_t async;
uv_idle_t idle;
size_t thread_count;
bool is_closing;
std::atomic_uchar dispatch_state;
Expand Down
4 changes: 2 additions & 2 deletions test/node-api/test_threadsafe_function/binding.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include <node_api.h>
#include "../../js-native-api/common.h"

#define ARRAY_LENGTH 10
#define ARRAY_LENGTH 10000
#define MAX_QUEUE_SIZE 2

static uv_thread_t uv_threads[2];
Expand Down Expand Up @@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
if (ts_fn_info->max_queue_size == 0) {
if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
Expand Down
13 changes: 11 additions & 2 deletions test/node-api/test_threadsafe_function/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ function testWithJSMarshaller({
binding[threadStarter](function testCallback(value) {
array.push(value);
if (array.length === quitAfter) {
process.nextTick(() => {
setImmediate(() => {
binding.StopThread(common.mustCall(() => {
resolve(array);
}), !!abort);
Expand Down Expand Up @@ -85,7 +85,7 @@ new Promise(function testWithoutJSMarshaller(resolve) {
// The default call-into-JS implementation passes no arguments.
assert.strictEqual(arguments.length, 0);
if (callCount === binding.ARRAY_LENGTH) {
process.nextTick(() => {
setImmediate(() => {
binding.StopThread(common.mustCall(() => {
resolve();
}), false);
Expand Down Expand Up @@ -211,6 +211,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
}))
.then((result) => assert.strictEqual(result.indexOf(0), -1))

// Make sure that threadsafe function isn't stalled when we hit
// `kMaxIterationCount` in `src/node_api.cc`
.then(() => testWithJSMarshaller({
threadStarter: 'StartThreadNonblocking',
maxQueueSize: binding.ARRAY_LENGTH >>> 1,
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks stupid, but there is an assert in the binding that the queue block at least once, and I didn't want to change .c code 😂

quitAfter: binding.ARRAY_LENGTH
}))
.then((result) => assert.deepStrictEqual(result, expectedArray))

// Start a child process to test rapid teardown
.then(() => testUnref(binding.MAX_QUEUE_SIZE))

Expand Down