1// Copyright 2014 The Flutter Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5import 'dart:async';
6import 'dart:convert';
7import 'dart:developer';
8import 'dart:io';
9import 'dart:isolate';
10
11import 'package:logging/logging.dart';
12import 'package:path/path.dart' as path;
13import 'package:process/process.dart';
14import 'package:stack_trace/stack_trace.dart';
15
16import 'devices.dart';
17import 'host_agent.dart';
18import 'running_processes.dart';
19import 'task_result.dart';
20import 'utils.dart';
21
/// Device identifiers that are exempt from the periodic reboot.
///
/// `_TaskRunner.checkForRebootRequired` returns early, without touching the
/// run counter, when the working device's id is in this set.
final Set<String> noRebootForbidList = <String>{
  // 32bit iPhone requires manual intervention on reboot.
  '822ef7958bba573829d85eef4df6cbdd86593730',
};
26
/// How many task runs a device may accumulate before it is rebooted.
///
/// The persisted run counter is incremented while it is below this value;
/// once it reaches it, the counter file is deleted and the device rebooted.
///
/// The value itself was picked arbitrarily.
const int maximumRuns = 30;
31
/// A unit of CI work that takes no arguments and asynchronously produces a
/// [TaskResult].
///
/// Each task can succeed, fail, and be retried independently of other tasks.
typedef TaskFunction = Future<TaskResult> Function();
35
/// Whether [task] has already been called in this Dart VM.
bool _isTaskRegistered = false;

/// Registers [task] as this Dart VM's task and returns its eventual result.
///
/// Registering does not start the task: the VM is kept alive until a request
/// to run it arrives over the VM service protocol, and the returned future is
/// the runner's completion future.
///
/// A single [task] may do many things, but only one task can be registered
/// per Dart VM; a second call throws a [StateError].
///
/// When `processManager` is omitted, a default [LocalProcessManager] is used.
///
/// As a side effect, the root [Logger] is set to [Level.ALL] and every log
/// record is printed.
Future<TaskResult> task(TaskFunction task, {ProcessManager? processManager}) async {
  if (_isTaskRegistered) {
    throw StateError('A task is already registered');
  }
  _isTaskRegistered = true;

  final ProcessManager effectiveProcessManager = processManager ?? const LocalProcessManager();

  // TODO(ianh): allow overriding logging.
  Logger.root
    ..level = Level.ALL
    ..onRecord.listen((LogRecord record) {
      print('${record.level.name}: ${record.time}: ${record.message}');
    });

  final _TaskRunner taskRunner = _TaskRunner(task, effectiveProcessManager)
    ..keepVmAliveUntilTaskRunRequested();
  return taskRunner.whenDone;
}
66
/// Runs a single [task] when asked to via VM service extensions.
///
/// The constructor registers three service extensions:
///
///  * `ext.cocoonRunTask` calls [run] and responds with the JSON-encoded
///    [TaskResult].
///  * `ext.cocoonRunnerReady` responds with a fixed success payload.
///  * `ext.cocoonTaskResultReceived` closes the keep-alive port and responds
///    with the same success payload.
class _TaskRunner {
  /// Creates a runner for [task] and registers its service extensions.
  _TaskRunner(this.task, this.processManager) {
    final String successResponse = json.encode(const <String, String>{'result': 'success'});

    registerExtension('ext.cocoonRunTask', (String method, Map<String, String> parameters) async {
      // A malformed `timeoutInMinutes` makes `int.parse` throw a
      // FormatException out of this handler.
      final Duration? taskTimeout = parameters.containsKey('timeoutInMinutes')
          ? Duration(minutes: int.parse(parameters['timeoutInMinutes']!))
          : null;
      // Used by tests to avoid changing the configuration.
      final bool runFlutterConfig = parameters['runFlutterConfig'] != 'false';
      final bool runProcessCleanup = parameters['runProcessCleanup'] != 'false';
      final String? localEngine = parameters['localEngine'];
      final String? localEngineHost = parameters['localEngineHost'];
      final TaskResult result = await run(
        taskTimeout,
        runProcessCleanup: runProcessCleanup,
        runFlutterConfig: runFlutterConfig,
        localEngine: localEngine,
        localEngineHost: localEngineHost,
      );
      // If `ext.cocoonTaskResultReceived` is not called within this window,
      // release the keep-alive port anyway and flag the failure through the
      // process exit code.
      const Duration taskResultReceivedTimeout = Duration(seconds: 30);
      _taskResultReceivedTimeout = Timer(taskResultReceivedTimeout, () {
        logger.severe(
          'Task runner did not acknowledge task results in $taskResultReceivedTimeout.',
        );
        _closeKeepAlivePort();
        exitCode = 1;
      });
      return ServiceExtensionResponse.result(json.encode(result.toJson()));
    });
    registerExtension('ext.cocoonRunnerReady', (
      String method,
      Map<String, String> parameters,
    ) async {
      return ServiceExtensionResponse.result(successResponse);
    });
    registerExtension('ext.cocoonTaskResultReceived', (
      String method,
      Map<String, String> parameters,
    ) async {
      _closeKeepAlivePort();
      return ServiceExtensionResponse.result(successResponse);
    });
  }

