From 8fb294fd96dee32a50cecc008fcea8a69a50622f Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Wed, 15 Jan 2020 11:28:17 +0100 Subject: [PATCH] Using the end-of-stream condition in the RootTreeReader/Writer tests The EOS condition allows to drop the logic for triggering the workflow termination. Also make sure not to publish any more data after the EOS condition has been sent. The publisher of the RootTreeReader test was constantly pumping data into the workflow. This can conflict with the device termination and produces the error which sometimes makes the test fail Failed sending on socket foo.push, reason: Interrupted system call This is the actual cause for the problem tried to be fixed in d4743769. --- Framework/Utils/test/test_RootTreeReader.cxx | 15 ++++++++------- .../Utils/test/test_RootTreeWriterWorkflow.cxx | 6 ++++-- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Framework/Utils/test/test_RootTreeReader.cxx b/Framework/Utils/test/test_RootTreeReader.cxx index ea177298d8812..6c6f1d0d7577f 100644 --- a/Framework/Utils/test/test_RootTreeReader.cxx +++ b/Framework/Utils/test/test_RootTreeReader.cxx @@ -33,7 +33,7 @@ using namespace o2::framework; LOG(ERROR) << R"(Test condition ")" #condition R"(" failed)"; \ } -constexpr int kTreeSize = 10; // elements in the test tree +const int gTreeSize = 10; // elements in the test tree DataProcessorSpec getSourceSpec() { auto initFct = [](InitContext& ic) { @@ -48,7 +48,7 @@ DataProcessorSpec getSourceSpec() std::vector valarray; auto* branch = testTree->Branch("dataarray", &valarray); - for (int entry = 0; entry < kTreeSize; entry++) { + for (int entry = 0; entry < gTreeSize; entry++) { valarray.clear(); for (int idx = 0; idx < entry + 1; ++idx) { valarray.emplace_back((entry * 10) + idx); @@ -68,6 +68,9 @@ DataProcessorSpec getSourceSpec() RootTreeReader::PublishingMode::Single); auto processingFct = [reader](ProcessingContext& pc) { + if (reader->getCount() >= gTreeSize) { + return; + } if (reader->getCount() == 0) { // add two additional headers on the stack in the first entry o2::header::NameHeader<16> auxHeader("extended_info"); @@ -77,8 +80,9 @@ DataProcessorSpec getSourceSpec() // test signature without headers for the rest of the entries (++(*reader))(pc); } - if ((reader->getCount() + 1) >= kTreeSize) { + if ((reader->getCount()) >= gTreeSize) { pc.services().get().endOfStream(); + pc.services().get().readyToQuit(QuitRequest::Me); } }; @@ -119,10 +123,7 @@ DataProcessorSpec getSinkSpec() LOG(INFO) << data[idx].get(); ASSERT_ERROR(data[idx].get() == 10 * counter + idx); } - if (++counter >= kTreeSize) { - pc.services().get().endOfStream(); - pc.services().get().readyToQuit(QuitRequest::Me); - } + ++counter; }; return DataProcessorSpec{"sink", // name of the processor diff --git a/Framework/Utils/test/test_RootTreeWriterWorkflow.cxx b/Framework/Utils/test/test_RootTreeWriterWorkflow.cxx index e34a4263d4741..13665b51ed18e 100644 --- a/Framework/Utils/test/test_RootTreeWriterWorkflow.cxx +++ b/Framework/Utils/test/test_RootTreeWriterWorkflow.cxx @@ -125,8 +125,6 @@ DataProcessorSpec getSourceSpec() auto processingFct = [counter](ProcessingContext& pc) { if (*counter >= sTreeSize) { // don't publish more - pc.services().get().endOfStream(); - pc.services().get().readyToQuit(QuitRequest::Me); return; } o2::test::Polymorphic a(*counter); @@ -134,6 +132,10 @@ DataProcessorSpec getSourceSpec() int& metadata = pc.outputs().make(Output{"TST", "METADATA", 0, Lifetime::Timeframe}); metadata = *counter; *counter = *counter + 1; + if (*counter >= sTreeSize) { + pc.services().get().endOfStream(); + pc.services().get().readyToQuit(QuitRequest::Me); + } }; return processingFct;