Skip to content

Commit 52aedc7

Browse files
committed
Eliminate obsolete options of raw-file-reader
No need for options message-per-tf (must be always sent as a multipart of HBFs) and output-per-link (must be always sent per output_channel)
1 parent d31e3d8 commit 52aedc7

7 files changed

Lines changed: 40 additions & 74 deletions

File tree

Detectors/MUON/MCH/Workflow/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
## Example of DPL chain:
2020

21-
`o2-raw-file-reader-workflow --conf file-reader.cfg --loop 0 --message-per-tf -b | o2-mch-raw-to-digits-workflow -b | o2-mch-digits-to-preclusters-workflow -b | o2-mch-preclusters-sink-workflow -b`
21+
`o2-raw-file-reader-workflow --conf file-reader.cfg --loop 0 -b | o2-mch-raw-to-digits-workflow -b | o2-mch-digits-to-preclusters-workflow -b | o2-mch-preclusters-sink-workflow -b`
2222

2323
where the `file-reader.cfg` looks like this:
2424

Detectors/MUON/MID/Workflow/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Notice that the executable also generate a configuration file that is needed to
3333
### Reconstruction from raw data
3434
To reconstruct the raw data (either from converted MC digits or real data), run:
3535
```bash
36-
o2-raw-file-reader-workflow --conf mid_raw.cfg --message-per-tf | o2-mid-reco-workflow
36+
o2-raw-file-reader-workflow --conf mid_raw.cfg | o2-mid-reco-workflow
3737
```
3838

3939
## Timing
@@ -44,4 +44,4 @@ An example output is the following:
4444
Processing time / 90 ROFs: full: 3.55542 us tracking: 2.02182 us
4545
```
4646
Two timing values are provided: one is for the full execution of the device (including retrieval and sending of the DPL messages) and one which concerns only the execution of the algorithm (the tracking algorithm in the above example)
47-
The timing refers to the time needed to process one read-out-frame, i.e. one event.
47+
The timing refers to the time needed to process one read-out-frame, i.e. one event.

Detectors/Raw/README.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,8 @@ dataOrigin = ITS
230230
#optional, if missing then default is used
231231
dataDescription = RAWDATA
232232
filePath = path_and_name_of_the_data_file0
233+
# for CRU detectors the "readoutCard" record below is optional
234+
# readoutCard = CRU
233235

234236
[input-1]
235237
dataOrigin = TPC
@@ -238,6 +240,14 @@ filePath = path_and_name_of_the_data_file1
238240
[input-2]
239241
filePath = path_and_name_of_the_data_file2
240242

243+
[input-1-RORC]
244+
dataOrigin = EMC
245+
filePath = path_and_name_of_the_data_fileX
246+
# for RORC detectors the record below is obligatory
247+
readoutCard = RORC
248+
249+
250+
241251
#...
242252
# [input-XXX] blocks w/o filePath will be ignoder, XXX is irrelevant
243253

@@ -290,8 +300,6 @@ o2-raw-file-reader-workflow
290300
--loop arg (=1) loop N times (infinite for N<0)
291301
--min-tf arg (=0) min TF ID to process
292302
--max-tf arg (=4294967295) max TF ID to process
293-
--message-per-tf send TF of each link as a single FMQ message rather than multipart with message per HB
294-
--output-per-link send message per Link rather than per FMQ output route
295303
--delay arg (=0) delay in seconds between consecutive TFs sending
296304
--configKeyValues arg semicolon separated key=value strings
297305

@@ -315,10 +323,6 @@ If `--loop` argument is provided, data will be re-played in loop. The delay (in
315323
316324
At every invocation of the device `processing` callback a full TimeFrame for every link will be added as N-HBFs parts (one for each HBF in the TF) to the multipart
317325
relayed by the `FairMQ` channel.
318-
In case the `--message-per-tf` option is asked, the whole TF is sent as the only part of the `FairMQPart`.
319-
320-
Instead of sending a single output (for multiple links) per output route (which means their data will be received together) one can request sending an output per link
321-
by using option `--output-per-link`.
322326
323327
The standard use case of this workflow is to provide the input for other worfklows using the piping, e.g.
324328
```cpp

Detectors/Raw/include/DetectorsRaw/RawFileReader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ class RawFileReader
9292
//================================================================================
9393

9494
using RDHAny = header::RDHAny;
95-
using RDH = o2::header::RAWDataHeaderV4;
95+
using RDH = o2::header::RAWDataHeader;
9696
using OrigDescCard = std::tuple<o2::header::DataOrigin, o2::header::DataDescription, ReadoutCardType>;
9797
using InputsMap = std::map<OrigDescCard, std::vector<std::string>>;
9898

