Skip to content

Commit 0dbdf69

Browse files
matthiasrichterktf
authored andcommitted
Using the end-of-stream condition in the RootTreeReader/Writer tests (#2785)
The EOS condition allows to drop the logic for triggering the workflow termination. Also make sure not to publish any more data after the EOS condition has been sent. The publisher of the RootTreeReader test was constantly pumping data into the workflow. This can conflict with the device termination and produces the error which sometimes makes the test fail Failed sending on socket foo.push, reason: Interrupted system call This is the actual cause for the problem tried to be fixed in d474376.
1 parent d1d6e6f commit 0dbdf69

2 files changed

Lines changed: 12 additions & 9 deletions

File tree

Framework/Utils/test/test_RootTreeReader.cxx

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ using namespace o2::framework;
3333
LOG(ERROR) << R"(Test condition ")" #condition R"(" failed)"; \
3434
}
3535

36-
constexpr int kTreeSize = 10; // elements in the test tree
36+
const int gTreeSize = 10; // elements in the test tree
3737
DataProcessorSpec getSourceSpec()
3838
{
3939
auto initFct = [](InitContext& ic) {
@@ -48,7 +48,7 @@ DataProcessorSpec getSourceSpec()
4848
std::vector<o2::test::Polymorphic> valarray;
4949
auto* branch = testTree->Branch("dataarray", &valarray);
5050

51-
for (int entry = 0; entry < kTreeSize; entry++) {
51+
for (int entry = 0; entry < gTreeSize; entry++) {
5252
valarray.clear();
5353
for (int idx = 0; idx < entry + 1; ++idx) {
5454
valarray.emplace_back((entry * 10) + idx);
@@ -68,6 +68,9 @@ DataProcessorSpec getSourceSpec()
6868
RootTreeReader::PublishingMode::Single);
6969

7070
auto processingFct = [reader](ProcessingContext& pc) {
71+
if (reader->getCount() >= gTreeSize) {
72+
return;
73+
}
7174
if (reader->getCount() == 0) {
7275
// add two additional headers on the stack in the first entry
7376
o2::header::NameHeader<16> auxHeader("extended_info");
@@ -77,8 +80,9 @@ DataProcessorSpec getSourceSpec()
7780
// test signature without headers for the rest of the entries
7881
(++(*reader))(pc);
7982
}
80-
if ((reader->getCount() + 1) >= kTreeSize) {
83+
if ((reader->getCount()) >= gTreeSize) {
8184
pc.services().get<ControlService>().endOfStream();
85+
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
8286
}
8387
};
8488

@@ -119,10 +123,7 @@ DataProcessorSpec getSinkSpec()
119123
LOG(INFO) << data[idx].get();
120124
ASSERT_ERROR(data[idx].get() == 10 * counter + idx);
121125
}
122-
if (++counter >= kTreeSize) {
123-
pc.services().get<ControlService>().endOfStream();
124-
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
125-
}
126+
++counter;
126127
};
127128

128129
return DataProcessorSpec{"sink", // name of the processor

Framework/Utils/test/test_RootTreeWriterWorkflow.cxx

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,15 +125,17 @@ DataProcessorSpec getSourceSpec()
125125
auto processingFct = [counter](ProcessingContext& pc) {
126126
if (*counter >= sTreeSize) {
127127
// don't publish more
128-
pc.services().get<ControlService>().endOfStream();
129-
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
130128
return;
131129
}
132130
o2::test::Polymorphic a(*counter);
133131
pc.outputs().snapshot(OutputRef{"output"}, a);
134132
int& metadata = pc.outputs().make<int>(Output{"TST", "METADATA", 0, Lifetime::Timeframe});
135133
metadata = *counter;
136134
*counter = *counter + 1;
135+
if (*counter >= sTreeSize) {
136+
pc.services().get<ControlService>().endOfStream();
137+
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
138+
}
137139
};
138140

139141
return processingFct;

0 commit comments

Comments
 (0)