Skip to content

Commit ce980bf

Browse files
committed
DPL: enable DataProcessingStates on the data processor side
This enables the DataProcessingState service on the client side, which will then replace all the metrics for which we do not actually care about the history.
1 parent 3124b96 commit ce980bf

11 files changed

Lines changed: 186 additions & 9 deletions

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ target_link_libraries(o2-test-framework-core PRIVATE O2::Catch2)
248248
get_filename_component(outdir ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/../tests ABSOLUTE)
249249
set_property(TARGET o2-test-framework-core PROPERTY RUNTIME_OUTPUT_DIRECTORY ${outdir})
250250

251-
add_test(NAME framework:core COMMAND o2-test-framework-core)
251+
add_test(NAME framework:core COMMAND o2-test-framework-core --skip-benchmarks)
252252

253253
o2_add_test(AlgorithmWrapper NAME test_Framework_test_AlgorithmWrapper
254254
SOURCES test/test_AlgorithmWrapper.cxx

Framework/Core/include/Framework/CommonServices.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ struct CommonServices {
7777
static ServiceSpec summaryServiceSpec();
7878
static ServiceSpec threadPool(int numWorkers);
7979
static ServiceSpec dataProcessingStats();
80+
static ServiceSpec dataProcessingStates();
8081
static ServiceSpec objectCache();
8182
static ServiceSpec timingInfoSpec();
8283
static ServiceSpec ccdbSupportSpec();

Framework/Core/include/Framework/ControlService.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class ControlService
5151
void endOfStream();
5252
/// Report the current streaming state of a given device
5353
void notifyStreamingState(StreamingState state);
54+
/// Push a generic key/value pair to the driver
55+
void push(std::string_view key, std::string_view value, int64_t timestamp);
5456
/// Report the current FairMQ state of a given device
5557
void notifyDeviceState(std::string state);
5658

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#ifndef O2_DATAPROCESSINGSTATEMANAGER_H_
13+
#define O2_DATAPROCESSINGSTATEMANAGER_H_
14+
15+
#include <array>
16+
#include <vector>
17+
#include <string>
18+
19+
struct DataProcessingStateManager {
20+
struct StateIndex {
21+
short id = -1;
22+
short index = -1;
23+
};
24+
struct StateInfo {
25+
std::string name;
26+
int64_t lastUpdate = 0;
27+
int index = -1;
28+
};
29+
30+
static constexpr int MAX_STATES = 1024;
31+
std::vector<std::array<char, 1024>> states = {};
32+
std::vector<StateInfo> infos = {};
33+
};
34+
35+
#endif

Framework/Core/include/Framework/DataProcessingStates.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ struct DataProcessingStatsHelpers {
3131
static std::function<int64_t(int64_t base, int64_t offset)> defaultCPUTimeConfigurator();
3232
};
3333

34+
/// Helper struct to define some known states
35+
enum struct ProcessingStateId : int {
36+
DUMMY_STATE = 0,
37+
};
38+
3439
/// Helper struct to hold state of the data processing while it is running.
3540
/// This is meant to then be used to report the state of the data processing
3641
/// to the driver.
@@ -121,8 +126,11 @@ struct DataProcessingStates {
121126
std::array<std::string, MAX_STATES> stateNames = {};
122127
std::array<UpdateInfo, MAX_STATES> updateInfos;
123128
std::array<StateSpec, MAX_STATES> stateSpecs;
124-
// How many commands have been committed to the queue.
125-
std::atomic<int> insertedStates = 0;
129+
// The last insertion point for the next state.
130+
// We use this to actually process the command buffer
131+
// because nextState is already pointing to the next
132+
// insertion point.
133+
std::atomic<int> lastInsertedState = 0;
126134
// The insertion point for the next state. Notice we
127135
// insert in the buffer backwards, so that on flush we iterate
128136
// from the last insertion point forward.

Framework/Core/include/Framework/DeviceInfo.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "Framework/LogParsingHelpers.h"
1515
#include "Framework/Metric2DViewIndex.h"
1616
#include "Framework/DeviceState.h"
17+
#include "Framework/DataProcessingStateManager.h"
1718

1819
#include <cstddef>
1920
#include <string>
@@ -94,6 +95,8 @@ struct DeviceInfo {
9495
size_t lastSignal;
9596
/// An incremental number for the state of the device
9697
int providedState = 0;
98+
99+
DataProcessingStateManager dataProcessingStateManager;
97100
};
98101

99102
} // namespace o2::framework

Framework/Core/src/CommonServices.cxx

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "Framework/DataRelayer.h"
2525
#include "Framework/Signpost.h"
2626
#include "Framework/DataProcessingStats.h"
27+
#include "Framework/DataProcessingStates.h"
2728
#include "Framework/TimingHelpers.h"
2829
#include "Framework/CommonMessageBackends.h"
2930
#include "Framework/DanglingContext.h"
@@ -627,6 +628,14 @@ auto sendRelayerMetrics(ServiceRegistryRef registry, DataProcessingStats& stats)
627628
stats.updateStats({static_cast<short>(ProcessingStatsId::TOTAL_RATE_OUT_MB_S), DataProcessingStats::Op::InstantaneousRate, totalBytesOut / 1000000});
628629
};
629630

631+
auto flushStates(ServiceRegistryRef registry, DataProcessingStates& states) -> void
632+
{
633+
states.flushChangedStates([&states, registry](std::string const& spec, int64_t timestamp, std::string_view value) mutable -> void {
634+
auto& client = registry.get<ControlService>();
635+
client.push(spec, value, timestamp);
636+
});
637+
}
638+
630639
/// This will flush metrics only once every second.
631640
auto flushMetrics(ServiceRegistryRef registry, DataProcessingStats& stats) -> void
632641
{
@@ -819,6 +828,38 @@ o2::framework::ServiceSpec CommonServices::dataProcessingStats()
819828
.kind = ServiceKind::Serial};
820829
}
821830