  /// The task this runner executes.
  final TaskFunction task;

  /// The process manager passed to the process listing and termination
  /// helpers.
  final ProcessManager processManager;

  /// Returns the working device, or null if looking it up throws a
  /// [DeviceException].
  Future<Device?> _getWorkingDeviceIfAvailable() async {
    try {
      return await devices.workingDevice;
    } on DeviceException {
      return null;
    }
  }

  // TODO(ianh): workaround for https://github.com/dart-lang/sdk/issues/23797
  RawReceivePort? _keepAlivePort;

  /// Fires if no run request arrives after
  /// [keepVmAliveUntilTaskRunRequested] was called.
  Timer? _startTaskTimeout;

  /// Fires if the task result is not acknowledged after a run.
  Timer? _taskResultReceivedTimeout;

  /// Whether [run] has been entered.
  bool _taskStarted = false;

  final Completer<TaskResult> _completer = Completer<TaskResult>();

  static final Logger logger = Logger('TaskRunner');

  /// Signals that this task runner finished running the task.
  ///
  /// Completes with the task's result, or with a failure result if the run
  /// timed out.
  Future<TaskResult> get whenDone => _completer.future;

  /// Completes [whenDone] with [result] unless it has already completed.
  ///
  /// The guard keeps a repeated `ext.cocoonRunTask` request from throwing a
  /// [StateError] after the task itself has already run.
  void _completeOnce(TaskResult result) {
    if (!_completer.isCompleted) {
      _completer.complete(result);
    }
  }

