Skip to content

Commit 1c38285

Browse files
committed
DPL: move string metrics to use DataProcessingStates
1 parent ce980bf commit 1c38285

25 files changed

Lines changed: 247 additions & 135 deletions

Framework/Core/include/Framework/DataProcessingStates.h

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,9 @@ struct DataProcessingStatsHelpers {
3232
};
3333

3434
/// Helper struct to define some known states
35-
enum struct ProcessingStateId : int {
35+
enum struct ProcessingStateId : short {
3636
DUMMY_STATE = 0,
37+
DATA_QUERIES = 1,
3738
};
3839

3940
/// Helper struct to hold state of the data processing while it is running.
@@ -46,13 +47,33 @@ struct DataProcessingStates {
4647
DataProcessingStates(std::function<void(int64_t& base, int64_t& offset)> getRealtimeBase,
4748
std::function<int64_t(int64_t base, int64_t offset)> getTimestamp);
4849

50+
DataProcessingStates(DataProcessingStates const& other)
51+
: getRealtimeBase(other.getRealtimeBase),
52+
getTimestamp(other.getTimestamp),
53+
statesSize(other.statesSize.load()),
54+
lastInsertedState(other.lastInsertedState.load()),
55+
nextState(other.nextState.load()),
56+
pendingStates(other.pendingStates.load()),
57+
generation(other.generation.load()),
58+
updatedMetricsLapse(other.updatedMetricsLapse.load()),
59+
store(other.store),
60+
statesIndex(other.statesIndex),
61+
statesBuffer(other.statesBuffer),
62+
statesViews(other.statesViews),
63+
updated(other.updated),
64+
stateNames(other.stateNames),
65+
updateInfos(other.updateInfos),
66+
stateSpecs(other.stateSpecs)
67+
{
68+
}
69+
4970
constexpr static ServiceKind service_kind = ServiceKind::Global;
5071
constexpr static int STATES_BUFFER_SIZE = 1 << 16;
5172
constexpr static int MAX_STATES = 256;
5273

5374
// This is the structure to request the state update
5475
struct CommandSpec {
55-
int id = -1; // Id of the state to update.
76+
short id = -1; // Id of the state to update.
5677
int size = 0; // Size of the state.
5778
char const* data = nullptr; // Pointer to the beginning of the state
5879
};
@@ -85,9 +106,7 @@ struct DataProcessingStates {
85106
// Id of the metric. It must match the index in the metrics array.
86107
// Name of the metric
87108
std::string name = "";
88-
int stateId = -1;
89-
/// The default value for the state
90-
char defaultValue = 0;
109+
short stateId = -1;
91110
/// How many milliseconds must have passed since the last publishing
92111
int64_t minPublishInterval = 0;
93112
/// After how many milliseconds we should still refresh the metric

Framework/Core/include/Framework/DebugGUI.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Framework/DeviceMetricsInfo.h"
1919
#include "Framework/DriverInfo.h"
2020
#include "Framework/DriverControl.h"
21+
#include "Framework/DataProcessingStates.h"
2122

2223
#include <functional>
2324
#include <vector>
@@ -29,6 +30,7 @@ struct ServiceRegistry;
2930
struct DebugGUI {
3031
virtual std::function<void(void)> getGUIDebugger(std::vector<o2::framework::DeviceInfo> const& infos,
3132
std::vector<o2::framework::DeviceSpec> const& devices,
33+
std::vector<o2::framework::DataProcessingStates> const& allStates,
3234
std::vector<o2::framework::DataProcessorInfo> const& metadata,
3335
std::vector<o2::framework::DeviceMetricsInfo> const& metricsInfos,
3436
o2::framework::DriverInfo const& driverInfo,

Framework/Core/include/Framework/DeviceInfo.h

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
#include "Framework/LogParsingHelpers.h"
1515
#include "Framework/Metric2DViewIndex.h"
1616
#include "Framework/DeviceState.h"
17-
#include "Framework/DataProcessingStateManager.h"
1817

1918
#include <cstddef>
2019
#include <string>
@@ -75,8 +74,6 @@ struct DeviceInfo {
7574
/// Index for the variables of a given relayer.
7675
Metric2DViewIndex variablesViewIndex;
7776
/// Index for the queries of each input route.
78-
Metric2DViewIndex queriesViewIndex;
79-
/// Index for the queries of each input route.
8077
Metric2DViewIndex outputsViewIndex;
8178
/// Index for the metrics to be displayed associated to
8279
/// each input channel of the device.
@@ -95,8 +92,6 @@ struct DeviceInfo {
9592
size_t lastSignal;
9693
/// An incremental number for the state of the device
9794
int providedState = 0;
98-
99-
DataProcessingStateManager dataProcessingStateManager;
10095
};
10196

10297
} // namespace o2::framework

Framework/Core/include/Framework/Metric2DViewIndex.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ struct MetricInfo;
2525
/// This allows keeping track of the metrics which should be grouped together
2626
/// in some sort of 2D representation.
2727
struct Metric2DViewIndex {
28-
using Updater = std::function<void(std::array<Metric2DViewIndex*, 6>& views, std::string const&, MetricInfo const&, int value, size_t metricIndex)>;
28+
using Updater = std::function<void(std::array<Metric2DViewIndex*, 5>& views, std::string const&, MetricInfo const&, int value, size_t metricIndex)>;
2929
/// The prefix in the metrics store to be used for the view
3030
std::string prefix;
3131
/// The size in X of the metrics

Framework/Core/src/CommonServices.cxx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -847,7 +847,8 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStates()
847847
.configure = noConfiguration(),
848848
.postProcessing = [](ProcessingContext& context, void* service) {
849849
auto* states = (DataProcessingStates*)service;
850-
states->updateState({(short)ProcessingStateId::DUMMY_STATE, (int) strlen("somestate"), "somestate"}); },
850+
states->processCommandQueue();
851+
//states->updateState({(short)ProcessingStateId::DUMMY_STATE, (int) strlen("somestate"), "somestate"}); },
851852
.preDangling = [](DanglingContext& context, void* service) {
852853
auto* states = (DataProcessingStates*)service;
853854
flushStates(context.services(), *states); },
@@ -857,7 +858,7 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStates()
857858
.preEOS = [](EndOfStreamContext& context, void* service) {
858859
auto* states = (DataProcessingStates*)service;
859860
flushStates(context.services(), *states); },
860-
.kind = ServiceKind::Serial};
861+
.kind = ServiceKind::Global };
861862
}
862863

863864
struct GUIMetrics {
@@ -964,8 +965,8 @@ std::vector<ServiceSpec> CommonServices::defaultServices(int numThreads)
964965
parallelSpec(),
965966
callbacksSpec(),
966967
dataProcessingStats(),
967-
dataRelayer(),
968968
dataProcessingStates(),
969+
dataRelayer(),
969970
CommonMessageBackends::fairMQDeviceProxy(),
970971
dataSender(),
971972
objectCache(),

Framework/Core/src/ControlService.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void ControlService::push(std::string_view key, std::string_view value, int64_t
8282
{
8383
std::scoped_lock lock(mMutex);
8484
mDriverClient.tell(fmt::format("CONTROL_ACTION: PUT {} {} {}", key, timestamp, value));
85-
// mDriverClient.flushPending();
85+
mDriverClient.flushPending();
8686
}
8787

8888
void ControlService::notifyDeviceState(std::string currentState)

Framework/Core/src/ControlServiceHelpers.cxx

Lines changed: 59 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
#include "Framework/RawDeviceService.h"
1313
#include "Framework/Logger.h"
1414
#include "Framework/DeviceInfo.h"
15+
#include "Framework/RuntimeError.h"
16+
#include "Framework/DataProcessingStates.h"
1517
#include <string>
1618
#include <string_view>
1719
#include <regex>
@@ -37,18 +39,32 @@ bool ControlServiceHelpers::parseControl(std::string_view const& s, std::match_r
3739
}
3840

3941
void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
42+
std::vector<DataProcessingStates>& allStates,
4043
pid_t pid,
4144
std::string const& command,
4245
std::string const& arg)
4346
{
44-
auto doToMatchingPid = [](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
45-
for (auto& deviceInfo : infos) {
47+
auto doToMatchingPid = [&](std::vector<DeviceInfo>& infos, pid_t pid, auto lambda) {
48+
assert(infos.size() == allStates.size());
49+
for (size_t i = 0; i < infos.size(); ++i) {
50+
auto& deviceInfo = infos[i];
4651
if (deviceInfo.pid == pid) {
4752
return lambda(deviceInfo);
4853
}
4954
}
5055
LOGP(error, "Command received for pid {} which does not exists.", pid);
5156
};
57+
auto doToMatchingStatePid = [&](std::vector<DeviceInfo>& infos, std::vector<DataProcessingStates>& allStates, pid_t pid, auto lambda) {
58+
assert(infos.size() == allStates.size());
59+
for (size_t i = 0; i < infos.size(); ++i) {
60+
auto& deviceInfo = infos[i];
61+
auto& states = allStates[i];
62+
if (deviceInfo.pid == pid) {
63+
return lambda(deviceInfo, states);
64+
}
65+
}
66+
LOGP(error, "Command received for pid {} which does not exists.", pid);
67+
};
5268
LOGP(debug2, "Found control command {} from pid {} with argument {}.", command, pid, arg);
5369
if (command == "QUIT" && arg == "ALL") {
5470
for (auto& deviceInfo : infos) {
@@ -68,37 +84,49 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
6884
} else if (command == "NOTIFY_DEVICE_STATE") {
6985
doToMatchingPid(infos, pid, [arg](DeviceInfo& info) { info.deviceState = arg; info.providedState++; });
7086
} else if (command == "PUT") {
71-
doToMatchingPid(infos, pid, [&arg](DeviceInfo& info) {
72-
// Tokenize arg for the first two empty space using strtok_r
73-
// and create string_views associated to it.
74-
char* brk;
75-
char* beginKey = strtok_r((char*)arg.data(), " ", &brk);
76-
char* endKey = strtok_r(nullptr, " ", &brk);
77-
char* beginTimestamp = endKey;
78-
char* endTimestamp = strtok_r(nullptr, " ", &brk);
79-
char* beginValue = endTimestamp;
80-
char* endValue = (char*)arg.data() + arg.size();
81-
std::string_view key(beginKey, endKey - beginKey);
82-
std::string_view timestamp(beginTimestamp, endTimestamp - beginTimestamp);
83-
std::string_view value(beginValue, endValue - beginValue);
84-
int timestampInt = std::stoll(timestamp.data());
87+
doToMatchingStatePid(infos, allStates, pid, [&arg](DeviceInfo& info, DataProcessingStates& states) {
88+
/// Use scanf to parse PUT <key> <timestamp>
89+
// find the first space, that is the beginning of the key.
90+
// Find the position of the fist space in beginKey.
91+
auto beginKey = 0;
92+
// If we did not find it complain and return.
93+
if (beginKey == std::string::npos) {
94+
LOGP(error, "Cannot parse key in PUT command with arg {} for device {}", arg, info.pid);
95+
return;
96+
}
97+
auto endKey = arg.find(' ', beginKey + 1);
98+
if (endKey == std::string::npos) {
99+
LOGP(error, "Cannot parse timestamp in PUT command with arg {}", arg);
100+
return;
101+
}
102+
auto beginTimestamp = endKey + 1;
103+
auto endTimestamp = arg.find(' ', beginTimestamp + 1);
104+
if (endTimestamp == std::string::npos) {
105+
LOGP(error, "Cannot parse value in PUT command with arg {}", arg);
106+
return;
107+
}
108+
auto beginValue = endTimestamp + 1;
109+
auto endValue = arg.size();
85110

86-
// Find the StateInfo in the dataProcessingStateManager with the same key. Insert a new one if not found.
87-
auto& infos = info.dataProcessingStateManager.infos;
88-
auto& states = info.dataProcessingStateManager.states;
89-
auto it = std::lower_bound(infos.begin(), infos.end(), key, [](DataProcessingStateManager::StateInfo const& stateInfo, std::string_view const& key) { return stateInfo.name < key; });
90-
if (it == infos.end() || it->name != key) {
91-
it = infos.insert(it, DataProcessingStateManager::StateInfo{std::string{key}, timestampInt, (int)states.size()});
92-
states.resize(states.size() + 1);
93-
memcpy(states.back().data(), value.data(), value.size());
94-
states.back()[value.size()] = '\0';
95-
LOG(debug) << "New state" << key << " with timestamp " << timestamp << " and value " << value;
96-
} else {
97-
it->lastUpdate = timestampInt;
98-
memcpy(states[it->index].data(), value.data(), value.size());
99-
states[it->index][value.size()] = '\0';
100-
LOG(debug) << "Updated state" << key << " with timestamp " << timestamp << " and value " << value;
111+
std::string_view key(arg.data() + beginKey, endKey - beginKey);
112+
std::string_view timestamp(arg.data() + beginTimestamp, endTimestamp - beginTimestamp);
113+
std::string_view value(arg.data() + beginValue, endValue - beginValue);
114+
// Find the assocaiated StateSpec and get the id.
115+
auto spec = std::find_if(states.stateSpecs.begin(), states.stateSpecs.end(), [&key](auto const& spec) {
116+
return spec.name == key;
117+
});
118+
if (spec == states.stateSpecs.end()) {
119+
LOGP(warn, "Cannot find state {} in the state specs for pid {}", key.data());
120+
return;
121+
}
122+
if (value.data() == nullptr) {
123+
LOGP(debug, "State {} value is null skipping", key.data());
124+
return;
101125
}
126+
/// Notice this will remap the actual time to the time we received the command.
127+
/// This should not be a problem, because we have separate states per device.
128+
states.updateState(DataProcessingStates::CommandSpec{.id = spec->stateId, .size = (int)value.size(), .data = value.data()});
129+
states.processCommandQueue();
102130
});
103131
} else {
104132
LOGP(error, "Unknown command {} with argument {}", command, arg);

Framework/Core/src/ControlServiceHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#define O2_FRAMEWORK_CONTROLSERVICEHELPERS_H_
1313

1414
#include "Framework/DeviceInfo.h"
15+
#include "Framework/DataProcessingStates.h"
1516

1617
#include <unistd.h>
1718
#include <vector>
@@ -24,6 +25,7 @@ namespace o2::framework
2425
struct ControlServiceHelpers {
2526
static bool parseControl(std::string_view const& s, std::match_results<std::string_view::const_iterator>& match);
2627
static void processCommand(std::vector<DeviceInfo>& infos,
28+
std::vector<DataProcessingStates>& allStates,
2729
pid_t pid,
2830
std::string const& command,
2931
std::string const& arg);

Framework/Core/src/ControlWebSocketHandler.cxx

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,8 @@ namespace o2::framework
2222
void ControlWebSocketHandler::frame(char const* frame, size_t s)
2323
{
2424
bool hasNewMetric = false;
25-
std::array<Metric2DViewIndex*, 6> model = {&(*mContext.infos)[mIndex].dataRelayerViewIndex,
25+
std::array<Metric2DViewIndex*, 5> model = {&(*mContext.infos)[mIndex].dataRelayerViewIndex,
2626
&(*mContext.infos)[mIndex].variablesViewIndex,
27-
&(*mContext.infos)[mIndex].queriesViewIndex,
2827
&(*mContext.infos)[mIndex].outputsViewIndex,
2928
&(*mContext.infos)[mIndex].inputChannelMetricsViewIndex,
3029
&(*mContext.infos)[mIndex].outputChannelMetricsViewIndex};
@@ -60,7 +59,7 @@ void ControlWebSocketHandler::frame(char const* frame, size_t s)
6059
std::match_results<std::string_view::const_iterator> match;
6160

6261
if (ControlServiceHelpers::parseControl(token, match) && mContext.infos) {
63-
ControlServiceHelpers::processCommand(*mContext.infos, mPid, match[1].str(), match[2].str());
62+
ControlServiceHelpers::processCommand(*mContext.infos, *mContext.states, mPid, match[1].str(), match[2].str());
6463
} else if (doParseConfig(token, configMatch, (*mContext.infos)[mIndex]) && mContext.infos) {
6564
LOG(debug2) << "Found configuration information for pid " << mPid;
6665
} else {
@@ -78,6 +77,12 @@ void ControlWebSocketHandler::endChunk()
7877
return;
7978
}
8079
size_t timestamp = uv_now(mContext.loop);
80+
assert(mContext.metrics);
81+
assert(mContext.infos);
82+
assert(mContext.states);
83+
assert(mContext.specs);
84+
assert(mContext.driver);
85+
8186
for (auto& callback : *mContext.metricProcessingCallbacks) {
8287
callback(mContext.registry, ServiceMetricsInfo{*mContext.metrics, *mContext.specs, *mContext.infos, mContext.driver->metrics}, timestamp);
8388
}

Framework/Core/src/DataProcessingStates.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ void DataProcessingStates::processCommandQueue()
8282

8383
void DataProcessingStates::updateState(CommandSpec cmd)
8484
{
85+
LOGP(debug, "Updating state {} with {}", cmd.id, std::string_view(cmd.data, cmd.size));
8586
if (stateSpecs[cmd.id].name.empty()) {
8687
throw runtime_error_f("StateID %d was not registered", (int)cmd.id);
8788
}
@@ -160,11 +161,13 @@ void DataProcessingStates::flushChangedStates(std::function<void(std::string con
160161
continue;
161162
}
162163
if (currentTimestamp - update.lastPublished < spec.minPublishInterval) {
164+
LOGP(debug, "not publishing because of minPublishInterval");
163165
continue;
164166
}
165167
publish = true;
166168
assert(view.first + view.size <= statesBuffer.size());
167169
assert(view.first <= statesBuffer.size());
170+
LOGP(debug, "Publishing invoked {} {}", view.first, view.size);
168171
callback(spec.name.data(), update.timestamp, std::string_view(statesBuffer.data() + view.first, view.size));
169172
publishedMetricsLapse++;
170173
update.lastPublished = currentTimestamp;

0 commit comments

Comments
 (0)