|
| 1 | +#include <napi.h> |
| 2 | +#include <functional> |
| 3 | +#include <memory> |
| 4 | +#include <optional> |
| 5 | +#include <thread> |
| 6 | + |
| 7 | +using namespace Napi; |
| 8 | + |
| 9 | +class ThreadSafeAsyncIteratorExample |
| 10 | + : public ObjectWrap<ThreadSafeAsyncIteratorExample> { |
| 11 | + public: |
| 12 | + ThreadSafeAsyncIteratorExample(const CallbackInfo& info) |
| 13 | + : ObjectWrap<ThreadSafeAsyncIteratorExample>(info), |
| 14 | + _current(info[0].As<Number>()), |
| 15 | + _last(info[1].As<Number>()) {} |
| 16 | + |
| 17 | + static Object Init(Napi::Env env, Napi::Object exports); |
| 18 | + |
| 19 | + Napi::Value Iterator(const CallbackInfo& info); |
| 20 | + |
| 21 | + private: |
| 22 | + using Context = ThreadSafeAsyncIteratorExample; |
| 23 | + |
| 24 | + struct DataType { |
| 25 | + std::unique_ptr<Promise::Deferred> deferred; |
| 26 | + bool done; |
| 27 | + std::optional<int> value; |
| 28 | + }; |
| 29 | + |
| 30 | + static void CallJs(Napi::Env env, |
| 31 | + Function callback, |
| 32 | + Context* context, |
| 33 | + DataType* data); |
| 34 | + |
| 35 | + using TSFN = TypedThreadSafeFunction<Context, DataType, CallJs>; |
| 36 | + |
| 37 | + using FinalizerDataType = void; |
| 38 | + |
| 39 | + int _current; |
| 40 | + int _last; |
| 41 | + TSFN _tsfn; |
| 42 | + std::thread _thread; |
| 43 | + std::unique_ptr<Promise::Deferred> _deferred; |
| 44 | + |
| 45 | + // Thread-safety |
| 46 | + std::mutex _mtx; |
| 47 | + std::condition_variable _cv; |
| 48 | + |
| 49 | + void threadEntry(); |
| 50 | + |
| 51 | + static void FinalizerCallback(Napi::Env env, |
| 52 | + void*, |
| 53 | + ThreadSafeAsyncIteratorExample* context); |
| 54 | +}; |
| 55 | + |
| 56 | +Object ThreadSafeAsyncIteratorExample::Init(Napi::Env env, |
| 57 | + Napi::Object exports) { |
| 58 | + Napi::Function func = |
| 59 | + DefineClass(env, |
| 60 | + "ThreadSafeAsyncIteratorExample", |
| 61 | + {InstanceMethod(Napi::Symbol::WellKnown(env, "asyncIterator"), |
| 62 | + &ThreadSafeAsyncIteratorExample::Iterator)}); |
| 63 | + |
| 64 | + exports.Set("ThreadSafeAsyncIteratorExample", func); |
| 65 | + return exports; |
| 66 | +} |
| 67 | + |
| 68 | +Napi::Value ThreadSafeAsyncIteratorExample::Iterator(const CallbackInfo& info) { |
| 69 | + auto env = info.Env(); |
| 70 | + |
| 71 | + if (_thread.joinable()) { |
| 72 | + Napi::Error::New(env, "Concurrent iterations not implemented.") |
| 73 | + .ThrowAsJavaScriptException(); |
| 74 | + return Napi::Value(); |
| 75 | + } |
| 76 | + |
| 77 | + _tsfn = |
| 78 | + TSFN::New(info.Env(), |
| 79 | + "tsfn", |
| 80 | + 0, |
| 81 | + 1, |
| 82 | + this, |
| 83 | + std::function<decltype(FinalizerCallback)>(FinalizerCallback)); |
| 84 | + |
| 85 | + // To prevent premature garbage collection; Unref in TFSN finalizer |
| 86 | + Ref(); |
| 87 | + |
| 88 | + // Create thread |
| 89 | + _thread = std::thread(&ThreadSafeAsyncIteratorExample::threadEntry, this); |
| 90 | + |
| 91 | + // Create iterable |
| 92 | + auto iterable = Napi::Object::New(env); |
| 93 | + |
| 94 | + iterable["next"] = |
| 95 | + Function::New(env, [this](const CallbackInfo& info) -> Napi::Value { |
| 96 | + std::lock_guard<std::mutex> lk(_mtx); |
| 97 | + auto env = info.Env(); |
| 98 | + if (_deferred) { |
| 99 | + Napi::Error::New(env, "Concurrent iterations not implemented.") |
| 100 | + .ThrowAsJavaScriptException(); |
| 101 | + return Napi::Value(); |
| 102 | + } |
| 103 | + _deferred = std::make_unique<Promise::Deferred>(env); |
| 104 | + _cv.notify_all(); |
| 105 | + return _deferred->Promise(); |
| 106 | + }); |
| 107 | + |
| 108 | + return iterable; |
| 109 | +} |
| 110 | + |
| 111 | +void ThreadSafeAsyncIteratorExample::threadEntry() { |
| 112 | + while (true) { |
| 113 | + std::unique_lock<std::mutex> lk(_mtx); |
| 114 | + _cv.wait(lk, [this] { return this->_deferred != nullptr; }); |
| 115 | + auto done = _current > _last; |
| 116 | + if (done) { |
| 117 | + _tsfn.BlockingCall(new DataType{std::move(this->_deferred), true, {}}); |
| 118 | + break; |
| 119 | + } else { |
| 120 | + std::this_thread::sleep_for( |
| 121 | + std::chrono::seconds(1)); // Simulate CPU-intensive work |
| 122 | + _tsfn.BlockingCall( |
| 123 | + new DataType{std::move(this->_deferred), false, _current++}); |
| 124 | + } |
| 125 | + } |
| 126 | + _tsfn.Release(); |
| 127 | +} |
| 128 | + |
| 129 | +void ThreadSafeAsyncIteratorExample::CallJs(Napi::Env env, |
| 130 | + Function callback, |
| 131 | + Context* context, |
| 132 | + DataType* data) { |
| 133 | + if (env != nullptr) { |
| 134 | + auto value = Object::New(env); |
| 135 | + |
| 136 | + if (data->done) { |
| 137 | + value["done"] = Boolean::New(env, true); |
| 138 | + } else { |
| 139 | + value["done"] = Boolean::New(env, false); |
| 140 | + value["value"] = Number::New(env, data->value.value()); |
| 141 | + } |
| 142 | + data->deferred->Resolve(value); |
| 143 | + } |
| 144 | + |
| 145 | + if (data != nullptr) { |
| 146 | + delete data; |
| 147 | + } |
| 148 | +} |
| 149 | + |
| 150 | +void ThreadSafeAsyncIteratorExample::FinalizerCallback( |
| 151 | + Napi::Env env, void*, ThreadSafeAsyncIteratorExample* context) { |
| 152 | + context->_thread.join(); |
| 153 | + context->Unref(); |
| 154 | +} |
| 155 | + |
| 156 | +Napi::Object Init(Napi::Env env, Object exports) { |
| 157 | + ThreadSafeAsyncIteratorExample::Init(env, exports); |
| 158 | + return exports; |
| 159 | +} |
| 160 | + |
| 161 | +NODE_API_MODULE(example, Init) |
0 commit comments