| 1 | // Copyright 2013 The Flutter Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style license that can be |
| 3 | // found in the LICENSE file. |
| 4 | |
| 5 | #define FML_USED_ON_EMBEDDER |
| 6 | |
#include "flutter/fml/message_loop_task_queues.h"

#include <algorithm>
#include <climits>
#include <iostream>
#include <memory>
#include <optional>

#include "flutter/fml/make_copyable.h"
#include "flutter/fml/task_source.h"
#include "flutter/fml/thread_local.h"
| 17 | |
| 18 | namespace fml { |
| 19 | |
| 20 | const size_t TaskQueueId::kUnmerged = ULONG_MAX; |
| 21 | |
| 22 | namespace { |
| 23 | |
| 24 | // iOS prior to version 9 prevents c++11 thread_local and __thread specifier, |
| 25 | // having us resort to boxed enum containers. |
| 26 | class TaskSourceGradeHolder { |
| 27 | public: |
| 28 | TaskSourceGrade task_source_grade; |
| 29 | |
| 30 | explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg) |
| 31 | : task_source_grade(task_source_grade_arg) {} |
| 32 | }; |
| 33 | } // namespace |
| 34 | |
| 35 | FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder> |
| 36 | tls_task_source_grade; |
| 37 | |
| 38 | TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg) |
| 39 | : subsumed_by(_kUnmerged), created_for(created_for_arg) { |
| 40 | wakeable = NULL; |
| 41 | task_observers = TaskObservers(); |
| 42 | task_source = std::make_unique<TaskSource>(args&: created_for); |
| 43 | } |
| 44 | |
| 45 | MessageLoopTaskQueues* MessageLoopTaskQueues::GetInstance() { |
| 46 | static MessageLoopTaskQueues* instance = new MessageLoopTaskQueues; |
| 47 | return instance; |
| 48 | } |
| 49 | |
| 50 | TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() { |
| 51 | std::lock_guard guard(queue_mutex_); |
| 52 | TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_); |
| 53 | ++task_queue_id_counter_; |
| 54 | queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(args&: loop_id); |
| 55 | return loop_id; |
| 56 | } |
| 57 | |
| 58 | MessageLoopTaskQueues::MessageLoopTaskQueues() |
| 59 | : task_queue_id_counter_(0), order_(0) { |
| 60 | tls_task_source_grade.reset( |
| 61 | ptr: new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified}); |
| 62 | } |
| 63 | |
| 64 | MessageLoopTaskQueues::~MessageLoopTaskQueues() = default; |
| 65 | |
| 66 | void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) { |
| 67 | std::lock_guard guard(queue_mutex_); |
| 68 | const auto& queue_entry = queue_entries_.at(k: queue_id); |
| 69 | FML_DCHECK(queue_entry->subsumed_by == _kUnmerged); |
| 70 | auto& subsumed_set = queue_entry->owner_of; |
| 71 | for (auto& subsumed : subsumed_set) { |
| 72 | queue_entries_.erase(k: subsumed); |
| 73 | } |
| 74 | // Erase owner queue_id at last to avoid &subsumed_set from being invalid |
| 75 | queue_entries_.erase(k: queue_id); |
| 76 | } |
| 77 | |
| 78 | void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) { |
| 79 | std::lock_guard guard(queue_mutex_); |
| 80 | const auto& queue_entry = queue_entries_.at(k: queue_id); |
| 81 | FML_DCHECK(queue_entry->subsumed_by == _kUnmerged); |
| 82 | auto& subsumed_set = queue_entry->owner_of; |
| 83 | queue_entry->task_source->ShutDown(); |
| 84 | for (auto& subsumed : subsumed_set) { |
| 85 | queue_entries_.at(k: subsumed)->task_source->ShutDown(); |
| 86 | } |
| 87 | } |
| 88 | |
| 89 | TaskSourceGrade MessageLoopTaskQueues::GetCurrentTaskSourceGrade() { |
| 90 | return tls_task_source_grade.get()->task_source_grade; |
| 91 | } |
| 92 | |
| 93 | void MessageLoopTaskQueues::RegisterTask( |
| 94 | TaskQueueId queue_id, |
| 95 | const fml::closure& task, |
| 96 | fml::TimePoint target_time, |
| 97 | fml::TaskSourceGrade task_source_grade) { |
| 98 | std::lock_guard guard(queue_mutex_); |
| 99 | size_t order = order_++; |
| 100 | const auto& queue_entry = queue_entries_.at(k: queue_id); |
| 101 | queue_entry->task_source->RegisterTask( |
| 102 | task: {order, task, target_time, task_source_grade}); |
| 103 | TaskQueueId loop_to_wake = queue_id; |
| 104 | if (queue_entry->subsumed_by != _kUnmerged) { |
| 105 | loop_to_wake = queue_entry->subsumed_by; |
| 106 | } |
| 107 | |
| 108 | // This can happen when the secondary tasks are paused. |
| 109 | if (HasPendingTasksUnlocked(queue_id: loop_to_wake)) { |
| 110 | WakeUpUnlocked(queue_id: loop_to_wake, time: GetNextWakeTimeUnlocked(queue_id: loop_to_wake)); |
| 111 | } |
| 112 | } |
| 113 | |
| 114 | bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const { |
| 115 | std::lock_guard guard(queue_mutex_); |
| 116 | return HasPendingTasksUnlocked(queue_id); |
| 117 | } |
| 118 | |
| 119 | fml::closure MessageLoopTaskQueues::GetNextTaskToRun(TaskQueueId queue_id, |
| 120 | fml::TimePoint from_time) { |
| 121 | std::lock_guard guard(queue_mutex_); |
| 122 | if (!HasPendingTasksUnlocked(queue_id)) { |
| 123 | return nullptr; |
| 124 | } |
| 125 | TaskSource::TopTask top = PeekNextTaskUnlocked(owner: queue_id); |
| 126 | |
| 127 | if (!HasPendingTasksUnlocked(queue_id)) { |
| 128 | WakeUpUnlocked(queue_id, time: fml::TimePoint::Max()); |
| 129 | } else { |
| 130 | WakeUpUnlocked(queue_id, time: GetNextWakeTimeUnlocked(queue_id)); |
| 131 | } |
| 132 | |
| 133 | if (top.task.GetTargetTime() > from_time) { |
| 134 | return nullptr; |
| 135 | } |
| 136 | fml::closure invocation = top.task.GetTask(); |
| 137 | queue_entries_.at(k: top.task_queue_id) |
| 138 | ->task_source->PopTask(grade: top.task.GetTaskSourceGrade()); |
| 139 | const auto task_source_grade = top.task.GetTaskSourceGrade(); |
| 140 | tls_task_source_grade.reset(ptr: new TaskSourceGradeHolder{task_source_grade}); |
| 141 | return invocation; |
| 142 | } |
| 143 | |
| 144 | void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id, |
| 145 | fml::TimePoint time) const { |
| 146 | if (queue_entries_.at(k: queue_id)->wakeable) { |
| 147 | queue_entries_.at(k: queue_id)->wakeable->WakeUp(time_point: time); |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const { |
| 152 | std::lock_guard guard(queue_mutex_); |
| 153 | const auto& queue_entry = queue_entries_.at(k: queue_id); |
| 154 | if (queue_entry->subsumed_by != _kUnmerged) { |
| 155 | return 0; |
| 156 | } |
| 157 | |
| 158 | size_t total_tasks = 0; |
| 159 | total_tasks += queue_entry->task_source->GetNumPendingTasks(); |
| 160 | |
| 161 | auto& subsumed_set = queue_entry->owner_of; |
| 162 | for (auto& subsumed : subsumed_set) { |
| 163 | const auto& subsumed_entry = queue_entries_.at(k: subsumed); |
| 164 | total_tasks += subsumed_entry->task_source->GetNumPendingTasks(); |
| 165 | } |
| 166 | return total_tasks; |
| 167 | } |
| 168 | |
| 169 | void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id, |
| 170 | intptr_t key, |
| 171 | const fml::closure& callback) { |
| 172 | std::lock_guard guard(queue_mutex_); |
| 173 | FML_DCHECK(callback != nullptr) << "Observer callback must be non-null." ; |
| 174 | queue_entries_.at(k: queue_id)->task_observers[key] = callback; |
| 175 | } |
| 176 | |
| 177 | void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id, |
| 178 | intptr_t key) { |
| 179 | std::lock_guard guard(queue_mutex_); |
| 180 | queue_entries_.at(k: queue_id)->task_observers.erase(k: key); |
| 181 | } |
| 182 | |
| 183 | std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify( |
| 184 | TaskQueueId queue_id) const { |
| 185 | std::lock_guard guard(queue_mutex_); |
| 186 | std::vector<fml::closure> observers; |
| 187 | |
| 188 | if (queue_entries_.at(k: queue_id)->subsumed_by != _kUnmerged) { |
| 189 | return observers; |
| 190 | } |
| 191 | |
| 192 | for (const auto& observer : queue_entries_.at(k: queue_id)->task_observers) { |
| 193 | observers.push_back(x: observer.second); |
| 194 | } |
| 195 | |
| 196 | auto& subsumed_set = queue_entries_.at(k: queue_id)->owner_of; |
| 197 | for (auto& subsumed : subsumed_set) { |
| 198 | for (const auto& observer : queue_entries_.at(k: subsumed)->task_observers) { |
| 199 | observers.push_back(x: observer.second); |
| 200 | } |
| 201 | } |
| 202 | |
| 203 | return observers; |
| 204 | } |
| 205 | |
| 206 | void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id, |
| 207 | fml::Wakeable* wakeable) { |
| 208 | std::lock_guard guard(queue_mutex_); |
| 209 | FML_CHECK(!queue_entries_.at(queue_id)->wakeable) |
| 210 | << "Wakeable can only be set once." ; |
| 211 | queue_entries_.at(k: queue_id)->wakeable = wakeable; |
| 212 | } |
| 213 | |
| 214 | bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) { |
| 215 | if (owner == subsumed) { |
| 216 | return true; |
| 217 | } |
| 218 | std::lock_guard guard(queue_mutex_); |
| 219 | auto& owner_entry = queue_entries_.at(k: owner); |
| 220 | auto& subsumed_entry = queue_entries_.at(k: subsumed); |
| 221 | auto& subsumed_set = owner_entry->owner_of; |
| 222 | if (subsumed_set.find(k: subsumed) != subsumed_set.end()) { |
| 223 | return true; |
| 224 | } |
| 225 | |
| 226 | // Won't check owner_entry->owner_of, because it may contains items when |
| 227 | // merged with other different queues. |
| 228 | |
| 229 | // Ensure owner_entry->subsumed_by being _kUnmerged |
| 230 | if (owner_entry->subsumed_by != _kUnmerged) { |
| 231 | FML_LOG(WARNING) << "Thread merging failed: owner_entry was already " |
| 232 | "subsumed by others, owner=" |
| 233 | << owner << ", subsumed=" << subsumed |
| 234 | << ", owner->subsumed_by=" << owner_entry->subsumed_by; |
| 235 | return false; |
| 236 | } |
| 237 | // Ensure subsumed_entry->owner_of being empty |
| 238 | if (!subsumed_entry->owner_of.empty()) { |
| 239 | FML_LOG(WARNING) |
| 240 | << "Thread merging failed: subsumed_entry already owns others, owner=" |
| 241 | << owner << ", subsumed=" << subsumed |
| 242 | << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size(); |
| 243 | return false; |
| 244 | } |
| 245 | // Ensure subsumed_entry->subsumed_by being _kUnmerged |
| 246 | if (subsumed_entry->subsumed_by != _kUnmerged) { |
| 247 | FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already " |
| 248 | "subsumed by others, owner=" |
| 249 | << owner << ", subsumed=" << subsumed |
| 250 | << ", subsumed->subsumed_by=" |
| 251 | << subsumed_entry->subsumed_by; |
| 252 | return false; |
| 253 | } |
| 254 | // All checking is OK, set merged state. |
| 255 | owner_entry->owner_of.insert(v: subsumed); |
| 256 | subsumed_entry->subsumed_by = owner; |
| 257 | |
| 258 | if (HasPendingTasksUnlocked(queue_id: owner)) { |
| 259 | WakeUpUnlocked(queue_id: owner, time: GetNextWakeTimeUnlocked(queue_id: owner)); |
| 260 | } |
| 261 | |
| 262 | return true; |
| 263 | } |
| 264 | |
| 265 | bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) { |
| 266 | std::lock_guard guard(queue_mutex_); |
| 267 | const auto& owner_entry = queue_entries_.at(k: owner); |
| 268 | if (owner_entry->owner_of.empty()) { |
| 269 | FML_LOG(WARNING) |
| 270 | << "Thread unmerging failed: owner_entry doesn't own anyone, owner=" |
| 271 | << owner << ", subsumed=" << subsumed; |
| 272 | return false; |
| 273 | } |
| 274 | if (owner_entry->subsumed_by != _kUnmerged) { |
| 275 | FML_LOG(WARNING) |
| 276 | << "Thread unmerging failed: owner_entry was subsumed by others, owner=" |
| 277 | << owner << ", subsumed=" << subsumed |
| 278 | << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by; |
| 279 | return false; |
| 280 | } |
| 281 | if (queue_entries_.at(k: subsumed)->subsumed_by == _kUnmerged) { |
| 282 | FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't " |
| 283 | "subsumed by others, owner=" |
| 284 | << owner << ", subsumed=" << subsumed; |
| 285 | return false; |
| 286 | } |
| 287 | if (owner_entry->owner_of.find(k: subsumed) == owner_entry->owner_of.end()) { |
| 288 | FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the " |
| 289 | "given subsumed queue id, owner=" |
| 290 | << owner << ", subsumed=" << subsumed; |
| 291 | return false; |
| 292 | } |
| 293 | |
| 294 | queue_entries_.at(k: subsumed)->subsumed_by = _kUnmerged; |
| 295 | owner_entry->owner_of.erase(k: subsumed); |
| 296 | |
| 297 | if (HasPendingTasksUnlocked(queue_id: owner)) { |
| 298 | WakeUpUnlocked(queue_id: owner, time: GetNextWakeTimeUnlocked(queue_id: owner)); |
| 299 | } |
| 300 | |
| 301 | if (HasPendingTasksUnlocked(queue_id: subsumed)) { |
| 302 | WakeUpUnlocked(queue_id: subsumed, time: GetNextWakeTimeUnlocked(queue_id: subsumed)); |
| 303 | } |
| 304 | |
| 305 | return true; |
| 306 | } |
| 307 | |
| 308 | bool MessageLoopTaskQueues::Owns(TaskQueueId owner, |
| 309 | TaskQueueId subsumed) const { |
| 310 | std::lock_guard guard(queue_mutex_); |
| 311 | if (owner == _kUnmerged || subsumed == _kUnmerged) { |
| 312 | return false; |
| 313 | } |
| 314 | auto& subsumed_set = queue_entries_.at(k: owner)->owner_of; |
| 315 | return subsumed_set.find(k: subsumed) != subsumed_set.end(); |
| 316 | } |
| 317 | |
| 318 | std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId( |
| 319 | TaskQueueId owner) const { |
| 320 | std::lock_guard guard(queue_mutex_); |
| 321 | return queue_entries_.at(k: owner)->owner_of; |
| 322 | } |
| 323 | |
| 324 | void MessageLoopTaskQueues::PauseSecondarySource(TaskQueueId queue_id) { |
| 325 | std::lock_guard guard(queue_mutex_); |
| 326 | queue_entries_.at(k: queue_id)->task_source->PauseSecondary(); |
| 327 | } |
| 328 | |
| 329 | void MessageLoopTaskQueues::ResumeSecondarySource(TaskQueueId queue_id) { |
| 330 | std::lock_guard guard(queue_mutex_); |
| 331 | queue_entries_.at(k: queue_id)->task_source->ResumeSecondary(); |
| 332 | // Schedule a wake as needed. |
| 333 | if (HasPendingTasksUnlocked(queue_id)) { |
| 334 | WakeUpUnlocked(queue_id, time: GetNextWakeTimeUnlocked(queue_id)); |
| 335 | } |
| 336 | } |
| 337 | |
| 338 | // Subsumed queues will never have pending tasks. |
| 339 | // Owning queues will consider both their and their subsumed tasks. |
| 340 | bool MessageLoopTaskQueues::HasPendingTasksUnlocked( |
| 341 | TaskQueueId queue_id) const { |
| 342 | const auto& entry = queue_entries_.at(k: queue_id); |
| 343 | bool is_subsumed = entry->subsumed_by != _kUnmerged; |
| 344 | if (is_subsumed) { |
| 345 | return false; |
| 346 | } |
| 347 | |
| 348 | if (!entry->task_source->IsEmpty()) { |
| 349 | return true; |
| 350 | } |
| 351 | |
| 352 | auto& subsumed_set = entry->owner_of; |
| 353 | return std::any_of( |
| 354 | first: subsumed_set.begin(), last: subsumed_set.end(), pred: [&](const auto& subsumed) { |
| 355 | return !queue_entries_.at(subsumed)->task_source->IsEmpty(); |
| 356 | }); |
| 357 | } |
| 358 | |
| 359 | fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked( |
| 360 | TaskQueueId queue_id) const { |
| 361 | return PeekNextTaskUnlocked(owner: queue_id).task.GetTargetTime(); |
| 362 | } |
| 363 | |
| 364 | TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked( |
| 365 | TaskQueueId owner) const { |
| 366 | FML_DCHECK(HasPendingTasksUnlocked(owner)); |
| 367 | const auto& entry = queue_entries_.at(k: owner); |
| 368 | if (entry->owner_of.empty()) { |
| 369 | FML_CHECK(!entry->task_source->IsEmpty()); |
| 370 | return entry->task_source->Top(); |
| 371 | } |
| 372 | |
| 373 | // Use optional for the memory of TopTask object. |
| 374 | std::optional<TaskSource::TopTask> top_task; |
| 375 | |
| 376 | std::function<void(const TaskSource*)> top_task_updater = |
| 377 | [&top_task](const TaskSource* source) { |
| 378 | if (source && !source->IsEmpty()) { |
| 379 | TaskSource::TopTask other_task = source->Top(); |
| 380 | if (!top_task.has_value() || top_task->task > other_task.task) { |
| 381 | top_task.emplace(args&: other_task); |
| 382 | } |
| 383 | } |
| 384 | }; |
| 385 | |
| 386 | TaskSource* owner_tasks = entry->task_source.get(); |
| 387 | top_task_updater(owner_tasks); |
| 388 | |
| 389 | for (TaskQueueId subsumed : entry->owner_of) { |
| 390 | TaskSource* subsumed_tasks = queue_entries_.at(k: subsumed)->task_source.get(); |
| 391 | top_task_updater(subsumed_tasks); |
| 392 | } |
| 393 | // At least one task at the top because PeekNextTaskUnlocked() is called after |
| 394 | // HasPendingTasksUnlocked() |
| 395 | FML_CHECK(top_task.has_value()); |
| 396 | // Covered by FML_CHECK. |
| 397 | // NOLINTNEXTLINE(bugprone-unchecked-optional-access) |
| 398 | return top_task.value(); |
| 399 | } |
| 400 | |
| 401 | } // namespace fml |
| 402 | |