Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Utilities/Mergers/include/Mergers/IntegratingMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class IntegratingMerger : public framework::Task
void endOfStream(framework::EndOfStreamContext& eosContext) override;

private:
void finishCycle(framework::DataAllocator& outputs);
void publishIntegral(framework::DataAllocator& allocator);
void publishMovingWindow(framework::DataAllocator& allocator);
static void merge(ObjectStore& mMergedDelta, ObjectStore&& other);
Expand Down
1 change: 1 addition & 0 deletions Utilities/Mergers/src/FullHistoryMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void FullHistoryMerger::run(framework::ProcessingContext& ctx)

void FullHistoryMerger::endOfStream(framework::EndOfStreamContext& eosContext)
{
mergeCache();
publish(eosContext.outputs());
}

Expand Down
44 changes: 25 additions & 19 deletions Utilities/Mergers/src/IntegratingMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -69,31 +69,37 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
}

if (ctx.inputs().isValid("timer-publish")) {
mCyclesSinceReset++;
finishCycle(ctx.outputs());
}
}

if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) {
publishMovingWindow(ctx.outputs());
}
void IntegratingMerger::finishCycle(DataAllocator& outputs)
{
mCyclesSinceReset++;

if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
}
mMergedObjectLastCycle = std::monostate{};
mTotalDeltasMerged += mDeltasMerged;
if (mConfig.publishMovingWindow.value == PublishMovingWindow::Yes) {
publishMovingWindow(outputs);
}

publishIntegral(ctx.outputs());
if (!std::holds_alternative<std::monostate>(mMergedObjectLastCycle)) {
merge(mMergedObjectIntegral, std::move(mMergedObjectLastCycle));
}
mMergedObjectLastCycle = std::monostate{};
mTotalDeltasMerged += mDeltasMerged;

if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
clear();
}
publishIntegral(outputs);

mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
mDeltasMerged = 0;
if (mConfig.mergedObjectTimespan.value == MergedObjectTimespan::LastDifference ||
mConfig.mergedObjectTimespan.value == MergedObjectTimespan::NCycles && mConfig.mergedObjectTimespan.param == mCyclesSinceReset) {
clear();
}

mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
mCollector->send({mCyclesSinceReset, "cycles_since_reset"});
mDeltasMerged = 0;
}

void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other)
{
if (std::holds_alternative<std::monostate>(target)) {
Expand Down Expand Up @@ -121,7 +127,7 @@ void IntegratingMerger::merge(ObjectStore& target, ObjectStore&& other)

void IntegratingMerger::endOfStream(framework::EndOfStreamContext& eosContext)
{
publishIntegral(eosContext.outputs());
finishCycle(eosContext.outputs());
}

// I am not calling it reset(), because it does not have to be performed during the FairMQs reset.
Expand Down
21 changes: 21 additions & 0 deletions Utilities/Mergers/test/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,34 @@
#include <sstream>
#include <gsl/span>
#include <TH1.h>
#include <Framework/CallbackService.h>

namespace o2::mergers::test
{
inline auto to_span(const TH1F& histo)
{
return gsl::span(histo.GetArray(), histo.GetSize());
}

void registerCallbacksForTestFailure(framework::CallbackService& cb, std::shared_ptr<bool> success)
{
cb.set<framework::CallbackService::Id::EndOfStream>([success](framework::EndOfStreamContext& ctx) {
if (*success == false) {
LOG(fatal) << "Received an EndOfStream without having received the expected object";
}
});
cb.set<framework::CallbackService::Id::Stop>([success]() {
if (*success == false) {
LOG(fatal) << "STOP transition without having received the expected object";
}
});
cb.set<framework::CallbackService::Id::ExitRequested>([success](framework::ServiceRegistryRef) {
if (*success == false) {
LOG(fatal) << "EXIT transition without having received the expected object";
}
});
}

} // namespace o2::mergers::test

namespace std
Expand Down
20 changes: 16 additions & 4 deletions Utilities/Mergers/test/customTopologyCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <Mergers/CustomMergeableObject.h>
#include <Mergers/MergerBuilder.h>
#include <Mergers/MergerInfrastructureBuilder.h>
#include "common.h"

void customize(std::vector<o2::framework::CompletionPolicy>& policies)
{
Expand Down Expand Up @@ -109,9 +110,12 @@ class CustomMergerTestGenerator
},
Outputs{},
AlgorithmSpec{
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
auto success = std::make_shared<bool>(false);
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);

return AlgorithmSpec::ProcessCallback{
[expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5](ProcessingContext& processingContext) mutable {
[expectedResult, numberOfCalls = 0, numberOfObjects = 0, numberOfMovingWindows = 0, lastObjectValue = 0, retries = 5, success](ProcessingContext& processingContext) mutable {
numberOfCalls++;

if (processingContext.inputs().isValid("custom")) {
Expand Down Expand Up @@ -139,7 +143,10 @@ class CustomMergerTestGenerator
if (lastObjectValue != expectedResult) {
LOG(fatal) << "got wrong secret from object: " << lastObjectValue << ", expected: " << expectedResult;
}
return;
}
LOG(info) << "Received the expected objects, test successful";
*success = true;
}
}};
}}}});
Expand All @@ -154,12 +161,17 @@ class CustomMergerTestGenerator
},
Outputs{},
AlgorithmSpec{
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
auto success = std::make_shared<bool>(false);
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);

