Skip to content

Commit a1bcdbf

Browse files
committed
DPL: reduce chances of hanging at the end of processing
Apparently sleep(1) does its own signal handling and that somehow interferes with the signal handling done by FairMQ. Changing it to std::this_thread::sleep_for makes it better behaved.
1 parent 7c1a9b5 commit a1bcdbf

25 files changed

Lines changed: 88 additions & 49 deletions

Framework/Core/src/CommonDataProcessors.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "Framework/OutputSpec.h"
2222
#include "Framework/Variant.h"
2323

24+
#include <chrono>
2425
#include <exception>
2526
#include <fstream>
2627
#include <functional>
@@ -63,7 +64,7 @@ DataProcessorSpec CommonDataProcessors::getGlobalFileSink(std::vector<InputSpec>
6364
LOG(INFO) << "No dangling output to be dumped.";
6465
once = true;
6566
}
66-
sleep(1);
67+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
6768
});
6869
}
6970
auto output = std::make_shared<std::ofstream>(filename.c_str(), std::ios_base::binary);

Framework/Core/test/test_CCDBFetcher.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
#include "Framework/ServiceRegistry.h"
1212
#include "Framework/ControlService.h"
1313

14+
#include <chrono>
15+
1416
using namespace o2::framework;
1517
using namespace o2::header;
1618

Framework/Core/test/test_CallbackService.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#include <iostream>
1110
#include <boost/algorithm/string.hpp>
1211

1312
#include "Framework/InputSpec.h"
@@ -19,6 +18,9 @@
1918
#include "Framework/DebugGUI.h"
2019
#include "DebugGUI/imgui.h"
2120

21+
#include <chrono>
22+
#include <iostream>
23+
2224
using namespace o2::framework;
2325
using DataHeader = o2::header::DataHeader;
2426

@@ -32,7 +34,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3234
},
3335
AlgorithmSpec{
3436
[](ProcessingContext& ctx) {
35-
sleep(1);
37+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
3638
auto out = ctx.outputs().make<int>(OutputRef{ "test", 0 });
3739
} } },
3840
{ "dest",

Framework/Core/test/test_CustomGUIGL.cxx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#include <iostream>
11-
#include <boost/algorithm/string.hpp>
12-
1310
#include "Framework/InputSpec.h"
1411
#include "Framework/CallbackService.h"
1512
#include "Framework/ControlService.h"
@@ -21,6 +18,9 @@
2118
#include "DebugGUI/Sokol3DUtils.h"
2219
#include "DebugGUI/imgui.h"
2320

21+
#include <chrono>
22+
#include <iostream>
23+
2424
using namespace o2::framework;
2525
using DataHeader = o2::header::DataHeader;
2626

@@ -34,7 +34,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3434
},
3535
AlgorithmSpec{
3636
adaptStateless([](DataAllocator& outputs) {
37-
sleep(1);
37+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
3838
auto out = outputs.make<int>(OutputRef{ "test", 0 });
3939
}) } },
4040
{ "dest",

Framework/Core/test/test_CustomGUISokol.cxx

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#include <iostream>
11-
#include <boost/algorithm/string.hpp>
12-
1310
#include "Framework/InputSpec.h"
1411
#include "Framework/CallbackService.h"
1512
#include "Framework/ControlService.h"
@@ -21,6 +18,11 @@
2118
#include "DebugGUI/Sokol3DUtils.h"
2219
#include "DebugGUI/imgui.h"
2320

21+
#include <boost/algorithm/string.hpp>
22+
23+
#include <chrono>
24+
#include <iostream>
25+
2426
using namespace o2::framework;
2527
using DataHeader = o2::header::DataHeader;
2628

@@ -34,7 +36,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)
3436
},
3537
AlgorithmSpec{
3638
adaptStateless([](DataAllocator& outputs) {
37-
sleep(1);
39+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
3840
auto out = outputs.make<int>(OutputRef{ "test", 0 });
3941
}) } },
4042
{ "dest",

Framework/Core/test/test_DanglingInputs.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs)
3636
OutputSpec{ { "a2" }, "TST", "A2" } },
3737
AlgorithmSpec{
3838
[](ProcessingContext& ctx) {
39-
sleep(rand() % 2 + 1);
39+
std::this_thread::sleep_for(std::chrono::milliseconds((rand() % 2 + 1) * 1000));
4040
auto aData = ctx.outputs().make<int>(OutputRef{ "a1" }, 1);
4141
auto bData = ctx.outputs().make<int>(OutputRef{ "a2" }, 1);
4242
} } },