  /// Runs [task] and returns its result.
  ///
  /// If [taskTimeout] is non-null, the task's future is bounded by it. A
  /// [TimeoutException] raised anywhere in the run is converted into a failure
  /// result, which is both returned and used to complete [whenDone].
  ///
  /// When [runProcessCleanup] is true, `dart` processes are listed before the
  /// task; any that are not in that list afterwards are reported as leaked and
  /// terminated.
  ///
  /// When [runFlutterConfig] is true, `flutter config` is invoked first to
  /// enable the macOS and Linux desktop configs, forwarding [localEngine] and
  /// [localEngineHost] when they are non-null.
  ///
  /// Regardless of the outcome, [checkForRebootRequired] and
  /// `forceQuitRunningProcesses` are awaited before this method returns.
  Future<TaskResult> run(
    Duration? taskTimeout, {
    bool runFlutterConfig = true,
    bool runProcessCleanup = true,
    required String? localEngine,
    required String? localEngineHost,
  }) async {
    try {
      _taskStarted = true;
      print('Running task with a timeout of $taskTimeout.');
      final String exe = Platform.isWindows ? '.exe' : '';
      // Only assigned, and only read, when [runProcessCleanup] is true.
      late Set<RunningProcessInfo> beforeRunningDartInstances;
      if (runProcessCleanup) {
        section('Checking running Dart$exe processes');
        beforeRunningDartInstances = await getRunningProcesses(
          processName: 'dart$exe',
          processManager: processManager,
        );
        final Set<RunningProcessInfo> allProcesses = await getRunningProcesses(
          processManager: processManager,
        );
        beforeRunningDartInstances.forEach(print);
        for (final RunningProcessInfo info in allProcesses) {
          if (info.commandLine.contains('iproxy')) {
            print('[LEAK]: ${info.commandLine} ${info.creationDate} ${info.pid} ');
          }
        }
      }

      if (runFlutterConfig) {
        print('Enabling configs for macOS and Linux...');
        final int configResult = await exec(
          path.join(flutterDirectory.path, 'bin', 'flutter'),
          <String>[
            'config',
            '-v',
            '--enable-macos-desktop',
            '--enable-linux-desktop',
            if (localEngine != null) ...<String>['--local-engine', localEngine],
            if (localEngineHost != null) ...<String>['--local-engine-host', localEngineHost],
          ],
          canFail: true,
        );
        // A failed configuration is reported but does not abort the run.
        if (configResult != 0) {
          print('Failed to enable configuration, tasks may not run.');
        }
      }

      final Device? device = await _getWorkingDeviceIfAvailable();

      // Some tests assume the phone is in home
      await device?.home();

      late TaskResult result;
      IOSink? sink;
      try {
        if (device != null && device.canStreamLogs && hostAgent.dumpDirectory != null) {
          sink = File(
            path.join(hostAgent.dumpDirectory!.path, '${device.deviceId}.log'),
          ).openWrite();
          await device.startLoggingToSink(sink);
        }

        Future<TaskResult> futureResult = _performTask();
        if (taskTimeout != null) {
          futureResult = futureResult.timeout(taskTimeout);
        }

        result = await futureResult;
      } finally {
        if (device != null && device.canStreamLogs) {
          await device.stopLoggingToSink();
          await sink?.close();
        }
      }

      if (runProcessCleanup) {
        section('Terminating lingering Dart$exe processes after task...');
        final Set<RunningProcessInfo> afterRunningDartInstances = await getRunningProcesses(
          processName: 'dart$exe',
          processManager: processManager,
        );
        for (final RunningProcessInfo info in afterRunningDartInstances) {
          if (!beforeRunningDartInstances.contains(info)) {
            print('$info was leaked by this test.');
            // Only results that opted into process checking are turned into
            // failures; the leaked process is terminated either way.
            if (result is TaskResultCheckProcesses) {
              result = TaskResult.failure('This test leaked dart processes');
            }
            if (await info.terminate(processManager: processManager)) {
              print('Killed process id ${info.pid}.');
            } else {
              print('Failed to kill process ${info.pid}.');
            }
          }
        }
      }
      _completeOnce(result);
      return result;
    } on TimeoutException catch (err, stackTrace) {
      print('Task timed out in framework.dart after $taskTimeout.');
      print(err);
      print(stackTrace);
      final TaskResult timeoutResult = TaskResult.failure('Task timed out after $taskTimeout');
      // Without this, [whenDone] would never complete after a timeout.
      _completeOnce(timeoutResult);
      return timeoutResult;
    } finally {
      await checkForRebootRequired();
      await forceQuitRunningProcesses();
    }
  }

