diff --git a/Utilities/Mergers/include/Mergers/IntegratingMerger.h b/Utilities/Mergers/include/Mergers/IntegratingMerger.h index 2fedcda0f3b44..8a6cecc437d1e 100644 --- a/Utilities/Mergers/include/Mergers/IntegratingMerger.h +++ b/Utilities/Mergers/include/Mergers/IntegratingMerger.h @@ -55,6 +55,7 @@ class IntegratingMerger : public framework::Task void endOfStream(framework::EndOfStreamContext& eosContext) override; private: + void finishCycle(framework::DataAllocator& outputs); void publishIntegral(framework::DataAllocator& allocator); void publishMovingWindow(framework::DataAllocator& allocator); static void merge(ObjectStore& mMergedDelta, ObjectStore&& other); diff --git a/Utilities/Mergers/src/FullHistoryMerger.cxx b/Utilities/Mergers/src/FullHistoryMerger.cxx index a2836e63732e0..2ed05d46c0e8a 100644 --- a/Utilities/Mergers/src/FullHistoryMerger.cxx +++ b/Utilities/Mergers/src/FullHistoryMerger.cxx @@ -90,6 +90,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx) void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext) { + mergeCache(); publish(eosContext.outputs()); } diff --git a/Utilities/Mergers/src/IntegratingMerger.cxx b/Utilities/Mergers/src/IntegratingMerger.cxx index f935628ff94b5..749becd463a5d 100644 --- a/Utilities/Mergers/src/IntegratingMerger.cxx +++ b/Utilities/Mergers/src/IntegratingMerger.cxx @@ -69,31 +69,37 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx) } if (ctx.inputs().isValid("timer-publish")) { - mCyclesSinceReset++; + finishCycle(ctx.outputs()); + } +} - if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) { - publishMovingWindow(ctx.outputs()); - } +void IntegratingMerger::finishCycle(DataAllocator& outputs) +{ + mCyclesSinceReset++; - if (!std::holds_alternative(mMergedObjectLastCycle)) { - merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle)); - } - mMergedObjectLastCycle = std::monostate{}; - mTotalDeltasMerged += mDeltasMerged; + if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) { + publishMovingWindow(outputs); + } - publishIntegral(ctx.outputs()); + if (!std::holds_alternative(mMergedObjectLastCycle)) { + merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle)); + } + mMergedObjectLastCycle = std::monostate{}; + mTotalDeltasMerged += mDeltasMerged; - if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference || - mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) { - clear(); - } + publishIntegral(outputs); - mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE); - mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"}); - mCollector->send({mCyclesSinceReset, "cycles_since_reset"}); - mDeltasMerged = 0; + if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference || + mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) { + clear(); } + + mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE); + mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"}); + mCollector->send({mCyclesSinceReset, "cycles_since_reset"}); + mDeltasMerged = 0; } + void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other) { if (std::holds_alternative(target)) { @@ -121,7 +127,7 @@ void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other) void IntegratingMerger::endOfStream(framework::EndOfStreamContext& eosContext) { - publishIntegral(eosContext.outputs()); + finishCycle(eosContext.outputs()); } // I am not calling it reset(), because it does not have to be performed during the FairMQs reset. diff --git a/Utilities/Mergers/test/common.h b/Utilities/Mergers/test/common.h index e85e256cd513a..c2726959bb8ac 100644 --- a/Utilities/Mergers/test/common.h +++ b/Utilities/Mergers/test/common.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace o2::mergers::test { @@ -22,6 +23,26 @@ inline auto to_span(const TH1F& histo) { return gsl::span(histo.GetArray(), histo.GetSize()); } + +void registerCallbacksForTestFailure(framework::CallbackService& cb, std::shared_ptr success) +{ + cb.set([success](framework::EndOfStreamContext& ctx) { + if (*success == false) { + LOG(fatal) << "Received an EndOfStream without having received the expected object"; + } + }); + cb.set([success]() { + if (*success == false) { + LOG(fatal) << "STOP transition without having received the expected object"; + } + }); + cb.set([success](framework::ServiceRegistryRef) { + if (*success == false) { + LOG(fatal) << "EXIT transition without having received the expected object"; + } + }); +} + } // namespace o2::mergers::test namespace std diff --git a/Utilities/Mergers/test/customTopologyCommon.h b/Utilities/Mergers/test/customTopologyCommon.h index 13c745d6f8a91..6233cb53bd215 100644 --- a/Utilities/Mergers/test/customTopologyCommon.h +++ b/Utilities/Mergers/test/customTopologyCommon.h @@ -24,6 +24,7 @@ #include #include #include +#include "common.h" void customize(std::vector& policies) { @@ -109,9 +110,12 @@ class CustomMergerTestGenerator }, Outputs{}, AlgorithmSpec{ - AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) { + AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) { + auto success = std::make_shared(false); + mergers::test::registerCallbacksForTestFailure(initContext.services().get(), success); + return AlgorithmSpec::ProcessCallback{ - [expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5](ProcessingContext& processingContext) mutable { + [expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5, success](ProcessingContext& processingContext) mutable { numberOfCalls++; if (processingContext.inputs().isValid("custom")) { @@ -139,7 +143,10 @@ class CustomMergerTestGenerator if (lastObjectValue != expectedResult) { LOG(fatal) << "got wrong secret from object: " << lastObjectValue << ", expected: " << expectedResult; } + return; } + LOG(info) << "Received the expected objects, test successful"; + *success = true; } }}; }}}}); @@ -154,12 +161,17 @@ class CustomMergerTestGenerator }, Outputs{}, AlgorithmSpec{ - AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) { + AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) { + auto success = std::make_shared(false); + mergers::test::registerCallbacksForTestFailure(initContext.services().get(), success); + return AlgorithmSpec::ProcessCallback{ - [expectedResult, retryNumber = 0, numberOfRetries = 5](ProcessingContext& processingContext) mutable { + [expectedResult, retryNumber = 0, numberOfRetries = 5, success](ProcessingContext& processingContext) mutable { const auto obj = processingContext.inputs().get("custom"); if (obj->getSecret() == expectedResult) { + LOG(info) << "Received the expected object, test successful"; + *success = true; processingContext.services().get().readyToQuit(QuitRequest::All); return; } diff --git a/Utilities/Mergers/test/histosTopologyCommon.h b/Utilities/Mergers/test/histosTopologyCommon.h index 7c713b3409747..d2305e3f6e6f2 100644 --- a/Utilities/Mergers/test/histosTopologyCommon.h +++ b/Utilities/Mergers/test/histosTopologyCommon.h @@ -71,6 +71,7 @@ class HistosMergerTestGenerator "histo", "histo", histoBinsCount, histoMin, histoMax); histo.Fill(5); histo.Fill(producerIdx); + processingContext.services().get().endOfStream(); processingContext.services().get().readyToQuit(QuitRequest::Me); })}}); } @@ -103,14 +104,19 @@ class HistosMergerTestGenerator Inputs{{"histo", origin, description, 0, Lifetime::Sporadic}}, Outputs{}, AlgorithmSpec{ - AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) { + AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) { + auto success = std::make_shared(false); + mergers::test::registerCallbacksForTestFailure(initContext.services().get(), success); + // reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers, // number of retries is chosen arbitrarily as we need to retry at least twice - return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable { + return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable { const auto histo = processingContext.inputs().get("histo"); LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(histo) << " to the expected: " << std::to_string(expectedResult); if (std::equal(expectedResult.begin(), expectedResult.end(), histo->GetArray(), histo->GetArray() + histo->GetSize())) { + LOG(info) << "Received the expected object, test successful"; + *success = true; processingContext.services().get().readyToQuit(QuitRequest::All); return; } diff --git a/Utilities/Mergers/test/test_MergerTopologyHistosFullHistory.cxx b/Utilities/Mergers/test/test_MergerTopologyHistosFullHistory.cxx index d08f6ee077edf..5ac18f998d444 100644 --- a/Utilities/Mergers/test/test_MergerTopologyHistosFullHistory.cxx +++ b/Utilities/Mergers/test/test_MergerTopologyHistosFullHistory.cxx @@ -32,7 +32,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&) const auto mergersInputs = generator.generateHistoProducers(specs, producersCount); - generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::LastDifference); + generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::FullHistory); generator.generateChecker(specs); diff --git a/Utilities/Mergers/test/vectorTopologyCommon.h b/Utilities/Mergers/test/vectorTopologyCommon.h index d98576fd094f1..34dcc8515a04a 100644 --- a/Utilities/Mergers/test/vectorTopologyCommon.h +++ b/Utilities/Mergers/test/vectorTopologyCommon.h @@ -127,15 +127,20 @@ class VectorMergerTestGenerator Inputs{{"vec", origin, description, 0, Lifetime::Sporadic}}, Outputs{}, AlgorithmSpec{ - AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) { + AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) { + auto success = std::make_shared(false); + mergers::test::registerCallbacksForTestFailure(initContext.services().get(), success); + // reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers. // number of retries was chosen a bit randomly, as we need to have at least 2 runs through this function because of publish // timers inside of the mergers - return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable { + return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable { const auto vectorOfHistos = processingContext.inputs().get*>("vec"); LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(vectorOfHistos) << " to the expected: " << std::to_string(expectedResult); if (vectorOfHistos == expectedResult) { + LOG(info) << "Received the expected object, test successful"; + *success = true; processingContext.services().get().readyToQuit(QuitRequest::All); return; }