// Copyright 2013 The Flutter Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#define FML_USED_ON_EMBEDDER

#include "flutter/fml/message_loop_task_queues.h"

#include <algorithm>
#include <climits>
#include <iostream>
#include <memory>
#include <optional>

#include "flutter/fml/make_copyable.h"
#include "flutter/fml/task_source.h"
#include "flutter/fml/thread_local.h"

18namespace fml {
19
20const size_t TaskQueueId::kUnmerged = ULONG_MAX;
21
22namespace {
23
24// iOS prior to version 9 prevents c++11 thread_local and __thread specifier,
25// having us resort to boxed enum containers.
26class TaskSourceGradeHolder {
27 public:
28 TaskSourceGrade task_source_grade;
29
30 explicit TaskSourceGradeHolder(TaskSourceGrade task_source_grade_arg)
31 : task_source_grade(task_source_grade_arg) {}
32};
33} // namespace
34
35FML_THREAD_LOCAL ThreadLocalUniquePtr<TaskSourceGradeHolder>
36 tls_task_source_grade;
37
38TaskQueueEntry::TaskQueueEntry(TaskQueueId created_for_arg)
39 : subsumed_by(_kUnmerged), created_for(created_for_arg) {
40 wakeable = NULL;
41 task_observers = TaskObservers();
42 task_source = std::make_unique<TaskSource>(args&: created_for);
43}
44
45MessageLoopTaskQueues* MessageLoopTaskQueues::GetInstance() {
46 static MessageLoopTaskQueues* instance = new MessageLoopTaskQueues;
47 return instance;
48}
49
50TaskQueueId MessageLoopTaskQueues::CreateTaskQueue() {
51 std::lock_guard guard(queue_mutex_);
52 TaskQueueId loop_id = TaskQueueId(task_queue_id_counter_);
53 ++task_queue_id_counter_;
54 queue_entries_[loop_id] = std::make_unique<TaskQueueEntry>(args&: loop_id);
55 return loop_id;
56}
57
58MessageLoopTaskQueues::MessageLoopTaskQueues()
59 : task_queue_id_counter_(0), order_(0) {
60 tls_task_source_grade.reset(
61 ptr: new TaskSourceGradeHolder{TaskSourceGrade::kUnspecified});
62}
63
64MessageLoopTaskQueues::~MessageLoopTaskQueues() = default;
65
66void MessageLoopTaskQueues::Dispose(TaskQueueId queue_id) {
67 std::lock_guard guard(queue_mutex_);
68 const auto& queue_entry = queue_entries_.at(k: queue_id);
69 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
70 auto& subsumed_set = queue_entry->owner_of;
71 for (auto& subsumed : subsumed_set) {
72 queue_entries_.erase(k: subsumed);
73 }
74 // Erase owner queue_id at last to avoid &subsumed_set from being invalid
75 queue_entries_.erase(k: queue_id);
76}
77
78void MessageLoopTaskQueues::DisposeTasks(TaskQueueId queue_id) {
79 std::lock_guard guard(queue_mutex_);
80 const auto& queue_entry = queue_entries_.at(k: queue_id);
81 FML_DCHECK(queue_entry->subsumed_by == _kUnmerged);
82 auto& subsumed_set = queue_entry->owner_of;
83 queue_entry->task_source->ShutDown();
84 for (auto& subsumed : subsumed_set) {
85 queue_entries_.at(k: subsumed)->task_source->ShutDown();
86 }
87}
88
89TaskSourceGrade MessageLoopTaskQueues::GetCurrentTaskSourceGrade() {
90 return tls_task_source_grade.get()->task_source_grade;
91}
92
93void MessageLoopTaskQueues::RegisterTask(
94 TaskQueueId queue_id,
95 const fml::closure& task,
96 fml::TimePoint target_time,
97 fml::TaskSourceGrade task_source_grade) {
98 std::lock_guard guard(queue_mutex_);
99 size_t order = order_++;
100 const auto& queue_entry = queue_entries_.at(k: queue_id);
101 queue_entry->task_source->RegisterTask(
102 task: {order, task, target_time, task_source_grade});
103 TaskQueueId loop_to_wake = queue_id;
104 if (queue_entry->subsumed_by != _kUnmerged) {
105 loop_to_wake = queue_entry->subsumed_by;
106 }
107
108 // This can happen when the secondary tasks are paused.
109 if (HasPendingTasksUnlocked(queue_id: loop_to_wake)) {
110 WakeUpUnlocked(queue_id: loop_to_wake, time: GetNextWakeTimeUnlocked(queue_id: loop_to_wake));
111 }
112}
113
114bool MessageLoopTaskQueues::HasPendingTasks(TaskQueueId queue_id) const {
115 std::lock_guard guard(queue_mutex_);
116 return HasPendingTasksUnlocked(queue_id);
117}
118
119fml::closure MessageLoopTaskQueues::GetNextTaskToRun(TaskQueueId queue_id,
120 fml::TimePoint from_time) {
121 std::lock_guard guard(queue_mutex_);
122 if (!HasPendingTasksUnlocked(queue_id)) {
123 return nullptr;
124 }
125 TaskSource::TopTask top = PeekNextTaskUnlocked(owner: queue_id);
126
127 if (!HasPendingTasksUnlocked(queue_id)) {
128 WakeUpUnlocked(queue_id, time: fml::TimePoint::Max());
129 } else {
130 WakeUpUnlocked(queue_id, time: GetNextWakeTimeUnlocked(queue_id));
131 }
132
133 if (top.task.GetTargetTime() > from_time) {
134 return nullptr;
135 }
136 fml::closure invocation = top.task.GetTask();
137 queue_entries_.at(k: top.task_queue_id)
138 ->task_source->PopTask(grade: top.task.GetTaskSourceGrade());
139 const auto task_source_grade = top.task.GetTaskSourceGrade();
140 tls_task_source_grade.reset(ptr: new TaskSourceGradeHolder{task_source_grade});
141 return invocation;
142}
143
144void MessageLoopTaskQueues::WakeUpUnlocked(TaskQueueId queue_id,
145 fml::TimePoint time) const {
146 if (queue_entries_.at(k: queue_id)->wakeable) {
147 queue_entries_.at(k: queue_id)->wakeable->WakeUp(time_point: time);
148 }
149}
150
151size_t MessageLoopTaskQueues::GetNumPendingTasks(TaskQueueId queue_id) const {
152 std::lock_guard guard(queue_mutex_);
153 const auto& queue_entry = queue_entries_.at(k: queue_id);
154 if (queue_entry->subsumed_by != _kUnmerged) {
155 return 0;
156 }
157
158 size_t total_tasks = 0;
159 total_tasks += queue_entry->task_source->GetNumPendingTasks();
160
161 auto& subsumed_set = queue_entry->owner_of;
162 for (auto& subsumed : subsumed_set) {
163 const auto& subsumed_entry = queue_entries_.at(k: subsumed);
164 total_tasks += subsumed_entry->task_source->GetNumPendingTasks();
165 }
166 return total_tasks;
167}
168
169void MessageLoopTaskQueues::AddTaskObserver(TaskQueueId queue_id,
170 intptr_t key,
171 const fml::closure& callback) {
172 std::lock_guard guard(queue_mutex_);
173 FML_DCHECK(callback != nullptr) << "Observer callback must be non-null.";
174 queue_entries_.at(k: queue_id)->task_observers[key] = callback;
175}
176
177void MessageLoopTaskQueues::RemoveTaskObserver(TaskQueueId queue_id,
178 intptr_t key) {
179 std::lock_guard guard(queue_mutex_);
180 queue_entries_.at(k: queue_id)->task_observers.erase(k: key);
181}
182
183std::vector<fml::closure> MessageLoopTaskQueues::GetObserversToNotify(
184 TaskQueueId queue_id) const {
185 std::lock_guard guard(queue_mutex_);
186 std::vector<fml::closure> observers;
187
188 if (queue_entries_.at(k: queue_id)->subsumed_by != _kUnmerged) {
189 return observers;
190 }
191
192 for (const auto& observer : queue_entries_.at(k: queue_id)->task_observers) {
193 observers.push_back(x: observer.second);
194 }
195
196 auto& subsumed_set = queue_entries_.at(k: queue_id)->owner_of;
197 for (auto& subsumed : subsumed_set) {
198 for (const auto& observer : queue_entries_.at(k: subsumed)->task_observers) {
199 observers.push_back(x: observer.second);
200 }
201 }
202
203 return observers;
204}
205
206void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
207 fml::Wakeable* wakeable) {
208 std::lock_guard guard(queue_mutex_);
209 FML_CHECK(!queue_entries_.at(queue_id)->wakeable)
210 << "Wakeable can only be set once.";
211 queue_entries_.at(k: queue_id)->wakeable = wakeable;
212}
213
214bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
215 if (owner == subsumed) {
216 return true;
217 }
218 std::lock_guard guard(queue_mutex_);
219 auto& owner_entry = queue_entries_.at(k: owner);
220 auto& subsumed_entry = queue_entries_.at(k: subsumed);
221 auto& subsumed_set = owner_entry->owner_of;
222 if (subsumed_set.find(k: subsumed) != subsumed_set.end()) {
223 return true;
224 }
225
226 // Won't check owner_entry->owner_of, because it may contains items when
227 // merged with other different queues.
228
229 // Ensure owner_entry->subsumed_by being _kUnmerged
230 if (owner_entry->subsumed_by != _kUnmerged) {
231 FML_LOG(WARNING) << "Thread merging failed: owner_entry was already "
232 "subsumed by others, owner="
233 << owner << ", subsumed=" << subsumed
234 << ", owner->subsumed_by=" << owner_entry->subsumed_by;
235 return false;
236 }
237 // Ensure subsumed_entry->owner_of being empty
238 if (!subsumed_entry->owner_of.empty()) {
239 FML_LOG(WARNING)
240 << "Thread merging failed: subsumed_entry already owns others, owner="
241 << owner << ", subsumed=" << subsumed
242 << ", subsumed->owner_of.size()=" << subsumed_entry->owner_of.size();
243 return false;
244 }
245 // Ensure subsumed_entry->subsumed_by being _kUnmerged
246 if (subsumed_entry->subsumed_by != _kUnmerged) {
247 FML_LOG(WARNING) << "Thread merging failed: subsumed_entry was already "
248 "subsumed by others, owner="
249 << owner << ", subsumed=" << subsumed
250 << ", subsumed->subsumed_by="
251 << subsumed_entry->subsumed_by;
252 return false;
253 }
254 // All checking is OK, set merged state.
255 owner_entry->owner_of.insert(v: subsumed);
256 subsumed_entry->subsumed_by = owner;
257
258 if (HasPendingTasksUnlocked(queue_id: owner)) {
259 WakeUpUnlocked(queue_id: owner, time: GetNextWakeTimeUnlocked(queue_id: owner));
260 }
261
262 return true;
263}
264
265bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner, TaskQueueId subsumed) {
266 std::lock_guard guard(queue_mutex_);
267 const auto& owner_entry = queue_entries_.at(k: owner);
268 if (owner_entry->owner_of.empty()) {
269 FML_LOG(WARNING)
270 << "Thread unmerging failed: owner_entry doesn't own anyone, owner="
271 << owner << ", subsumed=" << subsumed;
272 return false;
273 }
274 if (owner_entry->subsumed_by != _kUnmerged) {
275 FML_LOG(WARNING)
276 << "Thread unmerging failed: owner_entry was subsumed by others, owner="
277 << owner << ", subsumed=" << subsumed
278 << ", owner_entry->subsumed_by=" << owner_entry->subsumed_by;
279 return false;
280 }
281 if (queue_entries_.at(k: subsumed)->subsumed_by == _kUnmerged) {
282 FML_LOG(WARNING) << "Thread unmerging failed: subsumed_entry wasn't "
283 "subsumed by others, owner="
284 << owner << ", subsumed=" << subsumed;
285 return false;
286 }
287 if (owner_entry->owner_of.find(k: subsumed) == owner_entry->owner_of.end()) {
288 FML_LOG(WARNING) << "Thread unmerging failed: owner_entry didn't own the "
289 "given subsumed queue id, owner="
290 << owner << ", subsumed=" << subsumed;
291 return false;
292 }
293
294 queue_entries_.at(k: subsumed)->subsumed_by = _kUnmerged;
295 owner_entry->owner_of.erase(k: subsumed);
296
297 if (HasPendingTasksUnlocked(queue_id: owner)) {
298 WakeUpUnlocked(queue_id: owner, time: GetNextWakeTimeUnlocked(queue_id: owner));
299 }
300
301 if (HasPendingTasksUnlocked(queue_id: subsumed)) {
302 WakeUpUnlocked(queue_id: subsumed, time: GetNextWakeTimeUnlocked(queue_id: subsumed));
303 }
304
305 return true;
306}
307
308bool MessageLoopTaskQueues::Owns(TaskQueueId owner,
309 TaskQueueId subsumed) const {
310 std::lock_guard guard(queue_mutex_);
311 if (owner == _kUnmerged || subsumed == _kUnmerged) {
312 return false;
313 }
314 auto& subsumed_set = queue_entries_.at(k: owner)->owner_of;
315 return subsumed_set.find(k: subsumed) != subsumed_set.end();
316}
317
318std::set<TaskQueueId> MessageLoopTaskQueues::GetSubsumedTaskQueueId(
319 TaskQueueId owner) const {
320 std::lock_guard guard(queue_mutex_);
321 return queue_entries_.at(k: owner)->owner_of;
322}
323
324void MessageLoopTaskQueues::PauseSecondarySource(TaskQueueId queue_id) {
325 std::lock_guard guard(queue_mutex_);
326 queue_entries_.at(k: queue_id)->task_source->PauseSecondary();
327}
328
329void MessageLoopTaskQueues::ResumeSecondarySource(TaskQueueId queue_id) {
330 std::lock_guard guard(queue_mutex_);
331 queue_entries_.at(k: queue_id)->task_source->ResumeSecondary();
332 // Schedule a wake as needed.
333 if (HasPendingTasksUnlocked(queue_id)) {
334 WakeUpUnlocked(queue_id, time: GetNextWakeTimeUnlocked(queue_id));
335 }
336}
337
338// Subsumed queues will never have pending tasks.
339// Owning queues will consider both their and their subsumed tasks.
340bool MessageLoopTaskQueues::HasPendingTasksUnlocked(
341 TaskQueueId queue_id) const {
342 const auto& entry = queue_entries_.at(k: queue_id);
343 bool is_subsumed = entry->subsumed_by != _kUnmerged;
344 if (is_subsumed) {
345 return false;
346 }
347
348 if (!entry->task_source->IsEmpty()) {
349 return true;
350 }
351
352 auto& subsumed_set = entry->owner_of;
353 return std::any_of(
354 first: subsumed_set.begin(), last: subsumed_set.end(), pred: [&](const auto& subsumed) {
355 return !queue_entries_.at(subsumed)->task_source->IsEmpty();
356 });
357}
358
359fml::TimePoint MessageLoopTaskQueues::GetNextWakeTimeUnlocked(
360 TaskQueueId queue_id) const {
361 return PeekNextTaskUnlocked(owner: queue_id).task.GetTargetTime();
362}
363
364TaskSource::TopTask MessageLoopTaskQueues::PeekNextTaskUnlocked(
365 TaskQueueId owner) const {
366 FML_DCHECK(HasPendingTasksUnlocked(owner));
367 const auto& entry = queue_entries_.at(k: owner);
368 if (entry->owner_of.empty()) {
369 FML_CHECK(!entry->task_source->IsEmpty());
370 return entry->task_source->Top();
371 }
372
373 // Use optional for the memory of TopTask object.
374 std::optional<TaskSource::TopTask> top_task;
375
376 std::function<void(const TaskSource*)> top_task_updater =
377 [&top_task](const TaskSource* source) {
378 if (source && !source->IsEmpty()) {
379 TaskSource::TopTask other_task = source->Top();
380 if (!top_task.has_value() || top_task->task > other_task.task) {
381 top_task.emplace(args&: other_task);
382 }
383 }
384 };
385
386 TaskSource* owner_tasks = entry->task_source.get();
387 top_task_updater(owner_tasks);
388
389 for (TaskQueueId subsumed : entry->owner_of) {
390 TaskSource* subsumed_tasks = queue_entries_.at(k: subsumed)->task_source.get();
391 top_task_updater(subsumed_tasks);
392 }
393 // At least one task at the top because PeekNextTaskUnlocked() is called after
394 // HasPendingTasksUnlocked()
395 FML_CHECK(top_task.has_value());
396 // Covered by FML_CHECK.
397 // NOLINTNEXTLINE(bugprone-unchecked-optional-access)
398 return top_task.value();
399}
400
401} // namespace fml

// Source: flutter_engine/flutter/fml/message_loop_task_queues.cc