831+
// This is similar to the dataProcessingStats, but it designed to synchronize
832+
// history-less metrics which are e.g. used for the GUI.
833+
o2::framework::ServiceSpec CommonServices::dataProcessingStates()
834+
{
835+
return ServiceSpec{
836+
.name = "data-processing-states",
837+
.init = [](ServiceRegistryRef services, DeviceState& state, fair::mq::ProgOptions& options) -> ServiceHandle {
838+
timespec now;
839+
clock_gettime(CLOCK_REALTIME, &now);
840+
uv_update_time(state.loop);
841+
uint64_t offset = now.tv_sec * 1000 - uv_now(state.loop);
842+
auto* states = new DataProcessingStates(TimingHelpers::defaultRealtimeBaseConfigurator(offset, state.loop),
843+
TimingHelpers::defaultCPUTimeConfigurator(state.loop));
844+
states->registerState({"dummy_state", (short)ProcessingStateId::DUMMY_STATE});
845+
return ServiceHandle{TypeIdHelpers::uniqueId<DataProcessingStates>(), states};
846+
},
847+
.configure = noConfiguration(),
848+
.postProcessing = [](ProcessingContext& context, void* service) {
849+
auto* states = (DataProcessingStates*)service;
850+
states->updateState({(short)ProcessingStateId::DUMMY_STATE, (int) strlen("somestate"), "somestate"}); },
851+
.preDangling = [](DanglingContext& context, void* service) {
852+
auto* states = (DataProcessingStates*)service;
853+
flushStates(context.services(), *states); },
854+
.postDangling = [](DanglingContext& context, void* service) {
855+
auto* states = (DataProcessingStates*)service;
856+
flushStates(context.services(), *states); },
857+
.preEOS = [](EndOfStreamContext& context, void* service) {
858+
auto* states = (DataProcessingStates*)service;
859+
flushStates(context.services(), *states); },
860+
.kind = ServiceKind::Serial};
861+
}
862+
822863
struct GUIMetrics {
823864
};
824865

@@ -924,6 +965,7 @@ std::vector<ServiceSpec> CommonServices::defaultServices(int numThreads)
924965
callbacksSpec(),
925966
dataProcessingStats(),
926967
dataRelayer(),
968+
dataProcessingStates(),
927969
CommonMessageBackends::fairMQDeviceProxy(),
928970
dataSender(),
929971
objectCache(),

Framework/Core/src/ControlService.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ void ControlService::notifyStreamingState(StreamingState state)
7878
mDriverClient.flushPending();
7979
}
8080

