Skip to content

Commit fd50141

Browse files
committed
dcs-config-proxy and sample consumer workflow
1 parent 5a5ce89 commit fd50141

4 files changed

Lines changed: 285 additions & 5 deletions

File tree

Detectors/DCS/testWorkflow/CMakeLists.txt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,14 @@ o2_add_executable(
1616
SOURCES src/dcs-data-client-workflow.cxx
1717
PUBLIC_LINK_LIBRARIES O2::DCStestWorkflow)
1818

19+
o2_add_executable(
20+
config-proxy
21+
COMPONENT_NAME dcs
22+
SOURCES src/dcs-config-proxy.cxx
23+
PUBLIC_LINK_LIBRARIES O2::Framework O2::DetectorsCommonDataFormats O2::CommonUtils)
24+
25+
o2_add_executable(
26+
config-consumer-test-workflow
27+
COMPONENT_NAME dcs
28+
SOURCES src/dcs-config-test-workflow.cxx
29+
PUBLIC_LINK_LIBRARIES O2::Framework O2::DetectorsCommonDataFormats)

Detectors/DCS/testWorkflow/README.md

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,24 @@
77
Local example workflow with local CCDB (running on port 6464) :
88

99
```shell
10-
o2-dcs-random-data-workflow --max-timeframes=10 |
10+
o2-dcs-random-data-workflow --max-timeframes=10 |
1111
o2-calibration-ccdb-populator-workflow --ccdb-path http://localhost:6464
1212
```
1313

1414
# Simulation of detector specific data points
1515

16-
In order to test the processing of their datapoints, subsystems can, for instance, setup a basic workflow chain consisting of a simulator, a processor and a ccdb populator.
16+
In order to test the processing of their datapoints, subsystems can, for instance, setup a basic workflow chain consisting of a simulator, a processor and a ccdb populator.
1717

1818
```console
19-
det-dcs-simulator | det-processor | o2-calibration-ccdb-populator-workflow
19+
det-dcs-simulator | det-processor | o2-calibration-ccdb-populator-workflow
2020
```
2121

2222
The simulator must create a message containing a vector of DataPointCompositeObject for the detector. The processor then does "something" with those data points, and creates a set of object pairs (clbInfo,clbPayload) that are transmitted to the ccdb populator to be uploaded to the CCDB.
2323

24-
The ccdb populator is an existing workflow that can be reused by any susbsystem. The processor is of course detector specific and must be written accordingly.
24+
The ccdb populator is an existing workflow that can be reused by any susbsystem. The processor is of course detector specific and must be written accordingly.
2525
The simulator is also detector specific in the sense that each detector has a different set of datapoints to be simulated. It can be written from scratch if so desired, but it can also be written with the help of `getDCSRandomGeneratorSpec` function (defined in the `Detectors/DCS/testWorkflow/include/DCStestWorkflow/DCSRandomDataGeneratorSpec.h` include file) for cases where random generation of data points is sufficient.
2626

27-
It then boils down to :
27+
It then boils down to :
2828

