Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Utilities/Mergers/include/Mergers/IntegratingMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class IntegratingMerger : public framework::Task
std::unique_ptr<monitoring::Monitoring> mCollector;

// stats
int mTotalObjectsMerged = 0;
int mObjectsMerged = 0;
int mTotalDeltasMerged = 0;
int mDeltasMerged = 0;
};

} // namespace o2::mergers
Expand Down
2 changes: 1 addition & 1 deletion Utilities/Mergers/include/Mergers/MergeInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MergeInterface
// make sure that all entries are correctly deleted as well.
virtual ~MergeInterface() = default;

/// \brief Custom merge function. Can return a number of merged entries/bins/etc for statistics.
/// \brief Custom merge method.
virtual void merge(MergeInterface* const other) = 0; // const argument

ClassDef(MergeInterface, 0);
Expand Down
3 changes: 3 additions & 0 deletions Utilities/Mergers/include/Mergers/MergerConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
///
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch

#include <string>

namespace o2::mergers
{

Expand Down Expand Up @@ -57,6 +59,7 @@ struct MergerConfig {
ConfigEntry<MergedObjectTimespan> mergedObjectTimespan = {MergedObjectTimespan::FullHistory};
ConfigEntry<PublicationDecision> publicationDecision = {PublicationDecision::EachNSeconds, 10};
ConfigEntry<TopologySize, int> topologySize = {TopologySize::NumberOfLayers, 1};
std::string monitoringUrl = "infologger:///debug?qc";
};

} // namespace o2::mergers
Expand Down
9 changes: 7 additions & 2 deletions Utilities/Mergers/src/FullHistoryMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ FullHistoryMerger::FullHistoryMerger(const MergerConfig& config, const header::D
: mConfig(config),
mSubSpec(subSpec)
{
mCollector = monitoring::MonitoringFactory::Get("infologger:///debug?mergers");
}

FullHistoryMerger::~FullHistoryMerger()
Expand All @@ -46,6 +45,8 @@ FullHistoryMerger::~FullHistoryMerger()

void FullHistoryMerger::init(framework::InitContext& ictx)
{
mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
}

void FullHistoryMerger::run(framework::ProcessingContext& ctx)
Expand Down Expand Up @@ -96,7 +97,7 @@ void FullHistoryMerger::updateCache(const DataRef& ref)

void FullHistoryMerger::mergeCache()
{
LOG(INFO) << "Merging " << mCache.size() + 1 << " objects.";
LOG(DEBUG) << "Merging " << mCache.size() + 1 << " objects.";

mMergedObject = object_store_helpers::extractObjectFrom(mFirstObjectSerialized.second);
assert(!std::holds_alternative<std::monostate>(mMergedObject));
Expand Down Expand Up @@ -132,9 +133,13 @@ void FullHistoryMerger::publish(framework::DataAllocator& allocator)
} else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<MergeInterfacePtr>(mMergedObject));
LOG(INFO) << "Published the merged object containing " << mCache.size() + 1 << " incomplete objects. "
<< mUpdatesReceived << " updates were received during the last cycle.";
} else if (std::holds_alternative<TObjectPtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<TObjectPtr>(mMergedObject));
LOG(INFO) << "Published the merged object containing " << mCache.size() + 1 << " incomplete objects. "
<< mUpdatesReceived << " updates were received during the last cycle.";
} else {
throw std::runtime_error("mMergedObject' variant has no value.");
}
Expand Down
22 changes: 12 additions & 10 deletions Utilities/Mergers/src/IntegratingMerger.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,8 @@

#include "Framework/InputRecordWalker.h"
#include "Framework/Logger.h"
//#include "Framework/DataRef.h"

//using namespace o2;
using namespace o2::framework;
//using namespace std::chrono;

namespace o2::mergers
{
Expand All @@ -35,12 +32,12 @@ IntegratingMerger::IntegratingMerger(const MergerConfig& config, const header::D
: mConfig(config),
mSubSpec(subSpec)
{
mCollector = monitoring::MonitoringFactory::Get("infologger:///debug?qc");
// mCollector->enableProcessMonitoring();
}

void IntegratingMerger::init(framework::InitContext& ictx)
{
mCollector = monitoring::MonitoringFactory::Get(mConfig.monitoringUrl);
mCollector->addGlobalTag(monitoring::tags::Key::Subsystem, monitoring::tags::Value::Mergers);
}

void IntegratingMerger::run(framework::ProcessingContext& ctx)
Expand All @@ -66,7 +63,7 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)
} else {
throw std::runtime_error("mMergedObject' variant has no value.");
}
mObjectsMerged++;
mDeltasMerged++;
}
}

Expand All @@ -82,22 +79,27 @@ void IntegratingMerger::run(framework::ProcessingContext& ctx)

void IntegratingMerger::publish(framework::DataAllocator& allocator)
{
mTotalDeltasMerged += mDeltasMerged;

if (std::holds_alternative<std::monostate>(mMergedObject)) {
LOG(INFO) << "Nothing to publish yet";
} else if (std::holds_alternative<MergeInterfacePtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<MergeInterfacePtr>(mMergedObject));
LOG(INFO) << "Published the merged object with " << mTotalDeltasMerged << " deltas in total,"
<< " including " << mDeltasMerged << " in the last cycle.";
} else if (std::holds_alternative<TObjectPtr>(mMergedObject)) {
allocator.snapshot(framework::OutputRef{MergerBuilder::mergerOutputBinding(), mSubSpec},
*std::get<TObjectPtr>(mMergedObject));
LOG(INFO) << "Published the merged object with " << mTotalDeltasMerged << " deltas in total,"
<< " including " << mDeltasMerged << " in the last cycle.";
} else {
throw std::runtime_error("mMergedObject' variant has no value.");
}

mTotalObjectsMerged += mObjectsMerged;
mCollector->send({mTotalObjectsMerged, "total_objects_merged"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mObjectsMerged, "objects_merged_since_last_publication"});
mObjectsMerged = 0;
mCollector->send({mTotalDeltasMerged, "total_deltas_merged"}, monitoring::DerivedMetricMode::RATE);
mCollector->send({mDeltasMerged, "deltas_merged_since_last_publication"});
mDeltasMerged = 0;
}

} // namespace o2::mergers