return AlgorithmSpec::ProcessCallback{
[expectedResult, retryNumber = 0, numberOfRetries = 5](ProcessingContext& processingContext) mutable {
[expectedResult, retryNumber = 0, numberOfRetries = 5, success](ProcessingContext& processingContext) mutable {
const auto obj = processingContext.inputs().get<mergers::CustomMergeableObject*>("custom");

if (obj->getSecret() == expectedResult) {
LOG(info) << "Received the expected object, test successful";
*success = true;
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
return;
}
Expand Down
10 changes: 8 additions & 2 deletions Utilities/Mergers/test/histosTopologyCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class HistosMergerTestGenerator
"histo", "histo", histoBinsCount, histoMin, histoMax);
histo.Fill(5);
histo.Fill(producerIdx);
processingContext.services().get<ControlService>().endOfStream();
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::Me);
})}});
}
Expand Down Expand Up @@ -103,14 +104,19 @@ class HistosMergerTestGenerator
Inputs{{"histo", origin, description, 0, Lifetime::Sporadic}},
Outputs{},
AlgorithmSpec{
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
auto success = std::make_shared<bool>(false);
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);

// reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers,
// number of retries is chosen arbitrarily as we need to retry at least twice
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable {
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
const auto histo = processingContext.inputs().get<TH1F*>("histo");

LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(histo) << " to the expected: " << std::to_string(expectedResult);
if (std::equal(expectedResult.begin(), expectedResult.end(), histo->GetArray(), histo->GetArray() + histo->GetSize())) {
LOG(info) << "Received the expected object, test successful";
*success = true;
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const&)

const auto mergersInputs = generator.generateHistoProducers(specs, producersCount);

generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::LastDifference);
generator.generateMergers(specs, mergersInputs, InputObjectsTimespan::FullHistory);

generator.generateChecker(specs);

Expand Down
9 changes: 7 additions & 2 deletions Utilities/Mergers/test/vectorTopologyCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,15 +127,20 @@ class VectorMergerTestGenerator
Inputs{{"vec", origin, description, 0, Lifetime::Sporadic}},
Outputs{},
AlgorithmSpec{
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext&) {
AlgorithmSpec::InitCallback{[expectedResult = mExpectedResult](InitContext& initContext) {
auto success = std::make_shared<bool>(false);
mergers::test::registerCallbacksForTestFailure(initContext.services().get<CallbackService>(), success);

// reason for this crude retry is that multiple layers are not synchronized between each other and publish on their own timers.
// number of retries was chosen a bit randomly, as we need to have at least 2 runs through this function because of publish
// timers inside of the mergers
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5](ProcessingContext& processingContext) mutable {
return AlgorithmSpec::ProcessCallback{[expectedResult, retryNumber = 1, retries = 5, success](ProcessingContext& processingContext) mutable {
const auto vectorOfHistos = processingContext.inputs().get<std::vector<TObject*>*>("vec");

LOG(info) << "RETRY: " << retryNumber << ": comparing: " << std::to_string(vectorOfHistos) << " to the expected: " << std::to_string(expectedResult);
if (vectorOfHistos == expectedResult) {
LOG(info) << "Received the expected object, test successful";
*success = true;
processingContext.services().get<ControlService>().readyToQuit(QuitRequest::All);
return;
}
Expand Down