  /// Updates the persisted run counter and reboots the working device once the
  /// counter has reached [maximumRuns].
  ///
  /// Does nothing for devices listed in [noRebootForbidList]. A
  /// [TimeoutException] or [DeviceException] while locating or rebooting the
  /// device is ignored.
  Future<void> checkForRebootRequired() async {
    print('Checking for reboot');
    try {
      final Device device = await devices.workingDevice;
      if (noRebootForbidList.contains(device.deviceId)) {
        return;
      }
      final File rebootFile = _rebootFile();
      int runCount;
      if (rebootFile.existsSync()) {
        // Unparseable contents are treated as a fresh counter.
        runCount = int.tryParse(rebootFile.readAsStringSync().trim()) ?? 0;
      } else {
        runCount = 0;
      }
      if (runCount < maximumRuns) {
        rebootFile
          ..createSync()
          ..writeAsStringSync((runCount + 1).toString());
        return;
      }
      // Delete the counter so that counting restarts after the reboot.
      rebootFile.deleteSync();
      print('rebooting');
      await device.reboot();
    } on TimeoutException {
      // Could not find device in order to reboot.
    } on DeviceException {
      // No attached device needed to reboot.
    }
  }

  /// Causes the Dart VM to stay alive until a request to run the task is
  /// received via the VM service protocol.
  ///
  /// Throws a [StateError] if the task has already started.
  void keepVmAliveUntilTaskRunRequested() {
    if (_taskStarted) {
      throw StateError('Task already started.');
    }

    // Merely creating this port object will cause the VM to stay alive and keep
    // the VM service server running until the port is disposed of.
    _keepAlivePort = RawReceivePort();

    // Timeout if nothing bothers to connect and ask us to run the task.
    const Duration taskStartTimeout = Duration(seconds: 60);
    _startTaskTimeout = Timer(taskStartTimeout, () {
      if (!_taskStarted) {
        logger.severe('Task did not start in $taskStartTimeout.');
        _closeKeepAlivePort();
        exitCode = 1;
      }
    });
  }

  /// Disables the keepalive port, allowing the VM to exit.
  ///
  /// Also cancels both pending timeout timers.
  void _closeKeepAlivePort() {
    _startTaskTimeout?.cancel();
    _taskResultReceivedTimeout?.cancel();
    _keepAlivePort?.close();
  }

  /// Runs [task] inside [Chain.capture] and returns a future for its result.
  ///
  /// Errors reported to the capture's `onError` are written to stderr and
  /// turned into a failure [TaskResult]; the returned future is never
  /// completed with an error by this method.
  Future<TaskResult> _performTask() {
    final Completer<TaskResult> completer = Completer<TaskResult>();
    Chain.capture(
      () async {
        final TaskResult taskResult = await task();
        // An error reported earlier may already have completed the future
        // with a failure; completing again would throw a StateError.
        if (!completer.isCompleted) {
          completer.complete(taskResult);
        }
      },
      onError: (dynamic taskError, Chain taskErrorStack) {
        final String message = 'Task failed: $taskError';
        stderr
          ..writeln(message)
          ..writeln('\nStack trace:')
          ..writeln(taskErrorStack.terse);
        // IMPORTANT: We're completing the future _successfully_ but with a value
        // that indicates a task failure. This is intentional. At this point we
        // are catching errors coming from arbitrary (and untrustworthy) task
        // code. Our goal is to convert the failure into a readable message.
        // Propagating it further is not useful.
        if (!completer.isCompleted) {
          completer.complete(TaskResult.failure(message));
        }
      },
    );
    return completer.future;
  }
}
330
/// Returns the `.reboot-count` file in the current user's home directory.
///
/// The directory is read from the `HOME` environment variable on Linux and
/// macOS, and from `USERPROFILE` on Windows. Throws a [StateError] on any
/// other platform.
File _rebootFile() {
  final String homeVariable;
  if (Platform.isLinux || Platform.isMacOS) {
    homeVariable = 'HOME';
  } else if (Platform.isWindows) {
    homeVariable = 'USERPROFILE';
  } else {
    throw StateError('Unexpected platform ${Platform.operatingSystem}');
  }
  return File(path.join(Platform.environment[homeVariable]!, '.reboot-count'));
}
340