2929
```
3030
#include "DCStestWorkflow/DCSRandomDataGeneratorSpec.h"
@@ -64,3 +64,19 @@ o2-dcs-proxy --dcs-proxy '--channel-config "name=dcs-proxy,type=pull,method=conn
6464

6565

6666

67+
# dcs-config proxy
68+
69+
This is a proxy to import the detector configuration files from DCS server into the DPL. A simple test is
70+
71+
```
72+
DET="TOF"
73+
CHANFROM='"type=sub,method=connect,address=tcp://127.0.0.1:5556,rateLogging=1,transport=zeromq"'
74+
CHANACK='"type=push,method=connect,address=tcp://127.0.0.1:5557,rateLogging=1,transport=zeromq"'
75+
o2-dcs-config-proxy --subscribe-to $CHANFROM --acknowlege-to $CHANACK | o2-dcs-config-consumer-test-workflow --detector $DET
76+
```
77+
78+
to receive from the `CHANFROM` DCS channel the configuration file name (starting with detector name) and the file itself and inject them as DPL messages with specs
79+
`<DET>/DCS_CONFIG_NAME/0` and `<DET>/DCS_CONFIG_FILE/0` respectively.
80+
The `o2-dcs-config-consumer-test-workflow` is a dummy processing device which just consumes such messages for the detector `<DET>`.
81+
82+
If the `CHANACK` string is not empty, then the acknowledgment string `OK` will be sent to this channel on every reception of the DCS message.
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
// example to run:
12+
// o2-dcs-config-proxy --dcs-config-proxy '--channel-config "name=dcs-config-proxy,type=sub,method=connect,address=tcp://127.0.0.1:5556,rateLogging=1,transport=zeromq"' \
13+
// --acknowlege-to "type=push,method=connect,address=tcp://127.0.0.1:5557,rateLogging=1,transport=zeromq"
14+
15+
#include "Framework/WorkflowSpec.h"
16+
#include "Framework/DataProcessorSpec.h"
17+
#include "Framework/DataSpecUtils.h"
18+
#include "Framework/ControlService.h"
19+
#include "Framework/Logger.h"
20+
#include "Framework/Lifetime.h"
21+
#include "Framework/ConfigParamSpec.h"
22+
#include "Framework/ExternalFairMQDeviceProxy.h"
23+
#include "DetectorsCommonDataFormats/DetID.h"
24+
#include "Headers/DataHeaderHelpers.h"
25+
#include <fairmq/FairMQDevice.h>
26+
#include "CommonUtils/StringUtils.h"
27+
#include <vector>
28+
#include <string>
29+
30+
using namespace o2::framework;
31+
using DetID = o2::detectors::DetID;
32+
33+
void sendAnswer(const std::string& what, const std::string& ack_chan, FairMQDevice& device)
34+
{
35+
if (!ack_chan.empty()) {
36+
LOG(INFO) << "Sending acknowledgment " << what;
37+
auto fmqFactory = device.GetChannel(ack_chan).Transport();
38+
auto msg = fmqFactory->CreateMessage(what.size(), fair::mq::Alignment{64});
39+
memcpy(msg->GetData(), what.c_str(), what.size());
40+
FairMQParts outParts;
41+
outParts.AddPart(std::move(msg));
42+
sendOnChannel(device, outParts, ack_chan);
43+
}
44+
}
45+
46+
auto getDetID(const std::string& filename)
47+
{
48+
// assume the filename start with detector name
49+
return DetID::nameToID(filename.substr(0, 3).c_str(), DetID::First);
50+
}
51+
52+
InjectorFunction dcs2dpl(const std::string& acknowledge)
53+
{
54+
55+
auto timesliceId = std::make_shared<size_t>(0);
56+
57+
return [acknowledge, timesliceId](FairMQDevice& device, FairMQParts& parts, ChannelRetriever channelRetriever) {
58+
// make sure just 2 messages received
59+
if (parts.Size() != 2) {
60+
LOG(ERROR) << "received " << parts.Size() << " instead of 2 expected";
61+
sendAnswer("error: wrong number of messages", acknowledge, device);
62+
return;
63+
}
64+
std::string filename{static_cast<const char*>(parts.At(0)->GetData()), parts.At(0)->GetSize()};
65+
size_t filesize = parts.At(1)->GetSize();
66+
LOG(INFO) << "received file " << filename << " of size " << filesize;
67+
int dID = getDetID(filename);
68+
if (dID < 0) {
69+
LOG(ERROR) << "unknown detector for " << filename;
70+
sendAnswer("error: unrecognized filename", acknowledge, device);
71+
return;
72+
}
73+
74+
o2::header::DataHeader hdrF("DCS_CONFIG_FILE", DetID(dID).getDataOrigin(), 0);
75+
o2::header::DataHeader hdrN("DCS_CONFIG_NAME", DetID(dID).getDataOrigin(), 0);
76+
OutputSpec outsp{hdrN.dataOrigin, hdrN.dataDescription, hdrN.subSpecification};
77+
auto channel = channelRetriever(outsp, *timesliceId);
78+
if (channel.empty()) {
79+
LOG(ERROR) << "No output channel found for OutputSpec " << outsp;
80+
sendAnswer("error: no channel to send", acknowledge, device);
81+
return;
82+
}
83+
84+
hdrN.tfCounter = *timesliceId; // this also
85+
hdrN.payloadSerializationMethod = o2::header::gSerializationMethodNone;
86+
hdrN.splitPayloadParts = 1;
87+
hdrN.splitPayloadIndex = 1;
88+
hdrN.payloadSize = parts.At(0)->GetSize();
89+
hdrN.firstTForbit = 0; // this should be irrelevant for DCS
90+
91+
hdrF.tfCounter = *timesliceId; // this also
92+
hdrF.payloadSerializationMethod = o2::header::gSerializationMethodNone;
93+
hdrF.splitPayloadParts = 1;
94+
hdrF.splitPayloadIndex = 1;
95+
hdrF.payloadSize = filesize;
96+
hdrF.firstTForbit = 0; // this should be irrelevant for DCS
97+
98+
auto fmqFactory = device.GetChannel(channel).Transport();
99+
100+
o2::header::Stack headerStackF{hdrF, DataProcessingHeader{*timesliceId, 0}};
101+
auto hdMessageF = fmqFactory->CreateMessage(headerStackF.size(), fair::mq::Alignment{64});
102+
auto plMessageF = fmqFactory->CreateMessage(hdrF.payloadSize, fair::mq::Alignment{64});
103+
memcpy(hdMessageF->GetData(), headerStackF.data(), headerStackF.size());
104+
memcpy(plMessageF->GetData(), parts.At(1)->GetData(), hdrF.payloadSize);
105+
106+
o2::header::Stack headerStackN{hdrN, DataProcessingHeader{*timesliceId, 0}};
107+
auto hdMessageN = fmqFactory->CreateMessage(headerStackN.size(), fair::mq::Alignment{64});
108+
auto plMessageN = fmqFactory->CreateMessage(hdrN.payloadSize, fair::mq::Alignment{64});
109+
memcpy(hdMessageN->GetData(), headerStackN.data(), headerStackN.size());
110+
memcpy(plMessageN->GetData(), parts.At(0)->GetData(), hdrN.payloadSize);
111+
112+
FairMQParts outParts;
113+
outParts.AddPart(std::move(hdMessageF));
114+
outParts.AddPart(std::move(plMessageF));
115+
outParts.AddPart(std::move(hdMessageN));
116+
outParts.AddPart(std::move(plMessageN));
117+
sendOnChannel(device, outParts, channel);
118+
119+
sendAnswer("OK", acknowledge, device);
120+
LOG(INFO) << "Sent DPL message and acknowledgment for file " << filename;
121+
};
122+
}
123+
124+
// we need to add workflow options before including Framework/runDataProcessing
125+
void customize(std::vector<ConfigParamSpec>& workflowOptions)
126+
{
127+
workflowOptions.push_back(ConfigParamSpec{"acknowlege-to", VariantType::String, "type=push,method=connect,address=tcp://127.0.0.1:5557,rateLogging=1,transport=zeromq", {"channel to acknowledge, no acknowledgement if empty"}});
128+
workflowOptions.push_back(ConfigParamSpec{"subscribe-to", VariantType::String, "type=sub,method=connect,address=tcp://127.0.0.1:5556,rateLogging=1,transport=zeromq", {"channel subscribe to"}});
129+
}
130+
131+
#include "Framework/runDataProcessing.h"
132+
133+
WorkflowSpec defineDataProcessing(ConfigContext const& config)
134+
{
135+
auto setChanName = [](const std::string& chan, const std::string& name) {
136+
size_t n = 0;
137+
if (std::string(chan).find("name=") != std::string::npos) {
138+
n = std::string(chan).find(",");
139+
if (n == std::string::npos) {
140+
throw std::runtime_error(fmt::format("wrongly formatted channel: {}", chan));
141+
}
142+
n++;
143+
}
144+
return o2::utils::Str::concat_string("name=", name, ",", chan.substr(n, chan.size()));
145+
};
146+
147+
const std::string devName = "dcs-config-proxy";
148+
auto chan = config.options().get<std::string>("subscribe-to");
149+
if (chan.empty()) {
150+
throw std::runtime_error("input channel is not provided");
151+
}
152+
chan = setChanName(chan, devName);
153+
154+
auto chanTo = config.options().get<std::string>("acknowlege-to");
155+
std::string ackChan{};
156+
if (!chanTo.empty()) {
157+
ackChan = "ackChan";
158+
chan = o2::utils::Str::concat_string(chan, ";", setChanName(chanTo, ackChan));
159+
}
160+
LOG(INFO) << "Channels setup: " << chan;
161+
Outputs dcsOutputs;
162+
for (int id = DetID::First; id <= DetID::Last; id++) {
163+
dcsOutputs.emplace_back(DetID(id).getDataOrigin(), "DCS_CONFIG_FILE", 0, Lifetime::Timeframe);
164+
dcsOutputs.emplace_back(DetID(id).getDataOrigin(), "DCS_CONFIG_NAME", 0, Lifetime::Timeframe);
165+
}
166+
167+
DataProcessorSpec dcsConfigProxy = specifyExternalFairMQDeviceProxy(
168+
devName.c_str(),
169+
std::move(dcsOutputs),
170+
// this is just default, can be overriden by --dcs-config-proxy '--channel-config..'
171+
chan.c_str(),
172+
dcs2dpl(ackChan));
173+
174+
WorkflowSpec workflow;
175+
workflow.emplace_back(dcsConfigProxy);
176+
return workflow;
177+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#include "Framework/DataProcessorSpec.h"
12+
#include "Framework/Task.h"
13+
#include "Framework/Logger.h"
14+
#include "CommonUtils/ConfigurableParam.h"
15+
#include "DetectorsCommonDataFormats/DetID.h"
16+
17+
using namespace o2::framework;
18+
using DetID = o2::detectors::DetID;
19+
20+
// we need to add workflow options before including Framework/runDataProcessing
21+
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
22+
{
23+
// option allowing to set parameters
24+
std::vector<o2::framework::ConfigParamSpec> options{
25+
{"configKeyValues", VariantType::String, "", {"Semicolon separated key=value strings"}},
26+
{"detector", VariantType::String, "ITS", {"detector name"}}};
27+
28+
std::swap(workflowOptions, options);
29+
}
30+
// ------------------------------------------------------------------
31+
32+
namespace o2
33+
{
34+
namespace dcs
35+
{
36+
class DCSConfigConsumer : public o2::framework::Task
37+
{
38+
public:
39+
void run(o2::framework::ProcessingContext& pc) final
40+
{
41+
auto fileBuff = pc.inputs().get<gsl::span<char>>("confFile");
42+
auto fileName = pc.inputs().get<std::string>("confFileName");
43+
LOG(INFO) << "got input file " << fileName << " of size " << fileBuff.size();
44+
}
45+
};
46+
} // namespace dcs
47+
} // namespace o2
48+
49+
DataProcessorSpec getDCSConsumerSpec(DetID det)
50+
{
51+
std::string procName = "dcs-config-consumer-";
52+
procName += det.getName();
53+
return DataProcessorSpec{
54+
procName,
55+
Inputs{{"confFile", ConcreteDataTypeMatcher{det.getDataOrigin(), "DCS_CONFIG_FILE"}, Lifetime::Timeframe},
56+
{"confFileName", ConcreteDataTypeMatcher{det.getDataOrigin(), "DCS_CONFIG_NAME"}, Lifetime::Timeframe}},
57+
Outputs{},
58+
AlgorithmSpec{adaptFromTask<o2::dcs::DCSConfigConsumer>()},
59+
Options{}};
60+
}
61+
62+
#include "Framework/runDataProcessing.h"
63+
64+
WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
65+
{
66+
WorkflowSpec specs;
67+
o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
68+
auto detName = configcontext.options().get<std::string>("detector");
69+
auto detID = DetID::nameToID(detName.c_str(), DetID::First);
70+
if (detID < 0) {
71+
throw std::runtime_error(fmt::format("{} is not a valid detector name", detName));
72+
}
73+
specs.emplace_back(getDCSConsumerSpec({detID}));
74+
75+
return specs;
76+
}

0 commit comments

Comments
 (0)