|
14 | 14 | /// \author Piotr Konopka, piotr.jan.konopka@cern.ch |
15 | 15 |
|
16 | 16 | #include "DataSampling/Dispatcher.h" |
17 | | -#include "Framework/RawDeviceService.h" |
18 | 17 | #include "DataSampling/DataSamplingPolicy.h" |
19 | 18 | #include "DataSampling/DataSamplingHeader.h" |
20 | 19 | #include "Framework/DataProcessingHeader.h" |
21 | 20 | #include "Framework/DataSpecUtils.h" |
22 | 21 | #include "Framework/Logger.h" |
23 | 22 | #include "Framework/ConfigParamRegistry.h" |
24 | 23 | #include "Framework/InputRecordWalker.h" |
25 | | - |
26 | 24 | #include "Framework/Monitoring.h" |
| 25 | + |
27 | 26 | #include <Configuration/ConfigurationInterface.h> |
28 | 27 | #include <Configuration/ConfigurationFactory.h> |
29 | | -#include <fairmq/FairMQDevice.h> |
30 | 28 |
|
31 | 29 | using namespace o2::configuration; |
32 | 30 | using namespace o2::monitoring; |
@@ -92,14 +90,9 @@ void Dispatcher::run(ProcessingContext& ctx) |
92 | 90 | std::move(extractAdditionalHeaders(input.header)), |
93 | 91 | std::move(prepareDataSamplingHeader(*policy.get(), ctx.services().get<const DeviceSpec>()))}; |
94 | 92 |
|
95 | | - if (!policy->getFairMQOutputChannel().empty()) { |
96 | | - sendFairMQ(ctx.services().get<RawDeviceService>().device(), input, policy->getFairMQOutputChannelName(), |
97 | | - std::move(headerStack)); |
98 | | - } else { |
99 | | - Output output = policy->prepareOutput(inputMatcher, input.spec->lifetime); |
100 | | - output.metaHeader = std::move(header::Stack{std::move(output.metaHeader), std::move(headerStack)}); |
101 | | - send(ctx.outputs(), input, std::move(output)); |
102 | | - } |
| 93 | + Output output = policy->prepareOutput(inputMatcher, input.spec->lifetime); |
| 94 | + output.metaHeader = std::move(header::Stack{std::move(output.metaHeader), std::move(headerStack)}); |
| 95 | + send(ctx.outputs(), input, std::move(output)); |
103 | 96 | } |
104 | 97 | } |
105 | 98 | } |
@@ -158,35 +151,6 @@ void Dispatcher::send(DataAllocator& dataAllocator, const DataRef& inputData, Ou |
158 | 151 | dataAllocator.snapshot(output, inputData.payload, inputHeader->payloadSize, inputHeader->payloadSerializationMethod); |
159 | 152 | } |
160 | 153 |
|
161 | | -// ideally this should be in a separate proxy device or use Lifetime::External |
162 | | -void Dispatcher::sendFairMQ(FairMQDevice* device, const DataRef& inputData, const std::string& fairMQChannel, |
163 | | - header::Stack&& stack) const |
164 | | -{ |
165 | | - const auto* dh = header::get<header::DataHeader*>(inputData.header); |
166 | | - assert(dh); |
167 | | - const auto* dph = header::get<DataProcessingHeader*>(inputData.header); |
168 | | - assert(dph); |
169 | | - |
170 | | - header::DataHeader dhout{dh->dataDescription, dh->dataOrigin, dh->subSpecification, dh->payloadSize}; |
171 | | - dhout.payloadSerializationMethod = dh->payloadSerializationMethod; |
172 | | - DataProcessingHeader dphout{dph->startTime, dph->duration}; |
173 | | - o2::header::Stack headerStack{dhout, dphout, stack}; |
174 | | - |
175 | | - auto channelAlloc = o2::pmr::getTransportAllocator(device->Transport()); |
176 | | - FairMQMessagePtr msgHeaderStack = o2::pmr::getMessage(std::move(headerStack), channelAlloc); |
177 | | - |
178 | | - char* payloadCopy = new char[dh->payloadSize]; |
179 | | - memcpy(payloadCopy, inputData.payload, dh->payloadSize); |
180 | | - auto cleanupFcn = [](void* data, void*) { delete[] reinterpret_cast<char*>(data); }; |
181 | | - FairMQMessagePtr msgPayload(device->NewMessage(payloadCopy, dh->payloadSize, cleanupFcn, payloadCopy)); |
182 | | - |
183 | | - FairMQParts message; |
184 | | - message.AddPart(move(msgHeaderStack)); |
185 | | - message.AddPart(move(msgPayload)); |
186 | | - |
187 | | - int64_t bytesSent = device->Send(message, fairMQChannel); |
188 | | -} |
189 | | - |
190 | 154 | void Dispatcher::registerPolicy(std::unique_ptr<DataSamplingPolicy>&& policy) |
191 | 155 | { |
192 | 156 | mPolicies.emplace_back(std::move(policy)); |
@@ -245,20 +209,9 @@ Outputs Dispatcher::getOutputSpecs() |
245 | 209 | } |
246 | 210 | framework::Options Dispatcher::getOptions() |
247 | 211 | { |
248 | | - o2::framework::Options options; |
249 | | - for (const auto& policy : mPolicies) { |
250 | | - if (!policy->getFairMQOutputChannel().empty()) { |
251 | | - if (!options.empty()) { |
252 | | - throw std::runtime_error("Maximum one policy with raw FairMQ channel is allowed, more have been declared."); |
253 | | - } |
254 | | - options.push_back({"channel-config", VariantType::String, policy->getFairMQOutputChannel().c_str(), {"Out-of-band channel config"}}); |
255 | | - LOG(DEBUG) << " - registering output FairMQ channel '" << policy->getFairMQOutputChannel() << "'"; |
256 | | - } |
257 | | - } |
258 | | - options.push_back({"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}}); |
259 | | - |
260 | | - return options; |
| 212 | + return {{"period-timer-stats", framework::VariantType::Int, 10 * 1000000, {"Dispatcher's stats timer period"}}}; |
261 | 213 | } |
| 214 | + |
262 | 215 | size_t Dispatcher::numberOfPolicies() |
263 | 216 | { |
264 | 217 | return mPolicies.size(); |
|
0 commit comments