Skip to content

Commit 1b5a5f5

Browse files
committed
o2-sim: More robust termination initiated by driver
So far, we've been relying on child-procs to return correctly for termination. This has proven not to be 100% reliable (FairMQ sometimes hangs during shutdown). The new treatment initiates the termination from the driver via a callback-extension of the existing event-monitoring. This actually provides a door-opener for more advanced communication and triggering between all the parties involved.
1 parent 78da91c commit 1b5a5f5

1 file changed

Lines changed: 30 additions & 7 deletions

File tree

run/o2sim_parallel.cxx

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <SimConfig/SimConfig.h>
2020
#include <sys/wait.h>
2121
#include <vector>
22+
#include <functional>
2223
#include <thread>
2324
#include <csignal>
2425
#include "TStopwatch.h"
@@ -135,11 +136,14 @@ void sighandler(int signal)
135136
}
136137
}
137138

138-
// monitores a certain incoming pipe and displays new information
139-
void launchThreadMonitoringEvents(int pipefd, std::string text)
139+
// monitores a certain incoming event pipes and displays new information
140+
// gives possibility to exec a callback at these events
141+
void launchThreadMonitoringEvents(
142+
int pipefd, std::string text, std::vector<int>& eventcontainer,
143+
std::function<void(std::vector<int> const&)> callback = [](std::vector<int> const&) {})
140144
{
141145
static std::vector<std::thread> threads;
142-
auto lambda = [pipefd, text]() {
146+
auto lambda = [pipefd, text, callback, &eventcontainer]() {
143147
int eventcounter;
144148
while (1) {
145149
ssize_t count = read(pipefd, &eventcounter, sizeof(eventcounter));
@@ -154,6 +158,8 @@ void launchThreadMonitoringEvents(int pipefd, std::string text)
154158
break;
155159
} else {
156160
LOG(INFO) << text.c_str() << eventcounter;
161+
eventcontainer.push_back(eventcounter);
162+
callback(eventcontainer);
157163
}
158164
};
159165
};
@@ -302,13 +308,14 @@ int main(int argc, char* argv[])
302308
o2::utils::ShmManager::Instance().disable();
303309
}
304310

305-
std::vector<int> childpids;
306-
307311
int pipe_serverdriver_fd[2];
308312
if (pipe(pipe_serverdriver_fd) != 0) {
309313
perror("problem in creating pipe");
310314
}
311315

316+
// record distributed events in a container
317+
std::vector<int> distributedEvents;
318+
312319
// the server
313320
int pid = fork();
314321
if (pid == 0) {
@@ -356,7 +363,7 @@ int main(int argc, char* argv[])
356363
gChildProcesses.push_back(pid);
357364
close(pipe_serverdriver_fd[1]);
358365
std::cout << "Spawning particle server on PID " << pid << "; Redirect output to " << getServerLogName() << "\n";
359-
launchThreadMonitoringEvents(pipe_serverdriver_fd[0], "DISTRIBUTING EVENT : ");
366+
launchThreadMonitoringEvents(pipe_serverdriver_fd[0], "DISTRIBUTING EVENT : ", distributedEvents);
360367
}
361368

362369
auto internalfork = getenv("ALICE_SIMFORKINTERNAL");
@@ -399,6 +406,9 @@ int main(int argc, char* argv[])
399406
perror("problem in creating pipe");
400407
}
401408

409+
// record finished events in a container
410+
std::vector<int> finishedEvents;
411+
402412
pid = fork();
403413
if (pid == 0) {
404414
int fd = open(getMergerLogName().c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
@@ -418,7 +428,20 @@ int main(int argc, char* argv[])
418428
std::cout << "Spawning hit merger on PID " << pid << "; Redirect output to " << getMergerLogName() << "\n";
419429
gChildProcesses.push_back(pid);
420430
close(pipe_mergerdriver_fd[1]);
421-
launchThreadMonitoringEvents(pipe_mergerdriver_fd[0], "EVENT FINISHED : ");
431+
432+
// A simple callback that determines if the simulation is complete and triggers
433+
// a shutdown of all child processes. This appears to be more robust than leaving
434+
// that decision upon the children (sometimes there are problems with that).
435+
auto finishCallback = [&conf](std::vector<int> const& v) {
436+
if (conf.getNEvents() == v.size()) {
437+
LOG(INFO) << "SIMULATION IS DONE. INITIATING SHUTDOWN.";
438+
for (auto p : gChildProcesses) {
439+
kill(p, SIGTERM);
440+
}
441+
}
442+
};
443+
444+
launchThreadMonitoringEvents(pipe_mergerdriver_fd[0], "EVENT FINISHED : ", finishedEvents, finishCallback);
422445
}
423446

424447
// wait on merger (which when exiting completes the workflow)

0 commit comments

Comments
 (0)