Framework/Core/test/test_DanglingOutputs.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
#include "Framework/ConfigParamSpec.h"
1111
#include "Framework/DeviceSpec.h"
1212
#include <InfoLogger/InfoLogger.hxx>
13-
#include <vector>
1413
#include "Framework/runDataProcessing.h"
1514
#include "Framework/ControlService.h"
1615

16+
#include <chrono>
17+
#include <vector>
18+
1719
using namespace o2::framework;
1820

1921
AlgorithmSpec simplePipe(std::string const& what, int minDelay)
@@ -36,7 +38,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs)
3638
OutputSpec{ { "a2" }, "TST", "A2" } },
3739
AlgorithmSpec{
3840
[](ProcessingContext& ctx) {
39-
sleep(rand() % 2 + 1);
41+
std::this_thread::sleep_for(std::chrono::milliseconds((rand() % 2 + 1) * 1000));
4042
auto aData1 = ctx.outputs().make<int>(OutputRef{ "a1" }, 1);
4143
auto aData2 = ctx.outputs().make<int>(Output{ "TST", "A2" }, 1);
4244
} } },

Framework/Core/test/test_Forwarding.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
#include "Framework/ConfigParamSpec.h"
1111
#include "Framework/DeviceSpec.h"
1212
#include <InfoLogger/InfoLogger.hxx>
13-
#include <vector>
1413
#include "Framework/runDataProcessing.h"
1514
#include "Framework/ControlService.h"
1615

16+
#include <chrono>
17+
#include <vector>
18+
1719
using namespace o2::framework;
1820

1921
AlgorithmSpec simplePipe(std::string const& what, int minDelay)
@@ -35,7 +37,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& specs)
3537
{ OutputSpec{ { "a1" }, "TST", "A1" } },
3638
AlgorithmSpec{
3739
[](ProcessingContext& ctx) {
38-
sleep(rand() % 2);
40+
std::this_thread::sleep_for(std::chrono::seconds(rand() % 2));
3941
auto aData = ctx.outputs().make<int>(OutputRef{ "a1" }, 1);
4042
} } },
4143
{ "B",

Framework/Core/test/test_FrameworkDataFlowToDDS.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "Framework/ProcessingContext.h"
2222
#include "Framework/WorkflowSpec.h"
2323

24+
#include <chrono>
2425
#include <sstream>
2526

2627
using namespace o2::framework;
@@ -39,7 +40,7 @@ WorkflowSpec defineDataProcessing()
3940
Outputs{ OutputSpec{ "TST", "A1" },
4041
OutputSpec{ "TST", "A2" } },
4142
AlgorithmSpec{ [](ProcessingContext& ctx) {
42-
sleep(1);
43+
std::this_thread::sleep_for(std::chrono::seconds(1));
4344
auto aData = ctx.outputs().make<int>(Output{ "TST", "A1", 0 }, 1);
4445
auto bData = ctx.outputs().make<int>(Output{ "TST", "A2", 0 }, 1);
4546
} } },

Framework/Core/test/test_GenericSource.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
// or submit itself to any jurisdiction.
1010
#include "Framework/runDataProcessing.h"
1111

12+
#include <chrono>
13+
1214
using namespace o2::framework;
1315

1416
// This is how you can define your processing in a declarative way
@@ -24,7 +26,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&specs) {
2426
[](const InputRecord &inputs,
2527
ServiceRegistry& services,
2628
DataAllocator& allocator) {
27-
sleep(1);
29+
std::this_thread::sleep_for(std::chrono::seconds(1));
2830
auto aData = allocator.make<int>(Output{"TST", "A1", 0}, 1);
2931
}
3032
},

0 commit comments

Comments
 (0)