[go: up one dir, main page]

1// Copyright 2013 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5#include "flutter/fml/concurrent_message_loop.h"
6
7#include <algorithm>
8
9#include "flutter/fml/thread.h"
10#include "flutter/fml/trace_event.h"
11
12namespace fml {
13
14ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count)
15 : worker_count_(std::max<size_t>(a: worker_count, b: 1ul)) {
16 for (size_t i = 0; i < worker_count_; ++i) {
17 workers_.emplace_back(args: [i, this]() {
18 fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig(
19 std::string{"io.worker." + std::to_string(val: i + 1)}));
20 WorkerMain();
21 });
22 }
23
24 for (const auto& worker : workers_) {
25 worker_thread_ids_.emplace_back(args: worker.get_id());
26 }
27}
28
29ConcurrentMessageLoop::~ConcurrentMessageLoop() {
30 Terminate();
31 for (auto& worker : workers_) {
32 FML_DCHECK(worker.joinable());
33 worker.join();
34 }
35}
36
37size_t ConcurrentMessageLoop::GetWorkerCount() const {
38 return worker_count_;
39}
40
41std::shared_ptr<ConcurrentTaskRunner> ConcurrentMessageLoop::GetTaskRunner() {
42 return std::make_shared<ConcurrentTaskRunner>(args: weak_from_this());
43}
44
45void ConcurrentMessageLoop::PostTask(const fml::closure& task) {
46 if (!task) {
47 return;
48 }
49
50 std::unique_lock lock(tasks_mutex_);
51
52 // Don't just drop tasks on the floor in case of shutdown.
53 if (shutdown_) {
54 FML_DLOG(WARNING)
55 << "Tried to post a task to shutdown concurrent message "
56 "loop. The task will be executed on the callers thread.";
57 lock.unlock();
58 ExecuteTask(task);
59 return;
60 }
61
62 tasks_.push(v: task);
63
64 // Unlock the mutex before notifying the condition variable because that mutex
65 // has to be acquired on the other thread anyway. Waiting in this scope till
66 // it is acquired there is a pessimization.
67 lock.unlock();
68
69 tasks_condition_.notify_one();
70}
71
72void ConcurrentMessageLoop::WorkerMain() {
73 while (true) {
74 std::unique_lock lock(tasks_mutex_);
75 tasks_condition_.wait(lk&: lock, pred: [&]() {
76 return !tasks_.empty() || shutdown_ || HasThreadTasksLocked();
77 });
78
79 // Shutdown cannot be read with the task mutex unlocked.
80 bool shutdown_now = shutdown_;
81 fml::closure task;
82 std::vector<fml::closure> thread_tasks;
83
84 if (!tasks_.empty()) {
85 task = tasks_.front();
86 tasks_.pop();
87 }
88
89 if (HasThreadTasksLocked()) {
90 thread_tasks = GetThreadTasksLocked();
91 FML_DCHECK(!HasThreadTasksLocked());
92 }
93
94 // Don't hold onto the mutex while tasks are being executed as they could
95 // themselves try to post more tasks to the message loop.
96 lock.unlock();
97
98 TRACE_EVENT0("flutter", "ConcurrentWorkerWake");
99 // Execute the primary task we woke up for.
100 if (task) {
101 ExecuteTask(task);
102 }
103
104 // Execute any thread tasks.
105 for (const auto& thread_task : thread_tasks) {
106 ExecuteTask(task: thread_task);
107 }
108
109 if (shutdown_now) {
110 break;
111 }
112 }
113}
114
115void ConcurrentMessageLoop::ExecuteTask(const fml::closure& task) {
116 task();
117}
118
119void ConcurrentMessageLoop::Terminate() {
120 std::scoped_lock lock(tasks_mutex_);
121 shutdown_ = true;
122 tasks_condition_.notify_all();
123}
124
125void ConcurrentMessageLoop::PostTaskToAllWorkers(const fml::closure& task) {
126 if (!task) {
127 return;
128 }
129
130 std::scoped_lock lock(tasks_mutex_);
131 for (const auto& worker_thread_id : worker_thread_ids_) {
132 thread_tasks_[worker_thread_id].emplace_back(args: task);
133 }
134 tasks_condition_.notify_all();
135}
136
137bool ConcurrentMessageLoop::HasThreadTasksLocked() const {
138 return thread_tasks_.count(k: std::this_thread::get_id()) > 0;
139}
140
141std::vector<fml::closure> ConcurrentMessageLoop::GetThreadTasksLocked() {
142 auto found = thread_tasks_.find(k: std::this_thread::get_id());
143 FML_DCHECK(found != thread_tasks_.end());
144 std::vector<fml::closure> pending_tasks;
145 std::swap(x&: pending_tasks, y&: found->second);
146 thread_tasks_.erase(p: found);
147 return pending_tasks;
148}
149
150ConcurrentTaskRunner::ConcurrentTaskRunner(
151 std::weak_ptr<ConcurrentMessageLoop> weak_loop)
152 : weak_loop_(std::move(weak_loop)) {}
153
154ConcurrentTaskRunner::~ConcurrentTaskRunner() = default;
155
156void ConcurrentTaskRunner::PostTask(const fml::closure& task) {
157 if (!task) {
158 return;
159 }
160
161 if (auto loop = weak_loop_.lock()) {
162 loop->PostTask(task);
163 return;
164 }
165
166 FML_DLOG(WARNING)
167 << "Tried to post to a concurrent message loop that has already died. "
168 "Executing the task on the callers thread.";
169 task();
170}
171
172bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() {
173 std::scoped_lock lock(tasks_mutex_);
174 for (const auto& worker_thread_id : worker_thread_ids_) {
175 if (worker_thread_id == std::this_thread::get_id()) {
176 return true;
177 }
178 }
179 return false;
180}
181
182} // namespace fml
183

source code of flutter_engine/flutter/fml/concurrent_message_loop.cc