1919#include < SimConfig/SimConfig.h>
2020#include < sys/wait.h>
2121#include < vector>
22+ #include < functional>
2223#include < thread>
2324#include < csignal>
2425#include " TStopwatch.h"
@@ -135,11 +136,14 @@ void sighandler(int signal)
135136 }
136137}
137138
138- // monitores a certain incoming pipe and displays new information
139- void launchThreadMonitoringEvents (int pipefd, std::string text)
139+ // monitores a certain incoming event pipes and displays new information
140+ // gives possibility to exec a callback at these events
141+ void launchThreadMonitoringEvents (
142+ int pipefd, std::string text, std::vector<int >& eventcontainer,
143+ std::function<void (std::vector<int > const &)> callback = [](std::vector<int > const &) {})
140144{
141145 static std::vector<std::thread> threads;
142- auto lambda = [pipefd, text]() {
146+ auto lambda = [pipefd, text, callback, &eventcontainer ]() {
143147 int eventcounter;
144148 while (1 ) {
145149 ssize_t count = read (pipefd, &eventcounter, sizeof (eventcounter));
@@ -154,6 +158,8 @@ void launchThreadMonitoringEvents(int pipefd, std::string text)
154158 break ;
155159 } else {
156160 LOG (INFO) << text.c_str () << eventcounter;
161+ eventcontainer.push_back (eventcounter);
162+ callback (eventcontainer);
157163 }
158164 };
159165 };
@@ -302,13 +308,14 @@ int main(int argc, char* argv[])
302308 o2::utils::ShmManager::Instance ().disable ();
303309 }
304310
305- std::vector<int > childpids;
306-
307311 int pipe_serverdriver_fd[2 ];
308312 if (pipe (pipe_serverdriver_fd) != 0 ) {
309313 perror (" problem in creating pipe" );
310314 }
311315
316+ // record distributed events in a container
317+ std::vector<int > distributedEvents;
318+
312319 // the server
313320 int pid = fork ();
314321 if (pid == 0 ) {
@@ -356,7 +363,7 @@ int main(int argc, char* argv[])
356363 gChildProcesses .push_back (pid);
357364 close (pipe_serverdriver_fd[1 ]);
358365 std::cout << " Spawning particle server on PID " << pid << " ; Redirect output to " << getServerLogName () << " \n " ;
359- launchThreadMonitoringEvents (pipe_serverdriver_fd[0 ], " DISTRIBUTING EVENT : " );
366+ launchThreadMonitoringEvents (pipe_serverdriver_fd[0 ], " DISTRIBUTING EVENT : " , distributedEvents );
360367 }
361368
362369 auto internalfork = getenv (" ALICE_SIMFORKINTERNAL" );
@@ -399,6 +406,9 @@ int main(int argc, char* argv[])
399406 perror (" problem in creating pipe" );
400407 }
401408
409+ // record finished events in a container
410+ std::vector<int > finishedEvents;
411+
402412 pid = fork ();
403413 if (pid == 0 ) {
404414 int fd = open (getMergerLogName ().c_str (), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
@@ -418,7 +428,20 @@ int main(int argc, char* argv[])
418428 std::cout << " Spawning hit merger on PID " << pid << " ; Redirect output to " << getMergerLogName () << " \n " ;
419429 gChildProcesses .push_back (pid);
420430 close (pipe_mergerdriver_fd[1 ]);
421- launchThreadMonitoringEvents (pipe_mergerdriver_fd[0 ], " EVENT FINISHED : " );
431+
432+ // A simple callback that determines if the simulation is complete and triggers
433+ // a shutdown of all child processes. This appears to be more robust than leaving
434+ // that decision upon the children (sometimes there are problems with that).
435+ auto finishCallback = [&conf](std::vector<int > const & v) {
436+ if (conf.getNEvents () == v.size ()) {
437+ LOG (INFO) << " SIMULATION IS DONE. INITIATING SHUTDOWN." ;
438+ for (auto p : gChildProcesses ) {
439+ kill (p, SIGTERM);
440+ }
441+ }
442+ };
443+
444+ launchThreadMonitoringEvents (pipe_mergerdriver_fd[0 ], " EVENT FINISHED : " , finishedEvents, finishCallback);
422445 }
423446
424447 // wait on merger (which when exiting completes the workflow)
0 commit comments