81+
void ControlService::push(std::string_view key, std::string_view value, int64_t timestamp)
82+
{
83+
std::scoped_lock lock(mMutex);
84+
mDriverClient.tell(fmt::format("CONTROL_ACTION: PUT {} {} {}", key, timestamp, value));
85+
// mDriverClient.flushPending();
86+
}
87+
8188
void ControlService::notifyDeviceState(std::string currentState)
8289
{
8390
std::scoped_lock lock(mMutex);

Framework/Core/src/ControlServiceHelpers.cxx

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,12 @@ bool ControlServiceHelpers::parseControl(std::string_view const& s, std::match_r
2828
const static std::regex controlRE1("^READY_TO_(QUIT)_(ME|ALL)", std::regex::optimize);
2929
const static std::regex controlRE2("^(NOTIFY_STREAMING_STATE) (IDLE|STREAMING|EOS)", std::regex::optimize);
3030
const static std::regex controlRE3("^(NOTIFY_DEVICE_STATE) ([A-Z ]*)", std::regex::optimize);
31+
const static std::regex controlRE4("^(PUT) (.*)", std::regex::optimize);
3132
std::string_view sv = s.substr(pos + strlen("CONTROL_ACTION: "));
32-
return std::regex_search(sv.begin(), sv.end(), match, controlRE1) || std::regex_search(sv.begin(), sv.end(), match, controlRE2) || std::regex_search(sv.begin(), sv.end(), match, controlRE3);
33+
return std::regex_search(sv.begin(), sv.end(), match, controlRE1) ||
34+
std::regex_search(sv.begin(), sv.end(), match, controlRE2) ||
35+
std::regex_search(sv.begin(), sv.end(), match, controlRE3) ||
36+
std::regex_search(sv.begin(), sv.end(), match, controlRE4);
3337
}
3438

3539
void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
@@ -63,6 +67,39 @@ void ControlServiceHelpers::processCommand(std::vector<DeviceInfo>& infos,
6367
doToMatchingPid(infos, pid, [](DeviceInfo& info) { info.streamingState = StreamingState::EndOfStreaming; });
6468
} else if (command == "NOTIFY_DEVICE_STATE") {
6569
doToMatchingPid(infos, pid, [arg](DeviceInfo& info) { info.deviceState = arg; info.providedState++; });
70+
} else if (command == "PUT") {
71+
doToMatchingPid(infos, pid, [&arg](DeviceInfo& info) {
72+
// Tokenize arg for the first two empty space using strtok_r
73+
// and create string_views associated to it.
74+
char* brk;
75+
char* beginKey = strtok_r((char*)arg.data(), " ", &brk);
76+
char* endKey = strtok_r(nullptr, " ", &brk);
77+
char* beginTimestamp = endKey;
78+
char* endTimestamp = strtok_r(nullptr, " ", &brk);
79+
char* beginValue = endTimestamp;
80+
char* endValue = (char*)arg.data() + arg.size();
81+
std::string_view key(beginKey, endKey - beginKey);
82+
std::string_view timestamp(beginTimestamp, endTimestamp - beginTimestamp);
83+
std::string_view value(beginValue, endValue - beginValue);
84+
int timestampInt = std::stoll(timestamp.data());
85+
86+
// Find the StateInfo in the dataProcessingStateManager with the same key. Insert a new one if not found.
87+
auto& infos = info.dataProcessingStateManager.infos;
88+
auto& states = info.dataProcessingStateManager.states;
89+
auto it = std::lower_bound(infos.begin(), infos.end(), key, [](DataProcessingStateManager::StateInfo const& stateInfo, std::string_view const& key) { return stateInfo.name < key; });
90+
if (it == infos.end() || it->name != key) {
91+
it = infos.insert(it, DataProcessingStateManager::StateInfo{std::string{key}, timestampInt, (int)states.size()});
92+
states.resize(states.size() + 1);
93+
memcpy(states.back().data(), value.data(), value.size());
94+
states.back()[value.size()] = '\0';
95+
LOG(debug) << "New state" << key << " with timestamp " << timestamp << " and value " << value;
96+
} else {
97+
it->lastUpdate = timestampInt;
98+
memcpy(states[it->index].data(), value.data(), value.size());
99+
states[it->index][value.size()] = '\0';
100+
LOG(debug) << "Updated state" << key << " with timestamp " << timestamp << " and value " << value;
101+
}
102+
});
66103
} else {
67104
LOGP(error, "Unknown command {} with argument {}", command, arg);
68105
}

Framework/Core/src/DataProcessingStates.cxx

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ DataProcessingStates::DataProcessingStates(std::function<void(int64_t& base, int
3333

3434
void DataProcessingStates::processCommandQueue()
3535
{
36-
int position = nextState.load(std::memory_order_relaxed);
36+
int position = lastInsertedState.load(std::memory_order_relaxed);
3737
// Process the commands in the order we have just computed.
3838
while (position < DataProcessingStates::STATES_BUFFER_SIZE) {
3939
DataProcessingStates::CommandHeader header;
@@ -60,6 +60,7 @@ void DataProcessingStates::processCommandQueue()
6060
int newCapacity = std::max(size, 64);
6161
int first = statesBuffer.size();
6262
statesBuffer.resize(statesBuffer.size() + newCapacity);
63+
assert(first < statesBuffer.size());
6364
memcpy(statesBuffer.data() + first, &store[position + sizeof(DataProcessingStates::CommandHeader)], size);
6465
statesViews[id].first = first;
6566
statesViews[id].size = size;
@@ -75,7 +76,7 @@ void DataProcessingStates::processCommandQueue()
7576
assert(position == DataProcessingStates::STATES_BUFFER_SIZE);
7677
// We reset the queue. Once again, the queue is filled in reverse order.
7778
nextState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
78-
insertedStates.store(0, std::memory_order_relaxed);
79+
lastInsertedState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
7980
generation++;
8081
}
8182

@@ -93,7 +94,12 @@ void DataProcessingStates::updateState(CommandSpec cmd)
9394
pendingStates++;
9495
// Add a static mutex to protect the queue
9596
// Get the next available operation in an atomic way.
96-
auto size = sizeof(CommandHeader) + cmd.size;
97+
int size = sizeof(CommandHeader) + cmd.size;
98+
assert(size < 1000);
99+
if (size > 8192) {
100+
throw runtime_error_f("State size is %d for state %s. States larger than 8192 bytes not supported for now.",
101+
size, stateSpecs[cmd.id].name.c_str());
102+
}
97103
int idx = nextState.fetch_sub(size, std::memory_order_relaxed);
98104
if (idx - size < 0) {
99105
// We abort this command
@@ -106,7 +112,7 @@ void DataProcessingStates::updateState(CommandSpec cmd)
106112
std::this_thread::sleep_for(std::chrono::milliseconds(1));
107113
}
108114
processCommandQueue();
109-
insertedStates.store(0, std::memory_order_relaxed);
115+
lastInsertedState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
110116
nextState.store(STATES_BUFFER_SIZE, std::memory_order_relaxed);
111117
pendingStates++;
112118
idx = nextState.fetch_sub(size, std::memory_order_relaxed);
@@ -126,9 +132,12 @@ void DataProcessingStates::updateState(CommandSpec cmd)
126132
// reserved for us.
127133
idx -= size;
128134
CommandHeader header{(short)cmd.id, cmd.size, timestamp};
135+
assert(idx >= 0);
136+
assert(idx + sizeof(CommandHeader) + cmd.size <= store.size());
129137
memcpy(&store.data()[idx], &header, sizeof(CommandHeader));
130138
memcpy(&store.data()[idx + sizeof(CommandHeader)], cmd.data, cmd.size);
131-
insertedStates++;
139+
140+
lastInsertedState = idx;
132141
pendingStates--;
133142
// Keep track of the number of commands we have received.
134143
updatedMetricsLapse++;
@@ -154,6 +163,8 @@ void DataProcessingStates::flushChangedStates(std::function<void(std::string con
154163
continue;
155164
}
156165
publish = true;
166+
assert(view.first + view.size <= statesBuffer.size());
167+
assert(view.first <= statesBuffer.size());
157168
callback(spec.name.data(), update.timestamp, std::string_view(statesBuffer.data() + view.first, view.size));
158169
publishedMetricsLapse++;
159170
update.lastPublished = currentTimestamp;

0 commit comments

Comments
 (0)