Skip to content

Commit a9e0d07

Browse files
committed
DPL: do delta publishing of ContextVariables
This should bring the driver CPU usage close to 0, even when a lot of data processors are present.
1 parent 58c0086 commit a9e0d07

4 files changed

Lines changed: 37 additions & 10 deletions

File tree

Framework/Core/include/Framework/DataDescriptorMatcher.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "Framework/ConcreteDataMatcher.h"
1515
#include "Framework/DataProcessingHeader.h"
1616
#include "Framework/RuntimeError.h"
17+
#include "Framework/TimesliceSlot.h"
1718
#include "Headers/DataHeader.h"
1819

1920
#include <array>
@@ -62,6 +63,8 @@ struct ContextElement {
6263
using Value = std::variant<uint32_t, uint64_t, std::string, None>;
6364
char label[24]; /// The name of the variable contained in this element.
6465
Value value = None{}; /// The actual contents of the element.
66+
size_t commitVersion = -1; /// The committed version of the element. Every time we commit something to it, we bump the version.
67+
size_t publishVersion = -1; /// The version of the element which has been published to the GUI.
6568
};
6669

6770
struct ContextUpdate {
@@ -79,6 +82,12 @@ class VariableContext
7982

8083
ContextElement::Value const& get(size_t pos) const;
8184

85+
/// Publish the context to the GUI / monitoring.
86+
/// @a callback is a function which will be called for each element
87+
/// @a context is userdata which will be passed to the callback.
88+
/// @a name is the name of the metrics to be used.
89+
void publish(void (*callback)(ContextElement::Value const&, std::string const& name, void* context), void* context, TimesliceSlot slot, std::vector<std::string> const& names);
90+
8291
inline void put(ContextUpdate&& update);
8392

8493
/// Use this after a query to actually commit the matched fields. Notice the

Framework/Core/include/Framework/VariableContextHelpers.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#define O2_FRAMEWORK_DATADESCRIPTORMATCHER_H_
1313

1414
#include "Framework/DataDescriptorMatcher.h"
15+
#include "Framework/TimesliceSlot.h"
1516
#include <variant>
1617

1718
namespace o2::framework

Framework/Core/src/DataDescriptorMatcher.cxx

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,27 @@ ContextElement::Value const& VariableContext::get(size_t pos) const
3434
return mElements.at(pos).value;
3535
}
3636

37+
void VariableContext::publish(void (*callback)(ContextElement::Value const&, std::string const&, void*), void* context, TimesliceSlot slot, std::vector<std::string> const& names)
38+
{
39+
for (size_t i = 0; i < MAX_MATCHING_VARIABLE; i++) {
40+
auto& var = this->get(i);
41+
auto& name = names[16 * slot.index + i];
42+
auto& element = mElements[i];
43+
44+
if (element.commitVersion == element.publishVersion) {
45+
continue;
46+
}
47+
callback(var, name, context);
48+
element.publishVersion = element.commitVersion;
49+
}
50+
}
51+
3752
void VariableContext::commit()
3853
{
3954
for (size_t i = 0; i < mPerformedUpdates; ++i) {
40-
mElements[mUpdates[i].position].value = mUpdates[i].newValue;
55+
auto& element = mElements[mUpdates[i].position];
56+
element.value = mUpdates[i].newValue;
57+
element.commitVersion++;
4158
}
4259
mPerformedUpdates = 0;
4360
}

Framework/Core/src/DataRelayer.cxx

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,21 +251,21 @@ size_t matchToContext(void const* data,
251251
void sendVariableContextMetrics(VariableContext& context, TimesliceSlot slot,
252252
monitoring::Monitoring& metrics, std::vector<std::string> const& names)
253253
{
254-
const std::string nullstring{"null"};
254+
static const std::string nullstring{"null"};
255255

256-
for (size_t i = 0; i < MAX_MATCHING_VARIABLE; i++) {
257-
auto& var = context.get(i);
258-
auto& name = names[16 * slot.index + i];
256+
context.publish([](ContextElement::Value const& var, std::string const& name, void* context) {
257+
monitoring::Monitoring* metrics = reinterpret_cast<monitoring::Monitoring*>(context);
259258
if (auto pval = std::get_if<uint64_t>(&var)) {
260-
metrics.send(monitoring::Metric{std::to_string(*pval), name, Verbosity::Debug});
259+
metrics->send(monitoring::Metric{std::to_string(*pval), name, Verbosity::Debug});
261260
} else if (auto pval = std::get_if<uint32_t>(&var)) {
262-
metrics.send(monitoring::Metric{std::to_string(*pval), name, Verbosity::Debug});
261+
metrics->send(monitoring::Metric{std::to_string(*pval), name, Verbosity::Debug});
263262
} else if (auto pval2 = std::get_if<std::string>(&var)) {
264-
metrics.send(monitoring::Metric{*pval2, name, Verbosity::Debug});
263+
metrics->send(monitoring::Metric{*pval2, name, Verbosity::Debug});
265264
} else {
266-
metrics.send(monitoring::Metric{nullstring, name, Verbosity::Debug});
265+
metrics->send(monitoring::Metric{nullstring, name, Verbosity::Debug});
267266
}
268-
}
267+
},
268+
&metrics, slot, names);
269269
}
270270

271271
void DataRelayer::setOldestPossibleInput(TimesliceId proposed, ChannelIndex channel)

0 commit comments

Comments
 (0)