Skip to content

Commit f1cdb26

Browse files
authored
Data Sampling: remove the possibility to use raw FairMQ channel (AliceO2Group#6313)
This was needed only in early implementation to feed Data Dump with data, which has been recently removed as well.
1 parent 484e93b commit f1cdb26

4 files changed

Lines changed: 7 additions & 91 deletions

File tree

Utilities/DataSampling/include/DataSampling/Dispatcher.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,6 @@ class Dispatcher : public framework::Task
6666
header::Stack extractAdditionalHeaders(const char* inputHeaderStack) const;
6767
void reportStats(monitoring::Monitoring& monitoring) const;
6868
void send(framework::DataAllocator& dataAllocator, const framework::DataRef& inputData, framework::Output&& output) const;
69-
void sendFairMQ(FairMQDevice* device, const framework::DataRef& inputData, const std::string& fairMQChannel,
70-
header::Stack&& stack) const;
7169

7270
std::string mName;
7371
std::string mReconfigurationSource;

Utilities/DataSampling/src/Dispatcher.cxx

Lines changed: 6 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,17 @@
1414
/// \author Piotr Konopka, piotr.jan.konopka@cern.ch
1515

1616
#include "DataSampling/Dispatcher.h"
17-
#include "Framework/RawDeviceService.h"
1817
#include "DataSampling/DataSamplingPolicy.h"
1918
#include "DataSampling/DataSamplingHeader.h"
2019
#include "Framework/DataProcessingHeader.h"
2120
#include "Framework/DataSpecUtils.h"
2221
#include "Framework/Logger.h"
2322
#include "Framework/ConfigParamRegistry.h"
2423
#include "Framework/InputRecordWalker.h"
25-
2624
#include "Framework/Monitoring.h"
25+
2726
#include <Configuration/ConfigurationInterface.h>
2827
#include <Configuration/ConfigurationFactory.h>
29-
#include <fairmq/FairMQDevice.h>
3028

3129
using namespace o2::configuration;
3230
using namespace o2::monitoring;
@@ -92,14 +90,9 @@ void Dispatcher::run(ProcessingContext& ctx)
9290
std::move(extractAdditionalHeaders(input.header)),
9391
std::move(prepareDataSamplingHeader(*policy.get(), ctx.services().get<const DeviceSpec>()))};
9492

95-
if (!policy->getFairMQOutputChannel().empty()) {
96-
sendFairMQ(ctx.services().get<RawDeviceService>().device(), input, policy->getFairMQOutputChannelName(),
97-
std::move(headerStack));
98-
} else {
99-
Output output = policy->prepareOutput(inputMatcher, input.spec->lifetime);
100-
output.metaHeader = std::move(header::Stack{std::move(output.metaHeader), std::move(headerStack)});
101-
send(ctx.outputs(), input, std::move(output));
102-
}
93+
Output output = policy->prepareOutput(inputMatcher, input.spec->lifetime);
94+
output.metaHeader = std::move(header::Stack{std::move(output.metaHeader), std::move(headerStack)});
95+
send(ctx.outputs(), input, std::move(output));
10396
}
10497
}
10598
}
@@ -158,35 +151,6 @@ void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, Ou
158151
dataAllocator.snapshot(output, inputData.payload, inputHeader->payloadSize, inputHeader->payloadSerializationMethod);
159152
}
160153

