From ada282f9d095cad88150fcc47080f53b239437a6 Mon Sep 17 00:00:00 2001 From: Giulio Eulisse Date: Thu, 11 May 2017 14:11:53 +0200 Subject: [PATCH] TimeframeReader and TimeframeWriter devices Initial attempt at TimeframeReader and TimeframeWriter devices. This includes: - Initial plumbing for the devices themselves. - TimeframeParser helper class can be used to read data from an std:istream and create FairMQParts from it. - The test_TimeframeParser example generates a dummy timeframe and pumps it through the TimeframeParser. - FakeTimeframeGeneratorDevice, which can be used to generate timeframes programmatically. - TimeframeValidationTool which can be used to verify the contents of a timeframe file. --- .gitignore | 4 + .../include/Headers/SubframeMetadata.h | 2 + .../TimeFrame/include/TimeFrame/TimeFrame.h | 4 + Utilities/DataFlow/CMakeLists.txt | 32 +++ .../DataFlow/doc/TimeframeReaderDevice.1.in | 29 +++ .../DataFlow/doc/TimeframeWriterDevice.1.in | 29 +++ .../include/DataFlow/FakeTimeframeBuilder.h | 23 ++ .../DataFlow/FakeTimeframeGeneratorDevice.h | 36 ++++ .../include/DataFlow/TimeframeParser.h | 22 ++ .../include/DataFlow/TimeframeReaderDevice.h | 38 ++++ .../include/DataFlow/TimeframeWriterDevice.h | 47 +++++ Utilities/DataFlow/run/confFakeTimeframe.json | 72 +++++++ .../DataFlow/run/startTimeframeExample.sh | 4 + Utilities/DataFlow/src/EPNReceiverDevice.cxx | 3 +- .../DataFlow/src/FakeTimeframeBuilder.cxx | 97 +++++++++ .../src/FakeTimeframeGeneratorDevice.cxx | 85 ++++++++ Utilities/DataFlow/src/TimeframeParser.cxx | 196 ++++++++++++++++++ .../DataFlow/src/TimeframeReaderDevice.cxx | 56 +++++ .../DataFlow/src/TimeframeValidationTool.cxx | 52 +++++ .../DataFlow/src/TimeframeValidatorDevice.cxx | 45 ++-- .../DataFlow/src/TimeframeWriterDevice.cxx | 87 ++++++++ .../src/runFakeTimeframeGeneratorDevice.cxx | 22 ++ .../DataFlow/src/runTimeframeReaderDevice.cxx | 21 ++ .../DataFlow/src/runTimeframeWriterDevice.cxx | 33 +++ .../DataFlow/test/test_TimeframeParser.cxx | 53 +++++ .../Publishers/src/DataPublisherDevice.cxx | 22 +- cmake/O2Dependencies.cmake | 1 + doc/o2-timeframe-file-format.1.in | 27 +++ 28 files changed, 1119 insertions(+), 23 deletions(-) create mode 100644 Utilities/DataFlow/doc/TimeframeReaderDevice.1.in create mode 100644 Utilities/DataFlow/doc/TimeframeWriterDevice.1.in create mode 100644 Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h create mode 100644 Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h create mode 100644 Utilities/DataFlow/include/DataFlow/TimeframeParser.h create mode 100644 Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h create mode 100644 Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h create mode 100644 Utilities/DataFlow/run/confFakeTimeframe.json create mode 100755 Utilities/DataFlow/run/startTimeframeExample.sh create mode 100644 Utilities/DataFlow/src/FakeTimeframeBuilder.cxx create mode 100644 Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx create mode 100644 Utilities/DataFlow/src/TimeframeParser.cxx create mode 100644 Utilities/DataFlow/src/TimeframeReaderDevice.cxx create mode 100644 Utilities/DataFlow/src/TimeframeValidationTool.cxx create mode 100644 Utilities/DataFlow/src/TimeframeWriterDevice.cxx create mode 100644 Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx create mode 100644 Utilities/DataFlow/src/runTimeframeReaderDevice.cxx create mode 100644 Utilities/DataFlow/src/runTimeframeWriterDevice.cxx create mode 100644 Utilities/DataFlow/test/test_TimeframeParser.cxx create mode 100644 doc/o2-timeframe-file-format.1.in diff --git a/.gitignore b/.gitignore index 3b74920d9904a..a300d23da1d35 100644 --- a/.gitignore +++ b/.gitignore @@ -58,3 +58,7 @@ compile_commands.json .idea .settings .ycm_extra_conf.py + +# Datafiles +*.root +*.o2tf diff --git a/DataFormats/Headers/include/Headers/SubframeMetadata.h b/DataFormats/Headers/include/Headers/SubframeMetadata.h index b9ba834b76c3f..d1f638495eb1f 100644 --- a/DataFormats/Headers/include/Headers/SubframeMetadata.h +++ b/DataFormats/Headers/include/Headers/SubframeMetadata.h @@ -1,6 +1,8 @@ #ifndef SUBFRAMEMETADATA_H #define SUBFRAMEMETADATA_H +#include + namespace o2 { namespace DataFlow { diff --git a/DataFormats/TimeFrame/include/TimeFrame/TimeFrame.h b/DataFormats/TimeFrame/include/TimeFrame/TimeFrame.h index 29333723d1b83..fc6697a2dc021 100644 --- a/DataFormats/TimeFrame/include/TimeFrame/TimeFrame.h +++ b/DataFormats/TimeFrame/include/TimeFrame/TimeFrame.h @@ -9,6 +9,10 @@ namespace o2 { namespace DataFormat { + +using PartPosition = int; +typedef std::pair IndexElement; + // helper struct so that we can // stream messages using ROOT struct MessageSizePair { diff --git a/Utilities/DataFlow/CMakeLists.txt b/Utilities/DataFlow/CMakeLists.txt index 3581234d592a2..0f891fcd1bd39 100644 --- a/Utilities/DataFlow/CMakeLists.txt +++ b/Utilities/DataFlow/CMakeLists.txt @@ -18,9 +18,14 @@ set(MODULE_BUCKET_NAME O2DeviceApplication_bucket) O2_SETUP(NAME ${MODULE_NAME}) set(SRCS + src/FakeTimeframeBuilder.cxx + src/FakeTimeframeGeneratorDevice.cxx src/HeartbeatSampler.cxx src/SubframeBuilderDevice.cxx + src/TimeframeParser.cxx + src/TimeframeReaderDevice.cxx src/TimeframeValidatorDevice.cxx + src/TimeframeWriterDevice.cxx src/EPNReceiverDevice.cxx src/FLPSenderDevice.cxx ) @@ -36,17 +41,23 @@ set(LIBRARY_NAME ${MODULE_NAME}) set(BUCKET_NAME ${MODULE_BUCKET_NAME}) Set(Exe_Names + FakeTimeframeGeneratorDevice heartbeatSampler SubframeBuilderDevice + TimeframeReaderDevice TimeframeValidatorDevice + TimeframeWriterDevice EPNReceiverDevice FLPSenderDevice ) set(Exe_Source + src/runFakeTimeframeGeneratorDevice.cxx src/runHeartbeatSampler.cxx src/runSubframeBuilderDevice.cxx + src/runTimeframeReaderDevice.cxx src/runTimeframeValidatorDevice.cxx + src/runTimeframeWriterDevice.cxx src/runEPNReceiver.cxx src/runFLPSender.cxx ) @@ -64,3 +75,24 @@ ForEach (_file RANGE 0 ${_length}) BUCKET_NAME ${MODULE_BUCKET_NAME} ) EndForEach (_file RANGE 0 ${_length}) + +O2_GENERATE_EXECUTABLE( + EXE_NAME TimeframeValidationTool + SOURCES src/TimeframeValidationTool + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${MODULE_BUCKET_NAME} +) + +set(TEST_SRCS + test/test_TimeframeParser.cxx +) + +O2_GENERATE_TESTS( + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${BUCKET_NAME} + TEST_SRCS ${TEST_SRCS}) + +O2_GENERATE_MAN(NAME TimeframeReaderDevice) +O2_GENERATE_MAN(NAME TimeframeWriterDevice) + +target_compile_options(test_TimeframeParser PUBLIC -O0 -g) diff --git a/Utilities/DataFlow/doc/TimeframeReaderDevice.1.in b/Utilities/DataFlow/doc/TimeframeReaderDevice.1.in new file mode 100644 index 0000000000000..6bf91ce0a51b4 --- /dev/null +++ b/Utilities/DataFlow/doc/TimeframeReaderDevice.1.in @@ -0,0 +1,29 @@ +.\" Manpage for TimeframeReaderDevice. +.TH man 1 "12 May 2017" "1.0" "TimeframeReaderDevice man page" + +.SH NAME + +TimeframeReaderDevice - read a timeframe from disk + +.SH SYNOPSIS + +TimeframeReaderDevice --input-file [FILE] + +.SH DESCRIPTION + +TimeframeReaderDevice will read a Timeframe from the FILE on disk and streams it +via FairMQ + +.SH OPTIONS + +.TP 5 + +--input-file [FILE] the file to be streamed + +.SH SEE ALSO + +TimeframeWriterDevice(1) + +.SH BUGS + +Lots of bugs diff --git a/Utilities/DataFlow/doc/TimeframeWriterDevice.1.in b/Utilities/DataFlow/doc/TimeframeWriterDevice.1.in new file mode 100644 index 0000000000000..6539b37d7f5a0 --- /dev/null +++ b/Utilities/DataFlow/doc/TimeframeWriterDevice.1.in @@ -0,0 +1,29 @@ +.\" Manpage for TimeframeWriterDevice. +.TH man 1 "12 May 2017" "1.0" "TimeframeWriterDevice man page" + +.SH NAME + +TimeframeWriterDevice - writes a timeframe to disk + +.SH SYNOPSIS + +TimeframeWriterDevice --input-file [FILE] + +.SH DESCRIPTION + +TimeframeWriterDevice will receive a Timeframe from FairMQ transport and stream +it via FairMQ. + +.SH OPTIONS + +.TP 5 + +--output-file [FILE] the file where to stream results + +.SH SEE ALSO + +TimeframeReaderDevice(1) + +.SH BUGS + +Lots of bugs diff --git a/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h b/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h new file mode 100644 index 0000000000000..85d8b009c0482 --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/FakeTimeframeBuilder.h @@ -0,0 +1,23 @@ +#ifndef DATAFLOW_FAKETIMEFRAMEBUILDER_H_ +#define DATAFLOW_FAKETIMEFRAMEBUILDER_H_ + +#include "Headers/DataHeader.h" +#include +#include +#include + +namespace o2 { namespace DataFlow { + +struct FakeTimeframeSpec { + const char *origin; + const char *dataDescription; + std::function bufferFiller; + size_t bufferSize; +}; + +/** Generate a timeframe from the provided specification + */ +std::unique_ptr fakeTimeframeGenerator(std::vector &specs, std::size_t &totalSize); + +}} +#endif /* DATAFLOW_FAKETIMEFRAMEBUILDER_H_ */ diff --git a/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h b/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h new file mode 100644 index 0000000000000..f0aedd3b04fe7 --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/FakeTimeframeGeneratorDevice.h @@ -0,0 +1,36 @@ +#ifndef ALICEO2_FAKE_TIMEFRAME_GENERATOR_H_ +#define ALICEO2_FAKE_TIMEFRAME_GENERATOR_H_ + +#include "O2Device/O2Device.h" + +namespace o2 { +namespace DataFlow { + +/// A device which writes to file the timeframes. +class FakeTimeframeGeneratorDevice : public Base::O2Device +{ +public: + static constexpr const char* OptionKeyOutputChannelName = "output-channel-name"; + static constexpr const char* OptionKeyMaxTimeframes = "max-timeframes"; + + /// Default constructor + FakeTimeframeGeneratorDevice(); + + /// Default destructor + ~FakeTimeframeGeneratorDevice() override = default; + + void InitTask() final; + + protected: + /// Overloads the ConditionalRun() method of FairMQDevice + bool ConditionalRun() final; + + std::string mOutChannelName; + size_t mMaxTimeframes; + size_t mTimeframeCount; +}; + +} // namespace DataFlow +} // namespace o2 + +#endif diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeParser.h b/Utilities/DataFlow/include/DataFlow/TimeframeParser.h new file mode 100644 index 0000000000000..d83cf9426459a --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/TimeframeParser.h @@ -0,0 +1,22 @@ +#ifndef TIMEFRAME_PARSER_H_ +#define TIMEFRAME_PARSER_H_ + +#include +#include + +class FairMQParts; + +namespace o2 { namespace DataFlow { + +/// An helper function which takes a std::istream pointing +/// to a naively persisted timeframe and pumps its parts to +/// FairMQParts, ready to be shipped via FairMQ. +void streamTimeframe(std::istream &stream, + std::function onAddPart, + std::function onSend); + +void streamTimeframe(std::ostream &stream, FairMQParts &parts); + +} } // end + +#endif // TIMEFRAME_PARSER_H diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h b/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h new file mode 100644 index 0000000000000..c7ab6e87d8872 --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/TimeframeReaderDevice.h @@ -0,0 +1,38 @@ +#ifndef ALICEO2_TIMEFRAME_READER_H_ +#define ALICEO2_TIMEFRAME_READER_H_ + +#include "O2Device/O2Device.h" +#include + +namespace o2 { +namespace DataFlow { + +/// A device which writes to file the timeframes. +class TimeframeReaderDevice : public Base::O2Device +{ +public: + static constexpr const char* OptionKeyOutputChannelName = "output-channel-name"; + static constexpr const char* OptionKeyInputFileName = "input-file"; + + /// Default constructor + TimeframeReaderDevice(); + + /// Default destructor + ~TimeframeReaderDevice() override = default; + + void InitTask() final; + + protected: + /// Overloads the ConditionalRun() method of FairMQDevice + bool ConditionalRun() final; + + std::string mOutChannelName; + std::string mInFileName; + std::fstream mFile; + std::vector mSeen; +}; + +} // namespace DataFlow +} // namespace o2 + +#endif diff --git a/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h b/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h new file mode 100644 index 0000000000000..3e2dfbd4e9b9d --- /dev/null +++ b/Utilities/DataFlow/include/DataFlow/TimeframeWriterDevice.h @@ -0,0 +1,47 @@ +#ifndef ALICEO2_TIMEFRAME_WRITER_DEVICE_H_ +#define ALICEO2_TIMEFRAME_WRITER_DEVICE_H_ + +#include "O2Device/O2Device.h" +#include + +namespace o2 { +namespace DataFlow { + +/// A device which writes to file the timeframes. +class TimeframeWriterDevice : public Base::O2Device +{ +public: + static constexpr const char* OptionKeyInputChannelName = "input-channel-name"; + static constexpr const char* OptionKeyOutputFileName = "output-file"; + static constexpr const char* OptionKeyMaxTimeframesPerFile = "max-timeframes-per-file"; + static constexpr const char* OptionKeyMaxFileSize = "max-file-size"; + static constexpr const char* OptionKeyMaxFiles = "max-files"; + + /// Default constructor + TimeframeWriterDevice(); + + /// Default destructor + ~TimeframeWriterDevice() override = default; + + void InitTask() final; + + /// The PostRun will trigger saving the file to disk + void PostRun() final; + + protected: + /// Overloads the Run() method of FairMQDevice + void Run() final; + + std::string mInChannelName; + std::string mOutFileName; + std::fstream mFile; + size_t mMaxTimeframes; + size_t mMaxFileSize; + size_t mMaxFiles; + size_t mFileCount; +}; + +} // namespace DataFlow +} // namespace o2 + +#endif diff --git a/Utilities/DataFlow/run/confFakeTimeframe.json b/Utilities/DataFlow/run/confFakeTimeframe.json new file mode 100644 index 0000000000000..ae1f686087f25 --- /dev/null +++ b/Utilities/DataFlow/run/confFakeTimeframe.json @@ -0,0 +1,72 @@ +{ + "fairMQOptions": + { + "devices": + [ + { + "id": "FakeTimeframeGeneratorDevice", + "channels": + [ + { + "name": "output", + "type": "pub", + "method": "bind", + "sockets": + [ + { "address": "tcp://*:5550" } + ], + "sndBufSize": "10" + } + ] + }, + { + "id": "TimeframeWriterDevice", + "channels": + [ + { + "name": "input", + "type": "sub", + "method": "connect", + "sockets": + [ + { "address": "tcp://127.0.0.1:5550"} + ], + "sndBufSize": "10" + } + ] + }, + { + "id": "TimeframeReaderDevice", + "channels": + [ + { + "name": "output", + "type": "pub", + "method": "bind", + "sockets": + [ + { "address": "tcp://127.0.0.1:5551"} + ], + "sndBufSize": "10" + } + ] + }, + { + "id": "TimeframeValidatorDevice", + "channels": + [ + { + "name": "input", + "type": "sub", + "method": "connect", + "sockets": + [ + { "address": "tcp://127.0.0.1:5551"} + ], + "sndBufSize": "10" + } + ] + } + ] + } +} diff --git a/Utilities/DataFlow/run/startTimeframeExample.sh b/Utilities/DataFlow/run/startTimeframeExample.sh new file mode 100755 index 0000000000000..dfd84f6cceca5 --- /dev/null +++ b/Utilities/DataFlow/run/startTimeframeExample.sh @@ -0,0 +1,4 @@ +xterm -geometry 80x25+0+0 -hold -e FakeTimeframeGeneratorDevice --id FakeTimeframeGeneratorDevice --mq-config confFakeTimeframe.json --output-channel-name output & +xterm -geometry 80x25+0+0 -hold -e TimeframeWriterDevice --id TimeframeWriterDevice --mq-config confFakeTimeframe.json --input-channel-name input --max-timeframes 1 --output-file data.o2tf & +#xterm -geometry 80x25+0+0 -hold -e TimeframeReaderDevice --id TimeframeReaderDevice --mq-config confFakeTimeframe.json --input-file data.o2tf --output-channel-name output & +#xterm -geometry 80x25+0+0 -hold -e TimeframeValidatorDevice --id TimeframeValidatorDevice --mq-config confFakeTimeframe.json --input-channel-name input & diff --git a/Utilities/DataFlow/src/EPNReceiverDevice.cxx b/Utilities/DataFlow/src/EPNReceiverDevice.cxx index e796e1b725347..daca76d1506c9 100644 --- a/Utilities/DataFlow/src/EPNReceiverDevice.cxx +++ b/Utilities/DataFlow/src/EPNReceiverDevice.cxx @@ -8,6 +8,7 @@ #include "DataFlow/EPNReceiverDevice.h" #include "Headers/DataHeader.h" #include "Headers/SubframeMetadata.h" +#include "TimeFrame/TimeFrame.h" using namespace std; using namespace std::chrono; @@ -15,6 +16,7 @@ using namespace o2::Devices; using SubframeMetadata = o2::DataFlow::SubframeMetadata; using TPCTestPayload = o2::DataFlow::TPCTestPayload; using TPCTestCluster = o2::DataFlow::TPCTestCluster; +using IndexElement = o2::DataFormat::IndexElement; void EPNReceiverDevice::InitTask() { @@ -74,7 +76,6 @@ void EPNReceiverDevice::Run() using PartPosition = int; using TimeframeId = int; using FlpId = int; - typedef std::pair IndexElement; std::multimap index; std::multimap flpIds; diff --git a/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx b/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx new file mode 100644 index 0000000000000..23d3197ec2edf --- /dev/null +++ b/Utilities/DataFlow/src/FakeTimeframeBuilder.cxx @@ -0,0 +1,97 @@ +#include "DataFlow/FakeTimeframeBuilder.h" +#include "TimeFrame/TimeFrame.h" +#include "Headers/DataHeader.h" +#include +#include +#include +#include + +using DataHeader = o2::Header::DataHeader; +using DataDescription = o2::Header::DataDescription; +using DataOrigin = o2::Header::DataOrigin; +using IndexElement = o2::DataFormat::IndexElement; + +namespace { + o2::Header::DataDescription lookupDataDescription(const char *key) { + if (strcmp(key, "RAWDATA") == 0) + return o2::Header::gDataDescriptionRawData; + else if (strcmp(key, "CLUSTERS") == 0) + return o2::Header::gDataDescriptionClusters; + else if (strcmp(key, "TRACKS") == 0) + return o2::Header::gDataDescriptionTracks; + else if (strcmp(key, "CONFIG") == 0) + return o2::Header::gDataDescriptionConfig; + else if (strcmp(key, "INFO") == 0) + return o2::Header::gDataDescriptionInfo; + return o2::Header::gDataDescriptionInvalid; + } + + o2::Header::DataOrigin lookupDataOrigin(const char *key) { + if (strcmp(key, "TPC") == 0) + return o2::Header::gDataOriginTPC; + if (strcmp(key, "TRD") == 0) + return o2::Header::gDataOriginTRD; + if (strcmp(key, "TOF") == 0) + return o2::Header::gDataOriginTOF; + if (strcmp(key, "ITS") == 0) + return o2::Header::gDataOriginITS; + return o2::Header::gDataOriginInvalid; + } + + +} + +namespace o2 { namespace DataFlow { + +std::unique_ptr fakeTimeframeGenerator(std::vector &specs, std::size_t &totalSize) { + // Calculate the total size of your timeframe. This is + // given by: + // - N*The size of the data header (this should actually depend on the + // kind of data as different dataDescriptions will probably have + // different headers). + // - Sum_N(The size of the buffer_i) + // - The size of the index header + // - N*sizeof(dataheader) + // Assuming all the data header + size_t sizeOfHeaders = specs.size()*sizeof(DataHeader); + size_t sizeOfBuffers = 0; + for (auto && spec : specs) { + sizeOfBuffers += spec.bufferSize; + } + size_t sizeOfIndexHeader = sizeof(DataHeader); + size_t sizeOfIndex = sizeof(IndexElement)*specs.size(); + totalSize = sizeOfHeaders + sizeOfBuffers + sizeOfIndexHeader + sizeOfIndex; + + // Add the actual - data + auto buffer = std::make_unique(totalSize); + char *bi = buffer.get(); + std::vector headers; + int count = 0; + for (auto &&spec : specs) { + IndexElement el; + el.first.dataDescription = lookupDataDescription(spec.dataDescription); + el.first.dataOrigin = lookupDataOrigin(spec.origin); + el.first.payloadSize = spec.bufferSize; + el.first.headerSize = sizeof(el.first); + el.second = count++; + // Let's zero at least the header... + memset(bi, 0, sizeof(el.first)); + memcpy(bi, &el, sizeof(el.first)); + headers.push_back(el); + bi += sizeof(el.first); + spec.bufferFiller(bi, spec.bufferSize); + bi += spec.bufferSize; + } + + // Add the index + DataHeader index; + index.dataDescription = DataDescription("TIMEFRAMEINDEX"); + index.dataOrigin = DataOrigin("FKE"); + index.headerSize = sizeOfIndexHeader; + index.payloadSize = sizeOfIndex; + memcpy(bi, &index, sizeof(index)); + memcpy(bi+sizeof(index), headers.data(), headers.size() * sizeof(IndexElement)); + return std::move(buffer); +} + +}} // o2::Headers diff --git a/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx b/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx new file mode 100644 index 0000000000000..346acff646545 --- /dev/null +++ b/Utilities/DataFlow/src/FakeTimeframeGeneratorDevice.cxx @@ -0,0 +1,85 @@ +#include + +#include "DataFlow/FakeTimeframeGeneratorDevice.h" +#include "DataFlow/FakeTimeframeBuilder.h" +#include "DataFlow/TimeframeParser.h" +#include "Headers/SubframeMetadata.h" +#include "Headers/DataHeader.h" +#include +#include + +using DataHeader = o2::Header::DataHeader; + +namespace { +struct OneShotReadBuf : public std::streambuf +{ + OneShotReadBuf(char* s, std::size_t n) + { + setg(s, s, s + n); + } +}; +} +namespace o2 { namespace DataFlow { + +FakeTimeframeGeneratorDevice::FakeTimeframeGeneratorDevice() + : O2Device{} + , mOutChannelName{} + , mMaxTimeframes{} + , mTimeframeCount{0} +{ +} + +void FakeTimeframeGeneratorDevice::InitTask() +{ + mOutChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); + mMaxTimeframes = GetConfig()->GetValue(OptionKeyMaxTimeframes); +} + +bool FakeTimeframeGeneratorDevice::ConditionalRun() +{ + auto addPartFn = [this](FairMQParts &parts, char *buffer, size_t size) { + parts.AddPart(this->NewMessage(buffer, + size, + [](void* data, void* hint) { delete[] (char*)data; }, + nullptr)); + }; + auto sendFn = [this](FairMQParts &parts) {this->Send(parts, this->mOutChannelName);}; + auto zeroFiller = [](char *b, size_t s) {memset(b, 0, s);}; + + std::vector specs = { + { + .origin = "TPC", + .dataDescription = "CLUSTERS", + .bufferFiller = zeroFiller, + .bufferSize = 1000 + }, + { + .origin = "ITS", + .dataDescription = "CLUSTERS", + .bufferFiller = zeroFiller, + .bufferSize = 500 + } + }; + + try { + size_t totalSize; + auto buffer = fakeTimeframeGenerator(specs, totalSize); + OneShotReadBuf osrb(buffer.get(), totalSize); + std::istream s(&osrb); + + streamTimeframe(s, + addPartFn, + sendFn); + } catch(std::runtime_error &e) { + LOG(ERROR) << e.what() << "\n"; + } + + mTimeframeCount++; + + if (mTimeframeCount < mMaxTimeframes) { + return true; + } + return false; +} + +}} // namespace o2::DataFlow diff --git a/Utilities/DataFlow/src/TimeframeParser.cxx b/Utilities/DataFlow/src/TimeframeParser.cxx new file mode 100644 index 0000000000000..8962513c5e4bb --- /dev/null +++ b/Utilities/DataFlow/src/TimeframeParser.cxx @@ -0,0 +1,196 @@ +/// @file TimeframeValidatorDevice.cxx +/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel +/// @since 2017-02-07 +/// @brief Validator device for a full time frame + +#include // this_thread::sleep_for +#include +#include + +#include "DataFlow/TimeframeParser.h" +#include "Headers/SubframeMetadata.h" +#include "Headers/DataHeader.h" +#include "TimeFrame/TimeFrame.h" + +#include +#include + + +using DataHeader = o2::Header::DataHeader; +using DataDescription = o2::Header::DataDescription; +using IndexElement = o2::DataFormat::IndexElement; + +namespace o2 { namespace DataFlow { + +// Possible states for the parsing of a timeframe +// PARSE_BEGIN_STREAM -> +enum ParsingState { + PARSE_BEGIN_STREAM = 0, + PARSE_BEGIN_TIMEFRAME, + PARSE_BEGIN_PAIR, + PARSE_DATA_HEADER, + PARSE_CONCRETE_HEADER, + PARSE_PAYLOAD, + PARSE_END_PAIR, + PARSE_END_TIMEFRAME, + PARSE_END_STREAM, + ERROR +}; + +struct StreamingState { + StreamingState() = default; + + ParsingState state = PARSE_BEGIN_STREAM; + bool hasDataHeader = false; + bool hasConcreteHeader = false; + void *payloadBuffer = nullptr; + void *headerBuffer = nullptr; + DataHeader dh; // The current DataHeader being parsed +}; + +void streamTimeframe(std::istream &stream, + std::function onAddPart, + std::function onSend) { + FairMQParts parts; + StreamingState state; + assert(state.state == PARSE_BEGIN_STREAM); + while(true) { + switch(state.state) { + case PARSE_BEGIN_STREAM: + LOG(INFO) << "In PARSE_BEGIN_STREAM\n"; + state.state = PARSE_BEGIN_TIMEFRAME; + break; + case PARSE_BEGIN_TIMEFRAME: + LOG(INFO) << "In PARSE_BEGIN_TIMEFRAME\n"; + state.state = PARSE_BEGIN_PAIR; + break; + case PARSE_BEGIN_PAIR: + LOG(INFO) << "In PARSE_BEGIN_PAIR\n"; + state.state = PARSE_DATA_HEADER; + state.hasDataHeader = false; + state.payloadBuffer = nullptr; + state.headerBuffer = nullptr; + break; + case PARSE_DATA_HEADER: + LOG(INFO) << "In PARSE_DATA_HEADER\n"; + if (state.hasDataHeader) { + throw std::runtime_error("DataHeader already present."); + } else if (state.payloadBuffer) { + throw std::runtime_error("Unexpected payload."); + } + LOG(INFO) << "Reading dataheader of " << sizeof(state.dh) << " bytes\n"; + stream.read(reinterpret_cast(&state.dh), sizeof(state.dh)); + // If we have a TIMEFRAMEINDEX part and we find the eof, we are done. + if (stream.eof()) { + throw std::runtime_error("Premature end of stream"); + } + + // Otherwise we move to the state which is responsible for parsing the + // kind of header. + state.state = PARSE_CONCRETE_HEADER; + break; + case PARSE_CONCRETE_HEADER: + LOG(INFO) << "In PARSE_CONCRETE_HEADER\n"; + if (state.headerBuffer) + { + throw std::runtime_error("File has two consecutive headers"); + } + if (state.dh.headerSize < sizeof(DataHeader)) + { + std::ostringstream str; + str << "Bad header size. Should be greater then " + << sizeof(DataHeader) + << ". Found " << state.dh.headerSize << "\n"; + throw std::runtime_error(str.str()); + } + // We get the full header size and read the rest of the header + state.headerBuffer = malloc(state.dh.headerSize); + memcpy(state.headerBuffer, &state.dh, sizeof(state.dh)); + LOG(INFO) << "Reading rest of the header of " << state.dh.headerSize - sizeof(state.dh) << " bytes\n"; + stream.read(reinterpret_cast(state.headerBuffer)+ sizeof(state.dh), + state.dh.headerSize - sizeof(state.dh)); + // Handle the case the file was truncated. + if (stream.eof()) + { + throw std::runtime_error("Unexpected end of file"); + } + onAddPart(parts, reinterpret_cast(state.headerBuffer), state.dh.headerSize); + // Move to parse the payload + state.state = PARSE_PAYLOAD; + break; + case PARSE_PAYLOAD: + LOG(INFO) << "In PARSE_PAYLOAD\n"; + if(state.payloadBuffer) + { + throw std::runtime_error("File has two consecutive payloads"); + } + state.payloadBuffer = new char[state.dh.payloadSize]; + LOG(INFO) << "Reading payload of " << state.dh.payloadSize << " bytes\n"; + stream.read(reinterpret_cast(state.payloadBuffer), state.dh.payloadSize); + if (stream.eof()) + { + throw std::runtime_error("Unexpected end of file"); + } + onAddPart(parts, reinterpret_cast(state.payloadBuffer), state.dh.payloadSize); + state.state = PARSE_END_PAIR; + break; + case PARSE_END_PAIR: + LOG(INFO) << "In PARSE_END_PAIR\n"; + state.state = state.dh == DataDescription("TIMEFRAMEINDEX") ? PARSE_END_TIMEFRAME : PARSE_BEGIN_PAIR; + break; + case PARSE_END_TIMEFRAME: + LOG(INFO) << "In PARSE_END_TIMEFRAME\n"; + onSend(parts); + // Check if we have more. If not, we can declare success. + stream.peek(); + if (stream.eof()) { + state.state = PARSE_END_STREAM; + } else { + state.state = PARSE_BEGIN_TIMEFRAME; + } + break; + case PARSE_END_STREAM: + return; + break; + default: + break; + } + } +} + +void streamTimeframe(std::ostream &stream, FairMQParts &parts) { + if (parts.Size() < 2) + { + throw std::runtime_error("Expecting at least 2 parts\n"); + } + + auto indexHeader = o2::Header::get(parts.At(parts.Size() - 2)->GetData()); + // FIXME: Provide iterator pair API for the index + // Index should really be something which provides an + // iterator pair API so that we can sort / find / lower_bound + // easily. Right now we simply use it a C-style array. + auto index = reinterpret_cast(parts.At(parts.Size() - 1)->GetData()); + + LOG(INFO) << "This time frame has " << parts.Size() << " parts.\n"; + auto indexEntries = indexHeader->payloadSize / sizeof(IndexElement); + if (indexHeader->dataDescription != DataDescription("TIMEFRAMEINDEX")) { + throw std::runtime_error("Could not find a valid index header\n"); + } + LOG(INFO) << indexHeader->dataDescription.str << "\n"; + LOG(INFO) << "This time frame has " << indexEntries << "entries in the index.\n"; + if ((indexEntries * 2 + 2) != (parts.Size())) { + std::stringstream err; + err << "Mismatched index and received parts. Expected " + << (parts.Size() - 2 * 2) << " found " << indexEntries ; + throw std::runtime_error(err.str()); + } + + LOG(INFO) << "Everything is fine with received timeframe\n"; + for (size_t i = 0; i < parts.Size(); ++i) + { + stream.write(reinterpret_cast(parts.At(i)->GetData()), + parts.At(i)->GetSize()); + } +} + +}} // namespace o2::DataFlow diff --git a/Utilities/DataFlow/src/TimeframeReaderDevice.cxx b/Utilities/DataFlow/src/TimeframeReaderDevice.cxx new file mode 100644 index 0000000000000..4a4b6258e0035 --- /dev/null +++ b/Utilities/DataFlow/src/TimeframeReaderDevice.cxx @@ -0,0 +1,56 @@ +#include + +#include "DataFlow/TimeframeReaderDevice.h" +#include "DataFlow/TimeframeParser.h" +#include "Headers/SubframeMetadata.h" +#include "Headers/DataHeader.h" +#include + +using DataHeader = o2::Header::DataHeader; + +namespace o2 { namespace DataFlow { + +TimeframeReaderDevice::TimeframeReaderDevice() + : O2Device{} + , mOutChannelName{} + , mFile{} +{ +} + +void TimeframeReaderDevice::InitTask() +{ + mOutChannelName = GetConfig()->GetValue(OptionKeyOutputChannelName); + mInFileName = GetConfig()->GetValue(OptionKeyInputFileName); + mSeen.clear(); +} + +bool TimeframeReaderDevice::ConditionalRun() +{ + auto addPartFn = [this](FairMQParts &parts, char *buffer, size_t size) { + parts.AddPart(this->NewMessage(buffer, + size, + [](void* data, void* hint) { delete[] (char*)data; }, + nullptr)); + }; + auto sendFn = [this](FairMQParts &parts) {this->Send(parts, this->mOutChannelName);}; + + // FIXME: For the moment we support a single file. This should really be a glob. We + // should also have a strategy for watching directories. + std::vector files; + files.push_back(mInFileName); + for (auto &&fn : files) { + mFile.open(fn, std::ofstream::in | std::ofstream::binary); + try { + streamTimeframe(mFile, + addPartFn, + sendFn); + } catch(std::runtime_error &e) { + LOG(ERROR) << e.what() << "\n"; + } + mSeen.push_back(fn); + } + + return false; +} + +}} // namespace o2::DataFlow diff --git a/Utilities/DataFlow/src/TimeframeValidationTool.cxx b/Utilities/DataFlow/src/TimeframeValidationTool.cxx new file mode 100644 index 0000000000000..4fd3a2bde6c10 --- /dev/null +++ b/Utilities/DataFlow/src/TimeframeValidationTool.cxx @@ -0,0 +1,52 @@ +#include "DataFlow/TimeframeParser.h" +#include "fairmq/FairMQParts.h" +#include +#include +#include +#include +#include +#include + +// A simple tool which verifies timeframe files +int main(int argc, char **argv) { + int c; + opterr = 0; + + while ((c = getopt (argc, argv, "")) != -1) { + switch (c) + { + case '?': + if (isprint (optopt)) + fprintf (stderr, "Unknown option `-%c'.\n", optopt); + else + fprintf (stderr, + "Unknown option character `\\x%x'.\n", + optopt); + return 1; + default: + abort(); + } + } + + std::vector filenames; + for (size_t index = optind; index < argc; index++) { + filenames.emplace_back(std::string(argv[index])); + } + + for (auto &&fn : filenames) { + LOG(INFO) << "Processing file" << fn << "\n"; + std::ifstream s(fn); + FairMQParts parts; + auto onAddParts = [](FairMQParts &p, char *buffer, size_t size) { + }; + auto onSend = [](FairMQParts &p) { + }; + + try { + o2::DataFlow::streamTimeframe(s, onAddParts, onSend); + } catch(std::runtime_error &e) { + LOG(ERROR) << e.what() << std::endl; + exit(1); + } + } +} diff --git a/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx b/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx index 2da78e3806acb..98bbc24135b42 100644 --- a/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx +++ b/Utilities/DataFlow/src/TimeframeValidatorDevice.cxx @@ -7,16 +7,16 @@ #include #include "DataFlow/TimeframeValidatorDevice.h" +#include "TimeFrame/TimeFrame.h" #include "Headers/SubframeMetadata.h" #include "Headers/DataHeader.h" -#include +#include using DataHeader = o2::Header::DataHeader; - -// FIXME: this should really be in a central place -using PartPosition = int; -typedef std::pair IndexElement; +using DataOrigin = o2::Header::DataOrigin; +using DataDescription = o2::Header::DataDescription; +using IndexElement = o2::DataFormat::IndexElement; o2::DataFlow::TimeframeValidatorDevice::TimeframeValidatorDevice() : O2Device() @@ -48,8 +48,8 @@ void o2::DataFlow::TimeframeValidatorDevice::Run() // TODO: fill this with checks on time frame LOG(INFO) << "This time frame has " << timeframeParts.Size() << " parts.\n"; - auto indexEntries = indexHeader->payloadSize / sizeof(IndexElement); - if (strncmp(indexHeader->dataDescription.str, "TIMEFRAMEINDEX", 14) != 0) + auto indexEntries = indexHeader->payloadSize / sizeof(DataHeader); + if (indexHeader->dataDescription != DataDescription("TIMEFRAMEINDEX")) LOG(ERROR) << "Could not find a valid index header\n"; LOG(INFO) << indexHeader->dataDescription.str << "\n"; LOG(INFO) << "This time frame has " << indexEntries << "entries in the index.\n"; @@ -66,11 +66,16 @@ void o2::DataFlow::TimeframeValidatorDevice::Run() for (int ii = 0; ii < indexEntries; ++ii) { IndexElement &ie = index[ii]; assert(ie.second >= 0); - LOG(DEBUG) << ie.first.dataDescription.str << std::endl; - if (ie.first.dataDescription == "TPCCLUSTER") + LOG(DEBUG) << ie.first.dataDescription.str << " " + << ie.first.dataOrigin.str << std::endl; + if ((ie.first.dataOrigin == Header::gDataOriginTPC) + && (ie.first.dataDescription == Header::gDataDescriptionClusters)) { tpcIndex = ie.second; - if (ie.first.dataDescription == "ITSRAW") + } + if ((ie.first.dataOrigin == Header::gDataOriginITS) + && (ie.first.dataDescription == Header::gDataDescriptionClusters)) { itsIndex = ie.second; + } } if (tpcIndex < 0) @@ -88,14 +93,17 @@ void o2::DataFlow::TimeframeValidatorDevice::Run() // Data header it at position - 1 auto tpcHeader = reinterpret_cast(timeframeParts.At(tpcIndex)->GetData()); - if (tpcHeader->dataDescription != "TPCCLUSTER") + if ((tpcHeader->dataDescription != Header::gDataDescriptionClusters) || + (tpcHeader->dataOrigin != Header::gDataOriginTPC)) { - LOG(ERROR) << "Wrong data description. Expecting TPCCLUSTER, found " << tpcHeader->dataDescription.str << "\n"; + LOG(ERROR) << "Wrong data description. Expecting TPC - CLUSTERS, found " + << tpcHeader->dataOrigin.str << " - " + << tpcHeader->dataDescription.str << "\n"; continue; } auto tpcPayload = reinterpret_cast(timeframeParts.At(tpcIndex + 1)->GetData()); if (tpcHeader->payloadSize % sizeof(TPCTestCluster)) - LOG(ERROR) << "TPCCLUSTER Size Mismatch\n"; + LOG(ERROR) << "TPC - CLUSTERS Size Mismatch\n"; auto numOfClusters = tpcHeader->payloadSize / sizeof(TPCTestCluster); for (size_t ci = 0 ; ci < numOfClusters; ++ci) { @@ -114,21 +122,24 @@ void o2::DataFlow::TimeframeValidatorDevice::Run() // Data header it at position - 1 auto itsHeader = reinterpret_cast(timeframeParts.At(itsIndex)->GetData()); - if (strcmp(itsHeader->dataDescription.str,"ITSRAW")!=0) + if ((itsHeader->dataDescription != Header::gDataDescriptionClusters) + || (itsHeader->dataOrigin != Header::gDataOriginITS)) { - LOG(ERROR) << "Wrong data description. Expecting ITSRAW, found " << itsHeader->dataDescription.str << "\n"; + LOG(ERROR) << "Wrong data description. Expecting ITS - CLUSTERS, found " + << itsHeader->dataOrigin.str << " - " << itsHeader->dataDescription.str << "\n"; continue; } auto itsPayload = reinterpret_cast(timeframeParts.At(itsIndex + 1)->GetData()); if (itsHeader->payloadSize % sizeof(ITSRawData)) - LOG(ERROR) << "ITSRawData Size Mismatch.\n"; + LOG(ERROR) << "ITS - CLUSTERS Size Mismatch.\n"; numOfClusters = itsHeader->payloadSize / sizeof(ITSRawData); for (size_t ci = 0 ; ci < numOfClusters; ++ci) { ITSRawData &cluster = itsPayload[ci]; if (cluster.timeStamp != ci) { - LOG(ERROR) << "ITS Data mismatch. Expecting " << ci << " got " << cluster.timeStamp << "\n"; + LOG(ERROR) << "ITS Data mismatch. Expecting " << ci + << " got " << cluster.timeStamp << "\n"; break; } } diff --git a/Utilities/DataFlow/src/TimeframeWriterDevice.cxx b/Utilities/DataFlow/src/TimeframeWriterDevice.cxx new file mode 100644 index 0000000000000..c939d22fa95a6 --- /dev/null +++ b/Utilities/DataFlow/src/TimeframeWriterDevice.cxx @@ -0,0 +1,87 @@ +/// @file TimeframeValidatorDevice.cxx +/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel +/// @since 2017-02-07 +/// @brief Validator device for a full time frame + +#include // this_thread::sleep_for +#include + +#include "DataFlow/TimeframeWriterDevice.h" +#include "DataFlow/TimeframeParser.h" +#include "TimeFrame/TimeFrame.h" +#include "Headers/SubframeMetadata.h" +#include "Headers/DataHeader.h" +#include +#include + + +using DataHeader = o2::Header::DataHeader; +using IndexElement = o2::DataFormat::IndexElement; + +namespace o2 { namespace DataFlow { + +TimeframeWriterDevice::TimeframeWriterDevice() + : O2Device{} + , mInChannelName{} + , mFile{} + , mMaxTimeframes{} + , mMaxFileSize{} + , mMaxFiles{} + , mFileCount{0} +{ +} + +void TimeframeWriterDevice::InitTask() +{ + mInChannelName = GetConfig()->GetValue(OptionKeyInputChannelName); + mOutFileName = GetConfig()->GetValue(OptionKeyOutputFileName); + mMaxTimeframes = GetConfig()->GetValue(OptionKeyMaxTimeframesPerFile); + mMaxFileSize = GetConfig()->GetValue(OptionKeyMaxFileSize); + mMaxFiles = GetConfig()->GetValue(OptionKeyMaxFiles); +} + +void TimeframeWriterDevice::Run() +{ + boost::filesystem::path p(mOutFileName); + size_t streamedTimeframes = 0; + bool needsNewFile = true; + while (CheckCurrentState(RUNNING) && mFileCount < mMaxFiles) { + // In case we need to process more than one file, + // the filename is split in basename and extension + // and we call the files `.`. + if (needsNewFile) { + std::string filename = mOutFileName; + if (mMaxFiles > 1) { + std::string base_path(mOutFileName, 0, mOutFileName.find_last_of(".")); + std::string extension(mOutFileName, mOutFileName.find_last_of(".")); + filename = base_path + std::to_string(mFileCount) + extension; + } + LOG(INFO) << "Opening " << filename << " for output\n"; + mFile.open(filename.c_str(), std::ofstream::out | std::ofstream::binary); + needsNewFile = false; + } + + FairMQParts timeframeParts; + if (Receive(timeframeParts, mInChannelName, 0, 100) <= 0) + continue; + + streamTimeframe(mFile, timeframeParts); + if ((mFile.tellp() > mMaxFileSize) || (streamedTimeframes++ > mMaxTimeframes)) + { + mFile.flush(); + mFile.close(); + mFileCount++; + needsNewFile = true; + } + } +} + +void TimeframeWriterDevice::PostRun() +{ + if (mFile.is_open()) { + mFile.flush(); + mFile.close(); + } +} + +}} // namespace o2::DataFlow diff --git a/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx b/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx new file mode 100644 index 0000000000000..11aa1dbc2e5f2 --- /dev/null +++ b/Utilities/DataFlow/src/runFakeTimeframeGeneratorDevice.cxx @@ -0,0 +1,22 @@ +#include "runFairMQDevice.h" +#include "DataFlow/FakeTimeframeGeneratorDevice.h" +#include + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + (o2::DataFlow::FakeTimeframeGeneratorDevice::OptionKeyOutputChannelName, + bpo::value()->default_value("output"), + "Name of the output channel"); + options.add_options() + (o2::DataFlow::FakeTimeframeGeneratorDevice::OptionKeyMaxTimeframes, + bpo::value()->default_value("1"), + "Number of timeframes to generate"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new o2::DataFlow::FakeTimeframeGeneratorDevice(); +} diff --git a/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx b/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx new file mode 100644 index 0000000000000..5da17a708f692 --- /dev/null +++ b/Utilities/DataFlow/src/runTimeframeReaderDevice.cxx @@ -0,0 +1,21 @@ +#include "runFairMQDevice.h" +#include "DataFlow/TimeframeReaderDevice.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + (o2::DataFlow::TimeframeReaderDevice::OptionKeyOutputChannelName, + bpo::value()->default_value("output"), + "Name of the output channel"); + options.add_options() + (o2::DataFlow::TimeframeReaderDevice::OptionKeyInputFileName, + bpo::value()->default_value("data.o2tf"), + "Name of the input file"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new o2::DataFlow::TimeframeReaderDevice(); +} diff --git a/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx b/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx new file mode 100644 index 0000000000000..a261210edbae5 --- /dev/null +++ b/Utilities/DataFlow/src/runTimeframeWriterDevice.cxx @@ -0,0 +1,33 @@ +#include "runFairMQDevice.h" +#include "DataFlow/TimeframeWriterDevice.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& options) +{ + options.add_options() + (o2::DataFlow::TimeframeWriterDevice::OptionKeyInputChannelName, + bpo::value()->default_value("input"), + "Name of the input channel"); + options.add_options() + (o2::DataFlow::TimeframeWriterDevice::OptionKeyOutputFileName, + bpo::value()->default_value("data.o2tf"), + "Name of the input channel"); + options.add_options() + (o2::DataFlow::TimeframeWriterDevice::OptionKeyMaxFiles, + bpo::value()->default_value(1), + "Maximum number of files to write"); + options.add_options() + (o2::DataFlow::TimeframeWriterDevice::OptionKeyMaxTimeframesPerFile, + bpo::value()->default_value(1), + "Maximum number of timeframes per file"); + options.add_options() + (o2::DataFlow::TimeframeWriterDevice::OptionKeyMaxFileSize, + bpo::value()->default_value(-1), + "Maximum size per file"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) +{ + return new o2::DataFlow::TimeframeWriterDevice(); +} diff --git a/Utilities/DataFlow/test/test_TimeframeParser.cxx b/Utilities/DataFlow/test/test_TimeframeParser.cxx new file mode 100644 index 0000000000000..423c94bc128c0 --- /dev/null +++ b/Utilities/DataFlow/test/test_TimeframeParser.cxx @@ -0,0 +1,53 @@ +#include "DataFlow/TimeframeParser.h" +#include "DataFlow/FakeTimeframeBuilder.h" +#include "Headers/DataHeader.h" +#include +#include +#include + +struct OneShotReadBuf : public std::streambuf +{ + OneShotReadBuf(char* s, std::size_t n) + { + setg(s, s, s + n); + } +}; + +using DataHeader = o2::Header::DataHeader; + +int +main(int argc, char **argv) { + // Construct a dummy timeframe. + // Stream it and get the parts + FairMQParts parts; + auto onAddParts = [](FairMQParts &p, char *buffer, size_t size) { + LOG(INFO) << "Adding part to those to be sent.\n"; + }; + auto onSend = [](FairMQParts &p) { + LOG(INFO) << "Everything OK. Sending parts\n"; + }; + + // Prepare a test timeframe to be streamed + auto zeroFiller = [](char *b, size_t s) {memset(b, 0, s);}; + std::vector specs = { + { + .origin = "TPC", + .dataDescription = "CLUSTERS", + .bufferFiller = zeroFiller, + .bufferSize = 1000 + } + }; + + size_t testBufferSize; + auto testBuffer = fakeTimeframeGenerator(specs, testBufferSize); + + OneShotReadBuf osrb(testBuffer.get(), testBufferSize); + std::istream s(&osrb); + + try { + o2::DataFlow::streamTimeframe(s, onAddParts, onSend); + } catch(std::runtime_error &e) { + LOG(ERROR) << e.what() << std::endl; + exit(1); + } +} diff --git a/Utilities/Publishers/src/DataPublisherDevice.cxx b/Utilities/Publishers/src/DataPublisherDevice.cxx index 98a67ba5f0fd5..8817a68d083b6 100644 --- a/Utilities/Publishers/src/DataPublisherDevice.cxx +++ b/Utilities/Publishers/src/DataPublisherDevice.cxx @@ -15,6 +15,7 @@ using TPCTestCluster = o2::DataFlow::TPCTestCluster; using ITSRawData = o2::DataFlow::ITSRawData; using DataDescription = o2::Header::DataDescription; +using DataOrigin = o2::Header::DataOrigin; template void fakePayload(std::vector &buffer, std::function filler, int numOfElements) { @@ -64,10 +65,15 @@ void o2::Utilities::DataPublisherDevice::InitTask() // check in the registry // * constructors and assignment operators taking the integer type as argument if (GetConfig()->GetValue(OptionKeyDataDescription) == "TPCCLUSTER") - mDataDescription = DataDescription("TPCCLUSTER"); + { + mDataDescription = DataDescription("CLUSTERS"); + mDataOrigin = DataOrigin("TPC"); + } else if (GetConfig()->GetValue(OptionKeyDataDescription) == "ITSRAW") - mDataDescription = DataDescription("ITSRAW"); - mDataOrigin = o2::Header::DataOrigin("TEST"); + { + mDataDescription = DataDescription("CLUSTERS"); + mDataOrigin = DataOrigin("ITS"); + } mSubSpecification = GetConfig()->GetValue(OptionKeySubspecification); mFileName = GetConfig()->GetValue(OptionKeyFileName); @@ -78,12 +84,18 @@ void o2::Utilities::DataPublisherDevice::InitTask() if (!mFileName.empty()) { AppendFile(mFileName.c_str(), mFileBuffer); - } else if (strncmp(mDataDescription.str, "TPCCLUSTER", 16) == 0) { + } + else if (mDataDescription == DataDescription("CLUSTERS") + && mDataOrigin == DataOrigin("TPC")) + { auto f = [](TPCTestCluster &cluster, int idx) {cluster.timeStamp = idx;}; fakePayload(mFileBuffer, f, 1000); LOG(INFO) << "Payload size (after) " << mFileBuffer.size() << "\n"; // For the moment, add the data as another part to this message - } else if (strncmp(mDataDescription.str, "ITSRAW", 16) == 0) { + } + else if (mDataDescription == DataDescription("CLUSTERS") + && mDataOrigin == DataOrigin("ITS")) + { auto f = [](ITSRawData &cluster, int idx) {cluster.timeStamp = idx;}; fakePayload(mFileBuffer, f, 500); } diff --git a/cmake/O2Dependencies.cmake b/cmake/O2Dependencies.cmake index 83be7ef3ee371..bbfd63c22ee53 100644 --- a/cmake/O2Dependencies.cmake +++ b/cmake/O2Dependencies.cmake @@ -142,6 +142,7 @@ o2_define_bucket( DEPENDENCIES Base Headers + TimeFrame O2Device dl ) diff --git a/doc/o2-timeframe-file-format.1.in b/doc/o2-timeframe-file-format.1.in new file mode 100644 index 0000000000000..3d10757532dfe --- /dev/null +++ b/doc/o2-timeframe-file-format.1.in @@ -0,0 +1,27 @@ +.\" Manpage for O2. +.TH man 1 "19 May 2017" "1.0" "Alice O2 Timeframe Format" + +.SH DESCRIPTION + +O2 is Alice next generation software framework to be used for RUN3. This is a +quick desctiption of the timeframe file format as dumped by +TimeframeWriterDevice and read by the TimeframeReaderDevice. + +The file format is simply a dump of the timeframe on disk. Multiple timeframes +can be concatenated resulting in a valid file. The format is as follow: + +o2tf: Timeframe [Timeframe [..]] +Timeframe: Subtimeframe [Subtimeframe [...]] TimeframeIndex +Subtimeframe: Header Payload +Header: DataHeader derived header stack +Payload: binary blob +TimeframeIndex: IndexElement [IndexElement [..]] +IndexElement: DataHeader Payload +Position in timeframe: int (4 bytes) +DataHeader: only the DataHeader part +Payload: binary blob + +.SH DISCLAIMER + +Notice that this file format is a work in progress and cannot be used for +anything but debugging purposes.