Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixing event channels
  • Loading branch information
Lyokone committed Mar 5, 2026
commit 2e824231af48f850a74393b84a291ea6a7f75e74
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,8 @@ namespace firebase_database_windows {

static const std::string kLibraryName = "flutter-fire-db";

// Custom Windows message for dispatching tasks to the platform thread.
static constexpr UINT WM_DISPATCH_TASK = WM_APP + 0xDB; // 0xDB for "database"

// Static member initialization
flutter::BinaryMessenger* FirebaseDatabasePlugin::messenger_ = nullptr;
HWND FirebaseDatabasePlugin::hwnd_ = nullptr;
std::mutex FirebaseDatabasePlugin::dispatch_mutex_;
std::queue<std::function<void()>> FirebaseDatabasePlugin::dispatch_queue_;
std::map<std::string,
std::unique_ptr<flutter::EventChannel<flutter::EncodableValue>>>
FirebaseDatabasePlugin::event_channels_;
Expand All @@ -69,17 +63,6 @@ std::map<std::string, std::unique_ptr<flutter::StreamHandler<>>>
std::map<std::string, firebase::database::Database*>
FirebaseDatabasePlugin::database_instances_;

void FirebaseDatabasePlugin::DispatchToMainThread(
std::function<void()> task) {
{
std::lock_guard<std::mutex> lock(dispatch_mutex_);
dispatch_queue_.push(std::move(task));
}
if (hwnd_) {
::PostMessage(hwnd_, WM_DISPATCH_TASK, 0, 0);
}
}

// atexit handler: clean up Database resources before static destruction.
// 1. Clear event channels to trigger StreamHandler destruction, which
// unregisters listeners from the Database while it's still alive.
Expand Down Expand Up @@ -371,35 +354,6 @@ void FirebaseDatabasePlugin::RegisterWithRegistrar(
auto plugin = std::make_unique<FirebaseDatabasePlugin>();
messenger_ = registrar->messenger();
FirebaseDatabaseHostApi::SetUp(registrar->messenger(), plugin.get());

// Capture the HWND for cross-thread dispatch.
auto* view = registrar->GetView();
if (view) {
hwnd_ = view->GetNativeWindow();
}

// Register a WindowProc delegate to process dispatched tasks on the
// platform thread.
registrar->RegisterTopLevelWindowProcDelegate(
[](HWND hwnd, UINT message, WPARAM wparam,
LPARAM lparam) -> std::optional<LRESULT> {
if (message == WM_DISPATCH_TASK) {
// Drain the queue on the platform thread.
std::queue<std::function<void()>> tasks;
{
std::lock_guard<std::mutex> lock(
FirebaseDatabasePlugin::dispatch_mutex_);
std::swap(tasks, FirebaseDatabasePlugin::dispatch_queue_);
}
while (!tasks.empty()) {
tasks.front()();
tasks.pop();
}
return 0;
}
return std::nullopt;
});

registrar->AddPlugin(std::move(plugin));
App::RegisterLibrary(kLibraryName.c_str(), getPluginVersion().c_str(),
nullptr);
Expand Down Expand Up @@ -731,49 +685,37 @@ void FirebaseDatabasePlugin::DatabaseReferenceRunTransaction(
EncodableValue snapshot_value =
FirebaseDatabasePlugin::VariantToEncodableValue(current_value);

// Call the Flutter transaction handler synchronously.
// The pigeon call must be dispatched to the platform thread, then
// we wait here (on the SDK worker thread) for the response.
// Call the Flutter transaction handler synchronously using a semaphore
std::mutex mtx;
std::condition_variable cv;
bool handler_complete = false;
TransactionHandlerResult* handler_result = nullptr;

// Copy snapshot data for the dispatch closure.
auto snapshot_copy = std::make_shared<EncodableValue>(snapshot_value);
int64_t txn_key = ctx->transaction_key;
flutter::BinaryMessenger* messenger = ctx->messenger;

FirebaseDatabasePlugin::DispatchToMainThread([&, snapshot_copy, txn_key,
messenger]() {
auto flutter_api =
std::make_unique<FirebaseDatabaseFlutterApi>(messenger);

const EncodableValue* snapshot_ptr =
std::holds_alternative<std::monostate>(*snapshot_copy)
? nullptr
: snapshot_copy.get();

flutter_api->CallTransactionHandler(
txn_key, snapshot_ptr,
[&](const TransactionHandlerResult& result) {
handler_result = new TransactionHandlerResult(
result.value(), result.aborted(), result.exception());
std::lock_guard<std::mutex> lock(mtx);
handler_complete = true;
cv.notify_one();
},
[&](const FlutterError& error) {
handler_result = new TransactionHandlerResult(true, true);
std::lock_guard<std::mutex> lock(mtx);
handler_complete = true;
cv.notify_one();
});
});

// Wait for the Flutter callback to complete.
// The platform thread pumps messages while we wait, so the pigeon
// response will be processed and our cv will be notified.
auto flutter_api =
std::make_unique<FirebaseDatabaseFlutterApi>(ctx->messenger);

const EncodableValue* snapshot_ptr =
std::holds_alternative<std::monostate>(snapshot_value)
? nullptr
: &snapshot_value;

flutter_api->CallTransactionHandler(
ctx->transaction_key, snapshot_ptr,
[&](const TransactionHandlerResult& result) {
handler_result = new TransactionHandlerResult(
result.value(), result.aborted(), result.exception());
std::lock_guard<std::mutex> lock(mtx);
handler_complete = true;
cv.notify_one();
},
[&](const FlutterError& error) {
handler_result = new TransactionHandlerResult(true, true);
std::lock_guard<std::mutex> lock(mtx);
handler_complete = true;
cv.notify_one();
});

// Wait for the Flutter callback to complete
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [&] { return handler_complete; });
Expand Down Expand Up @@ -1022,36 +964,23 @@ void FirebaseDatabasePlugin::QueryObserve(
}

if (event_type == "value") {
// Value listener — dispatches to platform thread via PostMessage
// Value listener
class VL : public firebase::database::ValueListener {
public:
VL(flutter::EventSink<flutter::EncodableValue>* events)
: events_(events) {}
void OnValueChanged(const DataSnapshot& snapshot) override {
// Copy snapshot data before dispatching — the snapshot reference
// is only valid during this callback.
EncodableMap event;
event[EncodableValue("eventType")] = EncodableValue("value");
event[EncodableValue("previousChildKey")] = EncodableValue();
event[EncodableValue("snapshot")] = EncodableValue(
FirebaseDatabasePlugin::DataSnapshotToEncodableMap(snapshot));
auto event_value =
std::make_shared<EncodableValue>(std::move(event));
auto* sink = events_;
FirebaseDatabasePlugin::DispatchToMainThread(
[sink, event_value]() {
sink->Success(*event_value);
});
events_->Success(EncodableValue(event));
}
void OnCancelled(const Error& error,
const char* error_message) override {
std::string code =
FirebaseDatabasePlugin::GetDatabaseErrorCode(error);
std::string msg =
error_message ? error_message : "Unknown error";
auto* sink = events_;
FirebaseDatabasePlugin::DispatchToMainThread(
[sink, code, msg]() { sink->Error(code, msg); });
events_->Error(FirebaseDatabasePlugin::GetDatabaseErrorCode(error),
error_message ? error_message : "Unknown error");
}

private:
Expand All @@ -1060,7 +989,7 @@ void FirebaseDatabasePlugin::QueryObserve(
value_listener_ = new VL(events_.get());
query_.AddValueListener(value_listener_);
} else {
// Child listener — dispatches to platform thread via PostMessage
// Child listener
class CL : public firebase::database::ChildListener {
public:
CL(flutter::EventSink<flutter::EncodableValue>* events,
Expand All @@ -1085,32 +1014,20 @@ void FirebaseDatabasePlugin::QueryObserve(
}
void OnCancelled(const Error& error,
const char* error_message) override {
std::string code =
FirebaseDatabasePlugin::GetDatabaseErrorCode(error);
std::string msg =
error_message ? error_message : "Unknown error";
auto* sink = events_;
FirebaseDatabasePlugin::DispatchToMainThread(
[sink, code, msg]() { sink->Error(code, msg); });
events_->Error(FirebaseDatabasePlugin::GetDatabaseErrorCode(error),
error_message ? error_message : "Unknown error");
}

private:
void Send(const std::string& type, const DataSnapshot& snapshot,
const char* prev) {
// Copy data before dispatching.
EncodableMap event;
event[EncodableValue("eventType")] = EncodableValue(type);
event[EncodableValue("previousChildKey")] =
prev ? EncodableValue(std::string(prev)) : EncodableValue();
event[EncodableValue("snapshot")] = EncodableValue(
FirebaseDatabasePlugin::DataSnapshotToEncodableMap(snapshot));
auto event_value =
std::make_shared<EncodableValue>(std::move(event));
auto* sink = events_;
FirebaseDatabasePlugin::DispatchToMainThread(
[sink, event_value]() {
sink->Success(*event_value);
});
events_->Success(EncodableValue(event));
}
flutter::EventSink<flutter::EncodableValue>* events_;
std::string event_type_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@
#include <flutter/method_channel.h>
#include <flutter/plugin_registrar_windows.h>

#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <variant>

#include "firebase/database.h"
#include "firebase/database/common.h"
Expand Down Expand Up @@ -134,14 +130,6 @@ class FirebaseDatabasePlugin : public flutter::Plugin,
static std::map<std::string, firebase::database::Database*>
database_instances_;

// Thread-safe dispatch: posts a task to run on the platform (UI) thread.
static void DispatchToMainThread(std::function<void()> task);

// Platform thread HWND for PostMessage-based dispatch.
static HWND hwnd_;
static std::mutex dispatch_mutex_;
static std::queue<std::function<void()>> dispatch_queue_;

private:
firebase::database::Database* GetDatabaseFromPigeon(
const DatabasePigeonFirebaseApp& app);
Expand Down
Loading