161-
// ideally this should be in a separate proxy device or use Lifetime::External
162-
void Dispatcher::sendFairMQ(FairMQDevice* device, const DataRef& inputData, const std::string& fairMQChannel,
163-
header::Stack&& stack) const
164-
{
165-
const auto* dh = header::get<header::DataHeader*>(inputData.header);
166-
assert(dh);
167-
const auto* dph = header::get<DataProcessingHeader*>(inputData.header);
168-
assert(dph);
169-
170-
header::DataHeader dhout{dh->dataDescription, dh->dataOrigin, dh->subSpecification, dh->payloadSize};
171-
dhout.payloadSerializationMethod = dh->payloadSerializationMethod;
172-
DataProcessingHeader dphout{dph->startTime, dph->duration};
173-
o2::header::Stack headerStack{dhout, dphout, stack};
174-
175-
auto channelAlloc = o2::pmr::getTransportAllocator(device->Transport());
176-
FairMQMessagePtr msgHeaderStack = o2::pmr::getMessage(std::move(headerStack), channelAlloc);
177-
178-
char* payloadCopy = new char[dh->payloadSize];
179-
memcpy(payloadCopy, inputData.payload, dh->payloadSize);
180-
auto cleanupFcn = [](void* data, void*) { delete[] reinterpret_cast<char*>(data); };
181-
FairMQMessagePtr msgPayload(device->NewMessage(payloadCopy, dh->payloadSize, cleanupFcn, payloadCopy));
182-
183-
FairMQParts message;
184-
message.AddPart(move(msgHeaderStack));
185-
message.AddPart(move(msgPayload));
186-
187-
int64_t bytesSent = device->Send(message, fairMQChannel);
188-
}
189-
190154
void Dispatcher::registerPolicy(std::unique_ptr<DataSamplingPolicy>&& policy)
191155
{
192156
mPolicies.emplace_back(std::move(policy));
@@ -245,20 +209,9 @@ Outputs Dispatcher::getOutputSpecs()
245209
}
246210
framework::Options Dispatcher::getOptions()
247211
{
248-
o2::framework::Options options;
249-
for (const auto& policy : mPolicies) {
250-
if (!policy->getFairMQOutputChannel().empty()) {
251-
if (!options.empty()) {
252-
throw std::runtime_error("Maximum one policy with raw FairMQ channel is allowed, more have been declared.");
253-
}
254-
options.push_back({"channel-config", VariantType::String, policy->getFairMQOutputChannel().c_str(), {"Out-of-band channel config"}});
255-
LOG(DEBUG) << " - registering output FairMQ channel '" << policy->getFairMQOutputChannel() << "'";
256-
}
257-
}
258-
options.push_back({"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}});
259-
260-
return options;
212+
return {{"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}}};
261213
}
214+
262215
size_t Dispatcher::numberOfPolicies()
263216
{
264217
return mPolicies.size();

Utilities/DataSampling/test/test_DataSampling.cxx

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616
#include "DataSampling/DataSampling.h"
1717
#include "DataSampling/Dispatcher.h"
1818
#include "DataSampling/DataSamplingPolicy.h"
19-
#include "Framework/DataProcessingHeader.h"
20-
#include "Framework/ExternalFairMQDeviceProxy.h"
21-
#include "DataSampling/DataSamplingReadoutAdapter.h"
2219
#include "Framework/DataSpecUtils.h"
2320

2421
#include "Headers/DataHeader.h"
@@ -171,37 +168,6 @@ BOOST_AUTO_TEST_CASE(DataSamplingTimePipelineFlow)
171168
BOOST_CHECK_EQUAL(disp->maxInputTimeslices, 3);
172169
}
173170

174-
BOOST_AUTO_TEST_CASE(DataSamplingFairMq)
175-
{
176-
WorkflowSpec workflow{
177-
specifyExternalFairMQDeviceProxy(
178-
"readout-proxy",
179-
Outputs{{"TPC", "RAWDATA"}},
180-
"fake-channel-config",
181-
dataSamplingReadoutAdapter({"TPC", "RAWDATA"}))};
182-
183-
std::string configFilePath = std::string(getenv("O2_ROOT")) + "/share/tests/test_DataSampling.json";
184-
DataSampling::GenerateInfrastructure(workflow, "json:/" + configFilePath);
185-
186-
auto disp = std::find_if(workflow.begin(), workflow.end(),
187-
[](const DataProcessorSpec& d) {
188-
return d.name.find("Dispatcher") != std::string::npos;
189-
});
190-
BOOST_REQUIRE(disp != workflow.end());
191-
192-
auto input = std::find_if(disp->inputs.begin(), disp->inputs.end(),
193-
[](const InputSpec& in) {
194-
return DataSpecUtils::match(in, ConcreteDataMatcher{DataOrigin("TPC"), DataDescription("RAWDATA"), 0}) && in.lifetime == Lifetime::Timeframe;
195-
});
196-
BOOST_CHECK(input != disp->inputs.end());
197-
198-
auto channelConfig = std::find_if(disp->options.begin(), disp->options.end(),
199-
[](const ConfigParamSpec& opt) {
200-
return opt.name == "channel-config";
201-
});
202-
BOOST_REQUIRE(channelConfig != disp->options.end());
203-
}
204-
205171
BOOST_AUTO_TEST_CASE(InputSpecsForPolicy)
206172
{
207173
std::string configFilePath = "json:/" + std::string(getenv("O2_ROOT")) + "/share/tests/test_DataSampling.json";

Utilities/DataSampling/test/test_DataSampling.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
"active": "true",
1313
"machines": [],
1414
"query": "clusters:TPC/RAWDATA",
15-
"samplingConditions": [],
16-
"fairMQOutput": "name=data,type=push,method=bind,address=tcp://127.0.0.1:26525,rateLogging=1"
15+
"samplingConditions": []
1716
}
1817
]
1918
}

0 commit comments

Comments
 (0)