Skip to content

Commit d70e386

Browse files
committed
Add example for threadsafe-async-iterator
1 parent 374d846 commit d70e386

4 files changed

Lines changed: 214 additions & 0 deletions

File tree

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
project (example)
2+
include_directories(${CMAKE_JS_INC} node_modules/node-addon-api/)
3+
cmake_minimum_required(VERSION 3.18)
4+
5+
set(CMAKE_CXX_STANDARD 20)
6+
set(CMAKE_CXX_STANDARD_REQUIRED ON)
7+
include_directories(${CMAKE_JS_INC})
8+
file(GLOB SOURCE_FILES "*.cc")
9+
add_library(${PROJECT_NAME} SHARED ${SOURCE_FILES} ${CMAKE_JS_SRC})
10+
set_target_properties(${PROJECT_NAME} PROPERTIES PREFIX "" SUFFIX ".node")
11+
target_link_libraries(${PROJECT_NAME} ${CMAKE_JS_LIB})
12+
13+
# Include Node-API wrappers
14+
execute_process(COMMAND node -p "require('node-addon-api').include"
15+
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
16+
OUTPUT_VARIABLE NODE_ADDON_API_DIR
17+
)
18+
string(REGEX REPLACE "[\r\n\"]" "" NODE_ADDON_API_DIR ${NODE_ADDON_API_DIR})
19+
20+
target_include_directories(${PROJECT_NAME} PRIVATE ${NODE_ADDON_API_DIR})
21+
22+
# define NAPI_VERSION
23+
add_definitions(-DNAPI_VERSION=6)
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
#include <napi.h>
2+
#include <functional>
3+
#include <memory>
4+
#include <optional>
5+
#include <thread>
6+
7+
using namespace Napi;
8+
9+
class ThreadSafeAsyncIteratorExample
10+
: public ObjectWrap<ThreadSafeAsyncIteratorExample> {
11+
public:
12+
ThreadSafeAsyncIteratorExample(const CallbackInfo& info)
13+
: ObjectWrap<ThreadSafeAsyncIteratorExample>(info),
14+
_current(info[0].As<Number>()),
15+
_last(info[1].As<Number>()) {}
16+
17+
static Object Init(Napi::Env env, Napi::Object exports);
18+
19+
Napi::Value Iterator(const CallbackInfo& info);
20+
21+
private:
22+
using Context = ThreadSafeAsyncIteratorExample;
23+
24+
struct DataType {
25+
std::unique_ptr<Promise::Deferred> deferred;
26+
bool done;
27+
std::optional<int> value;
28+
};
29+
30+
static void CallJs(Napi::Env env,
31+
Function callback,
32+
Context* context,
33+
DataType* data);
34+
35+
using TSFN = TypedThreadSafeFunction<Context, DataType, CallJs>;
36+
37+
using FinalizerDataType = void;
38+
39+
int _current;
40+
int _last;
41+
TSFN _tsfn;
42+
std::thread _thread;
43+
std::unique_ptr<Promise::Deferred> _deferred;
44+
45+
// Thread-safety
46+
std::mutex _mtx;
47+
std::condition_variable _cv;
48+
49+
void threadEntry();
50+
51+
static void FinalizerCallback(Napi::Env env,
52+
void*,
53+
ThreadSafeAsyncIteratorExample* context);
54+
};
55+
56+
Object ThreadSafeAsyncIteratorExample::Init(Napi::Env env,
57+
Napi::Object exports) {
58+
Napi::Function func =
59+
DefineClass(env,
60+
"ThreadSafeAsyncIteratorExample",
61+
{InstanceMethod(Napi::Symbol::WellKnown(env, "asyncIterator"),
62+
&ThreadSafeAsyncIteratorExample::Iterator)});
63+
64+
exports.Set("ThreadSafeAsyncIteratorExample", func);
65+
return exports;
66+
}
67+
68+
Napi::Value ThreadSafeAsyncIteratorExample::Iterator(const CallbackInfo& info) {
69+
auto env = info.Env();
70+
71+
if (_thread.joinable()) {
72+
Napi::Error::New(env, "Concurrent iterations not implemented.")
73+
.ThrowAsJavaScriptException();
74+
return Napi::Value();
75+
}
76+
77+
_tsfn =
78+
TSFN::New(info.Env(),
79+
"tsfn",
80+
0,
81+
1,
82+
this,
83+
std::function<decltype(FinalizerCallback)>(FinalizerCallback));
84+
85+
// To prevent premature garbage collection; Unref in TFSN finalizer
86+
Ref();
87+
88+
// Create thread
89+
_thread = std::thread(&ThreadSafeAsyncIteratorExample::threadEntry, this);
90+
91+
// Create iterable
92+
auto iterable = Napi::Object::New(env);
93+
94+
iterable["next"] =
95+
Function::New(env, [this](const CallbackInfo& info) -> Napi::Value {
96+
std::lock_guard<std::mutex> lk(_mtx);
97+
auto env = info.Env();
98+
if (_deferred) {
99+
Napi::Error::New(env, "Concurrent iterations not implemented.")
100+
.ThrowAsJavaScriptException();
101+
return Napi::Value();
102+
}
103+
_deferred = std::make_unique<Promise::Deferred>(env);
104+
_cv.notify_all();
105+
return _deferred->Promise();
106+
});
107+
108+
return iterable;
109+
}
110+
111+
void ThreadSafeAsyncIteratorExample::threadEntry() {
112+
while (true) {
113+
std::unique_lock<std::mutex> lk(_mtx);
114+
_cv.wait(lk, [this] { return this->_deferred != nullptr; });
115+
auto done = _current > _last;
116+
if (done) {
117+
_tsfn.BlockingCall(new DataType{std::move(this->_deferred), true, {}});
118+
break;
119+
} else {
120+
std::this_thread::sleep_for(
121+
std::chrono::seconds(1)); // Simulate CPU-intensive work
122+
_tsfn.BlockingCall(
123+
new DataType{std::move(this->_deferred), false, _current++});
124+
}
125+
}
126+
_tsfn.Release();
127+
}
128+
129+
void ThreadSafeAsyncIteratorExample::CallJs(Napi::Env env,
130+
Function callback,
131+
Context* context,
132+
DataType* data) {
133+
if (env != nullptr) {
134+
auto value = Object::New(env);
135+
136+
if (data->done) {
137+
value["done"] = Boolean::New(env, true);
138+
} else {
139+
value["done"] = Boolean::New(env, false);
140+
value["value"] = Number::New(env, data->value.value());
141+
}
142+
data->deferred->Resolve(value);
143+
}
144+
145+
if (data != nullptr) {
146+
delete data;
147+
}
148+
}
149+
150+
void ThreadSafeAsyncIteratorExample::FinalizerCallback(
151+
Napi::Env env, void*, ThreadSafeAsyncIteratorExample* context) {
152+
context->_thread.join();
153+
context->Unref();
154+
}
155+
156+
Napi::Object Init(Napi::Env env, Object exports) {
157+
ThreadSafeAsyncIteratorExample::Init(env, exports);
158+
return exports;
159+
}
160+
161+
NODE_API_MODULE(example, Init)
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
const { ThreadSafeAsyncIteratorExample } = require('bindings')('example');
2+
3+
async function main(from, to) {
4+
const iterator = new ThreadSafeAsyncIteratorExample(from, to);
5+
for await (const value of iterator) {
6+
console.log(value);
7+
}
8+
}
9+
10+
main(0, 5)
11+
.catch(e => {
12+
console.error(e);
13+
process.exit(1);
14+
});
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"name": "threadsafe-async-iterator-example",
3+
"version": "0.0.0",
4+
"description": "Async iterator example with threadsafe functions using node-addon-api",
5+
"main": "index.js",
6+
"private": true,
7+
"dependencies": {
8+
"bindings": "^1.5.0",
9+
"cmake-js": "^6.3.0",
10+
"node-addon-api": "^5.0.0"
11+
},
12+
"scripts": {
13+
"test": "node index.js",
14+
"install": "cmake-js compile"
15+
}
16+
}

0 commit comments

Comments
 (0)