Detectors/Raw/src/RawFileReaderWorkflow.cxx

Lines changed: 24 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -42,25 +42,18 @@ using namespace o2::raw;
4242
namespace o2f = o2::framework;
4343
namespace o2h = o2::header;
4444

45-
class rawReaderSpecs : public o2f::Task
45+
class RawReaderSpecs : public o2f::Task
4646
{
4747
public:
48-
explicit rawReaderSpecs(const std::string& config, bool tfAsMessage = false, bool outPerRoute = true, int loop = 1, uint32_t delay_us = 0,
48+
explicit RawReaderSpecs(const std::string& config, int loop = 1, uint32_t delay_us = 0,
4949
uint32_t errmap = 0xffffffff, uint32_t minTF = 0, uint32_t maxTF = 0xffffffff, size_t buffSize = 1024L * 1024L)
50-
: mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mHBFPerMessage(!tfAsMessage), mOutPerRoute(outPerRoute), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mReader(std::make_unique<o2::raw::RawFileReader>(config))
50+
: mLoop(loop < 0 ? INT_MAX : (loop < 1 ? 1 : loop)), mDelayUSec(delay_us), mMinTFID(minTF), mMaxTFID(maxTF), mReader(std::make_unique<o2::raw::RawFileReader>(config))
5151
{
5252
mReader->setCheckErrors(errmap);
5353
mReader->setMaxTFToRead(maxTF);
5454
mReader->setBufferSize(buffSize);
5555
LOG(INFO) << "Will preprocess files with buffer size of " << buffSize << " bytes";
5656
LOG(INFO) << "Number of loops over whole data requested: " << mLoop;
57-
if (mHBFPerMessage) {
58-
LOG(INFO) << "Every link TF will be sent as multipart of HBF messages";
59-
} else {
60-
LOG(INFO) << "HBF of single TF of each link will be sent as a single message";
61-
}
62-
LOG(INFO) << "A message per " << (mOutPerRoute ? "route" : "link") << " will be sent";
63-
LOG(INFO) << "Delay of " << mDelayUSec << " microseconds will be added between TFs";
6457
}
6558

6659
void init(o2f::InitContext& ic) final
@@ -103,10 +96,6 @@ class rawReaderSpecs : public o2f::Task
10396
int nlinks = mReader->getNLinks();
10497

10598
std::unordered_map<std::string, std::unique_ptr<FairMQParts>> messagesPerRoute;
106-
std::vector<std::unique_ptr<FairMQParts>> messagesPerLink;
107-
if (!mOutPerRoute) {
108-
messagesPerLink.resize(nlinks);
109-
}
11099

111100
if (tfID > mMaxTFID) {
112101
if (mReader->getNTimeFrames() && --mLoop) {
@@ -144,18 +133,18 @@ class rawReaderSpecs : public o2f::Task
144133
o2h::DataHeader hdrTmpl(link.description, link.origin, link.subspec); // template with 0 size
145134
int nhb = link.getNHBFinTF();
146135
hdrTmpl.payloadSerializationMethod = o2h::gSerializationMethodNone;
147-
hdrTmpl.splitPayloadParts = mHBFPerMessage ? nhb : 1;
136+
hdrTmpl.splitPayloadParts = nhb;
148137

149138
while (hdrTmpl.splitPayloadIndex < hdrTmpl.splitPayloadParts) {
150139

151-
tfSize += hdrTmpl.payloadSize = mHBFPerMessage ? link.getNextHBFSize() : link.getNextTFSize();
140+
tfSize += hdrTmpl.payloadSize = link.getNextHBFSize();
152141
o2::header::Stack headerStack{hdrTmpl, o2::framework::DataProcessingHeader{mTFIDaccum}};
153142

154143
auto hdMessage = device->NewMessage(headerStack.size());
155144
memcpy(hdMessage->GetData(), headerStack.data(), headerStack.size());
156145

157146
auto plMessage = device->NewMessage(hdrTmpl.payloadSize);
158-
auto bread = mHBFPerMessage ? link.readNextHBF(reinterpret_cast<char*>(plMessage->GetData())) : link.readNextTF(reinterpret_cast<char*>(plMessage->GetData()));
147+
auto bread = link.readNextHBF(reinterpret_cast<char*>(plMessage->GetData()));
159148
if (bread != hdrTmpl.payloadSize) {
160149
LOG(ERROR) << "Link " << il << " read " << bread << " bytes instead of " << hdrTmpl.payloadSize
161150
<< " expected in TF=" << mTFIDaccum << " part=" << hdrTmpl.splitPayloadIndex;
@@ -175,18 +164,10 @@ class rawReaderSpecs : public o2f::Task
175164
reinterpret_cast<o2::header::DataHeader*>(hdMessage->GetData())->firstTForbit = hdrTmpl.firstTForbit;
176165
}
177166
FairMQParts* parts = nullptr;
178-
if (mOutPerRoute) {
179-
parts = messagesPerRoute[link.fairMQChannel].get(); // FairMQParts*
180-
if (!parts) {
181-
messagesPerRoute[link.fairMQChannel] = std::make_unique<FairMQParts>();
182-
parts = messagesPerRoute[link.fairMQChannel].get();
183-
}
184-
} else { // message per link
185-
parts = messagesPerLink[il].get(); // FairMQParts*
186-
if (!parts) {
187-
messagesPerLink[il] = std::make_unique<FairMQParts>();
188-
parts = messagesPerLink[il].get();
189-
}
167+
parts = messagesPerRoute[link.fairMQChannel].get(); // FairMQParts*
168+
if (!parts) {
169+
messagesPerRoute[link.fairMQChannel] = std::make_unique<FairMQParts>();
170+
parts = messagesPerRoute[link.fairMQChannel].get();
190171
}
191172
parts->AddPart(std::move(hdMessage));
192173
parts->AddPart(std::move(plMessage));
@@ -201,24 +182,13 @@ class rawReaderSpecs : public o2f::Task
201182
usleep(mDelayUSec);
202183
}
203184

204-
if (mOutPerRoute) {
205-
for (auto& msgIt : messagesPerRoute) {
206-
LOG(INFO) << "Sending " << msgIt.second->Size() / 2 << " parts to channel " << msgIt.first;
207-
device->Send(*msgIt.second.get(), msgIt.first);
208-
}
209-
} else {
210-
for (int il = 0; il < nlinks; il++) {
211-
auto& link = mReader->getLink(il);
212-
auto* parts = messagesPerLink[il].get(); // FairMQParts*
213-
if (parts) {
214-
LOG(INFO) << "Sending " << parts->Size() / 2 << " parts to channel " << link.fairMQChannel << " for " << link.describe();
215-
device->Send(*parts, link.fairMQChannel);
216-
}
217-
}
185+
for (auto& msgIt : messagesPerRoute) {
186+
LOG(INFO) << "Sending " << msgIt.second->Size() / 2 << " parts to channel " << msgIt.first;
187+
device->Send(*msgIt.second.get(), msgIt.first);
218188
}
219189

220190
LOGF(INFO, "Sent payload of %zu bytes in %zu parts in %zu messages for TF %d", tfSize, tfNParts,
221-
(mOutPerRoute ? messagesPerRoute.size() : messagesPerLink.size()), mTFIDaccum);
191+
messagesPerRoute.size(), mTFIDaccum);
222192
sentSize += tfSize;
223193
sentMessages += tfNParts;
224194

@@ -240,38 +210,34 @@ class rawReaderSpecs : public o2f::Task
240210
uint32_t mDelayUSec = 0; // Delay in microseconds between TFs
241211
uint32_t mMinTFID = 0; // 1st TF to extract
242212
uint32_t mMaxTFID = 0xffffffff; // last TF to extrct
243-
bool mHBFPerMessage = true; // true: send TF as multipart of HBFs, false: single message per TF
244-
bool mOutPerRoute = true; // true: send 1 large output route, otherwise 1 outpur per link
245213
bool mDone = false; // processing is over or not
246214
std::unique_ptr<o2::raw::RawFileReader> mReader; // matching engine
247215
};
248216

249-
o2f::DataProcessorSpec getReaderSpec(std::string config, bool tfAsMessage, bool outPerRoute, int loop, uint32_t delay_us, uint32_t errmap,
217+
o2f::DataProcessorSpec getReaderSpec(std::string config, int loop, uint32_t delay_us, uint32_t errmap,
250218
uint32_t minTF, uint32_t maxTF, size_t buffSize)
251219
{
252220
// check which inputs are present in files to read
253-
o2f::Outputs outputs;
221+
o2f::DataProcessorSpec spec;
222+
spec.name = "raw-file-reader";
223+
254224
if (!config.empty()) {
255225
auto conf = o2::raw::RawFileReader::parseInput(config);
256226
for (const auto& entry : conf) {
257227
const auto& ordescard = entry.first;
258228
if (!entry.second.empty()) { // origin and decription for files to process
259-
outputs.emplace_back(o2f::OutputSpec(o2f::ConcreteDataTypeMatcher{std::get<0>(ordescard), std::get<1>(ordescard)}));
229+
spec.outputs.emplace_back(o2f::OutputSpec(o2f::ConcreteDataTypeMatcher{std::get<0>(ordescard), std::get<1>(ordescard)}));
260230
}
261231
}
262232
}
263-
return o2f::DataProcessorSpec{
264-
"raw-file-reader",
265-
o2f::Inputs{},
266-
outputs,
267-
o2f::AlgorithmSpec{o2f::adaptFromTask<rawReaderSpecs>(config, tfAsMessage, outPerRoute, loop, delay_us, errmap, minTF, maxTF, buffSize)},
268-
o2f::Options{}};
233+
spec.algorithm = o2f::adaptFromTask<RawReaderSpecs>(config, loop, delay_us, errmap, minTF, maxTF, buffSize);
234+
235+
return spec;
269236
}
270237

271-
o2f::WorkflowSpec o2::raw::getRawFileReaderWorkflow(std::string inifile, bool tfAsMessage, bool outPerRoute,
272-
int loop, uint32_t delay_us, uint32_t errmap, uint32_t minTF, uint32_t maxTF, size_t buffSize)
238+
o2f::WorkflowSpec o2::raw::getRawFileReaderWorkflow(std::string inifile, int loop, uint32_t delay_us, uint32_t errmap, uint32_t minTF, uint32_t maxTF, size_t buffSize)
273239
{
274240
o2f::WorkflowSpec specs;
275-
specs.emplace_back(getReaderSpec(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, minTF, maxTF, buffSize));
241+
specs.emplace_back(getReaderSpec(inifile, loop, delay_us, errmap, minTF, maxTF, buffSize));
276242
return specs;
277243
}

Detectors/Raw/src/RawFileReaderWorkflow.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ namespace o2
2020
namespace raw
2121
{
2222

23-
framework::WorkflowSpec getRawFileReaderWorkflow(std::string inifile, bool tfAsMessage = false, bool outPerRoute = true,
23+
framework::WorkflowSpec getRawFileReaderWorkflow(std::string inifile,
2424
int loop = 1, uint32_t delay_us = 0, uint32_t errMap = 0xffffffff,
2525
uint32_t minTF = 0, uint32_t maxTF = 0xffffffff, size_t bufferSize = 1024L * 1024L);
2626

Detectors/Raw/src/rawfile-reader-workflow.cxx

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
2727
options.push_back(ConfigParamSpec{"min-tf", VariantType::Int64, 0L, {"min TF ID to process"}});
2828
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int64, 0xffffffffL, {"max TF ID to process"}});
2929
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 1, {"loop N times (infinite for N<0)"}});
30-
options.push_back(ConfigParamSpec{"message-per-tf", VariantType::Bool, false, {"send TF of each link as a single FMQ message rather than multipart with message per HB"}});
31-
options.push_back(ConfigParamSpec{"output-per-link", VariantType::Bool, false, {"send message per Link rather than per FMQ output route"}});
3230
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
3331
options.push_back(ConfigParamSpec{"buffer-size", VariantType::Int64, 1024L * 1024L, {"buffer size for files preprocessing"}});
3432
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});
@@ -52,8 +50,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
5250
uint32_t maxTF = uint32_t(configcontext.options().get<int64_t>("max-tf"));
5351
uint32_t minTF = uint32_t(configcontext.options().get<int64_t>("min-tf"));
5452
uint64_t buffSize = uint64_t(configcontext.options().get<int64_t>("buffer-size"));
55-
auto tfAsMessage = configcontext.options().get<bool>("message-per-tf");
56-
auto outPerRoute = !configcontext.options().get<bool>("output-per-link");
5753

5854
uint32_t errmap = 0;
5955
for (int i = RawFileReader::NErrorsDefined; i--;) {
@@ -66,5 +62,5 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
6662
o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
6763
uint32_t delay_us = uint32_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
6864

69-
return std::move(o2::raw::getRawFileReaderWorkflow(inifile, tfAsMessage, outPerRoute, loop, delay_us, errmap, minTF, maxTF, buffSize));
65+
return std::move(o2::raw::getRawFileReaderWorkflow(inifile, loop, delay_us, errmap, minTF, maxTF, buffSize));
7066
}

0 commit comments

Comments
 (0)