diff --git a/Framework/Core/ANALYSIS.md b/Framework/Core/ANALYSIS.md index 25b1784e64ae1..28327e3bea7b4 100644 --- a/Framework/Core/ANALYSIS.md +++ b/Framework/Core/ANALYSIS.md @@ -35,7 +35,7 @@ defineDataProcessing() { > **Implementation details**: `AnalysisTask` is simply a `struct`. Since `struct` default inheritance policy is `public`, we can omit specifying it when declaring MyTask. > -> `AnalysisTask` will not actually provide any virtual method, as the `adaptAnalysis` helper relyes on template argument matching to discover the properties of the task. It will come clear in the next paragraph how this allow is used to avoid the proliferation of data subscription methods. +> `AnalysisTask` will not actually provide any virtual method, as the `adaptAnalysis` helper relies on template argument matching to discover the properties of the task. It will come clear in the next paragraph how this allow is used to avoid the proliferation of data subscription methods. ## Processing data @@ -55,7 +55,7 @@ struct MyTask : AnalysisTask { }; ``` -will allow you to get a per timeframe collection of tracks. You can then iterate on the tracks using the syntax: +will allow you to get a per time frame collection of tracks. You can then iterate on the tracks using the syntax: ```cpp for (auto &track : tracks) { @@ -77,7 +77,7 @@ This has the advantage that you might be able to benefit from vectorization / pa > **Implementation notes**: as mentioned before, the arguments of the process method are inspected using template argument matching. This way the system knows at compile time what data types are requested by a given `process` method and can create the relevant DPL data descriptions. > -> The distiction between `Tracks` and `Track` above is simply that one refers to the whole collection, while the second is an alias to `Tracks::iterator`. Notice that we assume that each collection is of type `o2::soa::Table` which carries metadata about the dataOrigin and dataDescription to be used by DPL to subscribe to the associated data stream. +> The distinction between `Tracks` and `Track` above is simply that one refers to the whole collection, while the second is an alias to `Tracks::iterator`. Notice that we assume that each collection is of type `o2::soa::Table` which carries meta data about the dataOrigin and dataDescription to be used by DPL to subscribe to the associated data stream. ### Navigating data associations @@ -89,7 +89,7 @@ void process(o2::aod::Collision const& collision, o2::aod::Tracks &tracks) { } ``` -the above will be called once per collision found in the timeframe, and `tracks` will allow you to iterate on all the tracks associated to the given collision. +the above will be called once per collision found in the time frame, and `tracks` will allow you to iterate on all the tracks associated to the given collision. Alternatively, you might not require to have all the tracks at once and you could do with: @@ -176,7 +176,7 @@ struct MyTask : AnalysisTask { }; ``` -the `etaphi` object is a functor that will effectively act as a cursor which allows to populate the `EtaPhi` table. Each invokation of the functor will create a new row in the table, using the arguments as contents of the given column. By default the arguments must be given in order, but one can give them in any order by using the correct column type. E.g. in the example above: +the `etaphi` object is a functor that will effectively act as a cursor which allows to populate the `EtaPhi` table. Each invocation of the functor will create a new row in the table, using the arguments as contents of the given column. By default the arguments must be given in order, but one can give them in any order by using the correct column type. E.g. in the example above: ```cpp etaphi(track::Phi(calculatePhi(track), track::Eta(calculateEta(track))); @@ -186,7 +186,7 @@ etaphi(track::Phi(calculatePhi(track), track::Eta(calculateEta(track))); Sometimes columns are not backed by actual persisted data, but they are merely derived from it. For example you might want to have different representations -(e.g. spherical, cylindrical) for a given persisten representation. You can +(e.g. spherical, cylindrical) for a given persistent representation. You can do that by using the `DECLARE_SOA_DYNAMIC_COLUMN` macro. ```cpp @@ -200,10 +200,10 @@ DECLARE_SOA_DYNAMIC_COLUMN(R2, r2, [](float x, float y) { return x*x + y+y; }); DECLARE_SOA_TABLE(Point, "MISC", "POINT", X, Y, (R2)); ``` -Notice how the dynamic column is defined as a standalone column and binds to X and Y +Notice how the dynamic column is defined as a stand alone column and binds to X and Y only when you attach it as part of a table. -### Executing a finalisation method, post run +### Executing a finalization method, post run Sometimes it's handy to perform an action when all the data has been processed, for example executing a fit on a histogram we filled during the processing. This can be done by implementing the postRun method. @@ -335,7 +335,7 @@ struct MyTask : AnalysisTask { ### Getting combinations (pairs, triplets, ...) To get combinations of distinct tracks, helper functions from `ASoAHelpers.h` can be used. Presently, there are 3 combinations policies available: strictly upper, upper and full. -The number of elements in a combination is deduced from the number of arguments pased to `combinations()` call. For example, to get pairs of tracks from the same source, one must specify `tracks` table twice: +The number of elements in a combination is deduced from the number of arguments passed to `combinations()` call. For example, to get pairs of tracks from the same source, one must specify `tracks` table twice: ```cpp struct MyTask : AnalysisTask { @@ -362,7 +362,7 @@ struct MyTask : AnalysisTask { }; ``` -It will be possible to specify a filter for a combination as a whole, and only matching combinations will be then outputed. Currently, the filter is applied to each element separately. Note that for filter version the input tables are mentioned twice, both in policy constructor and in `combinations()` call itself. +It will be possible to specify a filter for a combination as a whole, and only matching combinations will be then output. Currently, the filter is applied to each element separately. Note that for filter version the input tables are mentioned twice, both in policy constructor and in `combinations()` call itself. ```cpp struct MyTask : AnalysisTask { @@ -384,6 +384,69 @@ combinations(tracks, tracks); // equivalent to combinations(CombinationsStrictly combinations(filter, tracks, covs); // equivalent to combinations(CombinationsFullIndexPolicy(tracks, covs), filter, tracks, covs); ``` +### Saving tables to file + +Produced tables can be saved to file as TTrees. This process is customized by the command line option `--keep` (of the internal-dpl-AOD-writer). **Please be aware, that the format of the `keep` option as described here is preliminary and might be changed in future.** + +`keep` is a comma-separated list of `DataOuputDescriptions`. + +`keep` +```csh +DataOuputDescription1,DataOuputDescription2, ... +``` + +Each `DataOuputDescription` is a semicolon-separated list of 4 items + +`DataOuputDescription` +```csh +table:tree:columns:file +``` +and instructs the internal-dpl-AOD-writer, to save the columns `columns` of table `table` as TTree `tree` into files `file_x.root`, where `x` is an incremental number. The selected columns are saved as separate TBranches of TTree `tree`. + +By default `x` is incremented with every time frame. This behavior can be modified with the command line option `--ntfmerge`. The value of `ntfmerge` specifies the number of time frames to merge into one file. + +The first item of a `DataOuputDescription` is mandatory and needs to be specified, otherwise the `DataOuputDescription` is ignored. The other three items are optional and are filled by default values if missing. + +The format of `table` is + +`table` +```csh +AOD/tablename/0 +``` +`tablename` is the name of the table as defined in the workflow definition. + +The format of `tree` is a simple string which names the TTree the table will be saved to. If `tree` is not specified then `tablename` will be used as TTree name. + +`columns` is a slash(/)-separated list of column names., e.g. + +`columns` +```csh +col1/col2/col3 +``` +The column names are expected to match column names of table `tablename` as defined in the respective workflow. Non-matching columns are ignored. The selected table columns are saved as separate TBranches with the same names as the corresponding table columns. If `columns` is not specified then all table columns will be saved. + +`file` finally specifies the base name of the files the tables are saved to. The actual file names are composed as `file`_`x`.root, where 'x' is an incremental number. If `file` is not specified the default file name is used. The default file name can be set with the command line option `--res-file`. However, if `res-file` is missing then the default file name is set to `AnalysisResults`. + +#### Valid example command line options + +```csh +--keep AOD/UNO/0 + # save all columns of table 'UNO' to TTree 'UNO' in files 'AnalysisResults'_x.root + +--keep AOD/UNO/0::c2/c4:unoresults + # save columns 'c2' and 'c4' of table 'UNO' to TTree 'UNO' in files 'unoresults'_x.root + +--res-file myskim --ntfmerge 50 --keep AOD/UNO/0:trsel1:c1/c2,AOD/DUE/0:trsel2:c6/c7/c8 + # save columns 'c1' and 'c2' of table 'UNO' to TTree 'trsel1' in files 'myskim'_x.root and + # save columns 'c6', 'c7' and 'c8' of table 'DUE' to TTree 'trsel2' in files 'myskim'_x.root. + # Merge 50 time frames in each file. + +``` + +#### Limitations + +If the provided `--keep` option contains two `DataOuputDescriptions` with equal combination of `tree` and `file` then the processing will be stopped! It is not pssible to save two trees with equal name to a given file. + ### Possible ideas We could add a template ` reshuffle()` method to the Table class which allows you to reduce the number of columns or attach new dynamic columns. A template wrapper could diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index 9d7eeb1969384..647506711c633 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -86,6 +86,7 @@ o2_add_library(Framework src/TMessageSerializer.cxx src/TableBuilder.cxx src/TableConsumer.cxx + src/DataOutputDirector.cxx src/Task.cxx src/TextControlService.cxx src/Variant.cxx diff --git a/Framework/Core/include/Framework/CommonDataProcessors.h b/Framework/Core/include/Framework/CommonDataProcessors.h index 409f1f094fa64..f07133978e5dc 100644 --- a/Framework/Core/include/Framework/CommonDataProcessors.h +++ b/Framework/Core/include/Framework/CommonDataProcessors.h @@ -12,6 +12,7 @@ #include "Framework/DataProcessorSpec.h" #include "Framework/InputSpec.h" +#include "Framework/DataOutputDirector.h" #include "TTree.h" #include @@ -34,11 +35,9 @@ struct CommonDataProcessors { /// not going to be used by the returned DataProcessorSpec. static DataProcessorSpec getGlobalFileSink(std::vector const& danglingInputs, std::vector& unmatched); - /// Helper function to create and write TTree - static void table2tree(TTree* tout, - std::shared_ptr table, - bool tupdate); - static DataProcessorSpec getGlobalAODSink(std::vector const& danglingInputs); + /// writes inputs of kind AOD to file + static DataProcessorSpec getGlobalAODSink(std::vector const& OutputInputs, + std::vector const& isdangling); /// @return a dummy DataProcessorSpec which requires all the passed @a InputSpec /// and simply discards them. static DataProcessorSpec getDummySink(std::vector const& danglingInputs); diff --git a/Framework/Core/include/Framework/DataDescriptorQueryBuilder.h b/Framework/Core/include/Framework/DataDescriptorQueryBuilder.h index 81b349a6efba1..fd3d82328e751 100644 --- a/Framework/Core/include/Framework/DataDescriptorQueryBuilder.h +++ b/Framework/Core/include/Framework/DataDescriptorQueryBuilder.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace o2 { @@ -66,6 +67,9 @@ struct DataDescriptorQueryBuilder { /// /// Example for config: TPC/CLUSTER/0;ITS/TRACKS/1 static DataDescriptorQuery buildFromKeepConfig(std::string const& config); + static DataDescriptorQuery buildFromExtendedKeepConfig(std::string const& config); + static std::unique_ptr buildNode(std::string const& nodeString); + static std::smatch getTokens(std::string const& nodeString); }; } // namespace framework diff --git a/Framework/Core/include/Framework/DataOutputDirector.h b/Framework/Core/include/Framework/DataOutputDirector.h new file mode 100644 index 0000000000000..bc5572a7d7cda --- /dev/null +++ b/Framework/Core/include/Framework/DataOutputDirector.h @@ -0,0 +1,175 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#ifndef o2_framework_DataOutputDirector_H_INCLUDED +#define o2_framework_DataOutputDirector_H_INCLUDED + +#include "TFile.h" + +#include "Framework/DataDescriptorQueryBuilder.h" +#include "Framework/DataDescriptorMatcher.h" +#include "Framework/DataSpecUtils.h" +#include "Framework/InputSpec.h" + +#include + +namespace o2 +{ +namespace framework +{ +namespace data_matcher +{ + +struct DataOutputDescriptor { + /// Holds information concerning the writing of aod tables. + /// The information includes the table specification, treename, + /// columns to save, and the file name + + std::string tablename = ""; + std::string treename = ""; + std::string filename = ""; + std::vector colnames; + std::unique_ptr matcher; + + DataOutputDescriptor(std::string sin) + { + // sin is an item consisting of 4 parts which are separated by a ':' + // "origin/description/subSpec:treename:col1/col2/col3:filename" + // the 1st part is used to create a DataDescriptorMatcher + // the other parts are used to fill treename, colnames, and filename + + // remove all spaces + auto s = remove_ws(sin); + + // reset + treename = ""; + colnames.clear(); + filename = ""; + + // analyze the parts of the input string + static const std::regex delim1(":"); + std::sregex_token_iterator end; + std::sregex_token_iterator iter1(s.begin(), + s.end(), + delim1, + -1); + + // create the DataDescriptorMatcher + if (iter1 == end) + return; + auto a = iter1->str(); + matcher = DataDescriptorQueryBuilder::buildNode(a); + + // get the table name + auto m = DataDescriptorQueryBuilder::getTokens(a); + if (!std::string(m[2]).empty()) + tablename = m[2]; + + // get the tree name + // defaul tree name is the table name + treename = tablename; + ++iter1; + if (iter1 == end) + return; + if (!iter1->str().empty()) + treename = iter1->str(); + + // get column names + ++iter1; + if (iter1 == end) + return; + if (!iter1->str().empty()) { + auto cns = iter1->str(); + + static const std::regex delim2("/"); + std::sregex_token_iterator iter2(cns.begin(), + cns.end(), + delim2, + -1); + for (; iter2 != end; ++iter2) + if (!iter2->str().empty()) + colnames.emplace_back(iter2->str()); + } + + // get the filename + ++iter1; + if (iter1 == end) + return; + if (!iter1->str().empty()) + filename = iter1->str(); + } + + void setFilename(std::string fn) { filename = fn; } + + void printOut() + { + LOG(INFO) << "DataOutputDescriptor"; + LOG(INFO) << " table name: " << tablename.c_str(); + LOG(INFO) << " file name : " << filename.c_str(); + LOG(INFO) << " tree name : " << treename.c_str(); + LOG(INFO) << " columns : " << colnames.size(); + for (auto cn : colnames) + LOG(INFO) << " " << cn.c_str(); + } + + std::string remove_ws(const std::string& s) + { + std::string s_wns; + for (auto c : s) + if (!std::isspace(c)) + s_wns += c; + return s_wns; + } +}; + +struct DataOutputDirector { + + int ndod = 0; + std::string defaultfname; + std::vector dodescrs; + + std::vector tnfns; + + std::vector fnames; + std::vector fcnts; + std::vector fouts; + + DataOutputDirector(); + void reset(); + + // fill the DataOutputDirector with information from a + // keep-string + void readString(std::string const& keepString); + + // fill the DataOutputDirector with information from a + // list of InputSpec + void readSpecs(std::vector inputs); + + // fill the DataOutputDirector with information from a json file + //readJson (std::string const& fnjson) {}; + + // get matching DataOutputDescriptors + std::vector getDataOutputDescriptors(header::DataHeader dh); + std::vector getDataOutputDescriptors(InputSpec spec); + + // get the matching TFile + TFile* getDataOutputFile(DataOutputDescriptor* dod, + int ntf, int ntfmerge, std::string filemode); + void closeDataOutputFiles(); + + void setDefaultfname(std::string dfn) { defaultfname = dfn; } + + void printOut(); +}; + +} // namespace data_matcher +} // namespace framework +} // namespace o2 + +#endif // o2_framework_DataOutputDirector_H_INCLUDED diff --git a/Framework/Core/include/Framework/TableTreeHelpers.h b/Framework/Core/include/Framework/TableTreeHelpers.h index 01528d862ed7a..cc641ec7cb7ff 100644 --- a/Framework/Core/include/Framework/TableTreeHelpers.h +++ b/Framework/Core/include/Framework/TableTreeHelpers.h @@ -27,7 +27,7 @@ namespace framework // // To write the contents of a table ta to a tree tr on file f do: // . TableToTree t2t(ta,f,treename); -// . t2t.AddBranch(branchname1); t2t.AddBranch(branchname2); ... +// . t2t.AddBranch(coumn1); t2t.AddBranch(coumn1); ... // OR // t2t.AddAllBranches(); // . t2t.Process(); @@ -149,7 +149,7 @@ class branchIterator v = (void*)var_l; break; default: - LOG(FATAL) << "Type not handled: " << dt << std::endl; + LOG(FATAL) << "Type not handled: " << dt; break; } br->SetAddress(v); @@ -227,7 +227,7 @@ class branchIterator v = (void*)var_l++; break; default: - LOG(FATAL) << "Type not handled: " << dt << std::endl; + LOG(FATAL) << "Type not handled: " << dt; break; } } @@ -304,13 +304,14 @@ class TableToTree { bool togo = true; + LOG(DEBUG) << "Number of colums " << brits.size(); while (togo) { // fill the tree tr->Fill(); // update the branches - for (auto ii = 0; ii < ta->num_columns(); ii++) - togo &= brits.at(ii)->push(); + for (auto brit : brits) + togo &= brit->push(); } tr->Write(); @@ -369,22 +370,19 @@ class columnIterator // find branch auto tree = reader->GetTree(); if (!tree) { - LOG(INFO) << "Can not locate tree!"; + LOG(FATAL) << "Can not locate tree!"; return; } //tree->Print(); auto br = tree->GetBranch(colname); if (!br) { - LOG(INFO) << "Can not locate branch " << colname; + LOG(FATAL) << "Can not locate branch " << colname; return; } cname = colname; TClass* cl; br->GetExpectedType(cl, dt); - //LOG(INFO) << "Initialisation of TTreeReaderValue"; - //LOG(INFO) << "The column " << cname << " is of type " << dt; - // initialize the TTreeReaderValue // the corresponding arrow::TBuilder // the column schema @@ -534,9 +532,6 @@ class columnIterator // with this ar is prepared to be used in arrow::Table::Make void finish() { - - //LOG(INFO) << "columnIterator::finish " << dt; - arrow::Status stat; // switch according to dt @@ -622,7 +617,7 @@ class TreeToTable // get a list of column names auto tree = reader->GetTree(); if (!tree) { - LOG(INFO) << "Tree not found!"; + LOG(FATAL) << "Tree not found!"; return false; } auto branchList = tree->GetListOfBranches(); diff --git a/Framework/Core/src/CommonDataProcessors.cxx b/Framework/Core/src/CommonDataProcessors.cxx index ab31215270391..0c6b461b2d782 100644 --- a/Framework/Core/src/CommonDataProcessors.cxx +++ b/Framework/Core/src/CommonDataProcessors.cxx @@ -199,13 +199,14 @@ DataProcessorSpec CommonDataProcessors::getOutputObjSink(outputObjects const& ob return spec; } -// add sink for dangling AODs +// add sink for the AODs DataProcessorSpec - CommonDataProcessors::getGlobalAODSink(std::vector const& danglingOutputInputs) + CommonDataProcessors::getGlobalAODSink(std::vector const& OutputInputs, + std::vector const& isdangling) { - auto writerFunction = [danglingOutputInputs](InitContext& ic) -> std::function { - LOG(DEBUG) << "======== getGlobalAODSink::Inint =========="; + auto writerFunction = [OutputInputs, isdangling](InitContext& ic) -> std::function { + LOG(DEBUG) << "======== getGlobalAODSink::Init =========="; // analyze ic and take actions accordingly auto fnbase = ic.options().get("res-file"); @@ -213,22 +214,37 @@ DataProcessorSpec auto keepString = ic.options().get("keep"); auto ntfmerge = ic.options().get("ntfmerge"); - // find out if any tables need to be saved - bool hasOutputsToWrite = true; - bool usematch = false; + // find out if any table needs to be saved + bool hasOutputsToWrite = false; - // use the parameter keep to create a matcher - std::shared_ptr matcher; + // parse the keepString + auto dod = std::make_shared(); + if (!fnbase.empty()) + dod->setDefaultfname(fnbase); if (!keepString.empty()) { - usematch = true; - auto [variables, outputMatcher] = DataDescriptorQueryBuilder::buildFromKeepConfig(keepString); - matcher = outputMatcher; - - VariableContext context; - for (auto& spec : danglingOutputInputs) { - auto concrete = DataSpecUtils::asConcreteDataMatcher(spec); - if (outputMatcher->match(concrete, context)) { + + std::string d("dangling"); + if (d.find(keepString) == 0) { + + // use the dangling outputs + std::vector danglingOutputs; + for (auto ii = 0; ii < OutputInputs.size(); ii++) { + if (isdangling[ii]) + danglingOutputs.emplace_back(OutputInputs[ii]); + } + dod->readSpecs(danglingOutputs); + + } else { + + // use the keep string + dod->readString(keepString); + } + + for (auto& outobj : OutputInputs) { + auto ds = dod->getDataOutputDescriptors(outobj); + if (ds.size() > 0) { hasOutputsToWrite = true; + break; } } } @@ -238,17 +254,15 @@ DataProcessorSpec return [](ProcessingContext&) mutable -> void { static bool once = false; if (!once) { - LOG(DEBUG) << "No dangling output to be saved."; + LOG(INFO) << "No AODs to be saved."; once = true; } }; } // end of data functor is called at the end of the data stream - auto fout = std::make_shared(); - auto endofdatacb = [fout](EndOfStreamContext& context) { - if (fout) - fout->Close(); + auto endofdatacb = [dod](EndOfStreamContext& context) { + dod->closeDataOutputFiles(); context.services().get().readyToQuit(QuitRequest::All); }; @@ -257,77 +271,61 @@ DataProcessorSpec callbacks.set(CallbackService::Id::EndOfStream, endofdatacb); // this functor is called once per time frame - Int_t ntf = 0; - return [fout, fnbase, filemode, ntf, ntfmerge, usematch, matcher](ProcessingContext& pc) mutable -> void { + Int_t ntf = -1; + return std::move([ntf, ntfmerge, filemode, dod](ProcessingContext& pc) mutable -> void { LOG(DEBUG) << "======== getGlobalAODSink::processing =========="; LOG(DEBUG) << " processing data set with " << pc.inputs().size() << " entries"; - LOG(DEBUG) << " result are saved to " << fnbase << "_*.root"; // return immediately if pc.inputs() is empty auto ninputs = pc.inputs().size(); if (ninputs == 0) { - LOG(DEBUG) << "no inputs available!"; + LOG(INFO) << "No inputs available!"; return; } - // open new file if ntfmerge time frames is reached - LOG(DEBUG) << "This is time frame number " << ntf; - - std::string fname; - if ((ntf % ntfmerge) == 0) { - if (fout) - fout->Close(); - - fname = fnbase + "_" + std::to_string((Int_t)(ntf / ntfmerge)) + ".root"; - fout = - std::make_shared(fname.c_str(), filemode.c_str()); - } + // increment the time frame counter ntf ntf++; // loop over the DataRefs which are contained in pc.inputs() - VariableContext matchingContext; for (const auto& ref : pc.inputs()) { - // is this table to be saved? + // does this need to be saved? auto dh = DataRefUtils::getHeader(ref); - // only arrow tables are processed here - if (dh->payloadSerializationMethod != o2::header::gSerializationMethodArrow) - continue; - - // does it match the keep parameter - if (usematch && !matcher->match(*dh, matchingContext)) - continue; - - // get the table name - auto treename = dh->dataDescription.as(); - - // get the TableConsumer and convert it into an arrow table - auto s = pc.inputs().get(ref.spec->binding); - auto table = s->asArrowTable(); - LOG(DEBUG) << "The tree name is " << treename; - LOG(DEBUG) << "Number of columns " << table->num_columns(); - LOG(DEBUG) << "Number of rows " << table->num_rows(); - - // we need finite number of rows and columns - if (table->num_columns() == 0 || table->num_rows() == 0) - continue; - - // this table needs to be saved to file - // use TableToTree - TableToTree ta2tr(table, fout.get(), treename.c_str()); - - // all columns of the table are saved - ta2tr.AddAllBranches(); - // to select specific columns use ... - // ta2tr.AddBranch(colname); - ta2tr.Process(); + auto ds = dod->getDataOutputDescriptors(*dh); + + if (ds.size() > 0) { + + // get the TableConsumer and corresponding arrow table + auto s = pc.inputs().get(ref.spec->binding); + auto table = s->asArrowTable(); + + // loop over all DataOutputDescriptors + // a table can be saved in multiple ways + // e.g. different selections of columns to different files + for (auto d : ds) { + TableToTree ta2tr(table, + dod->getDataOutputFile(d, ntf, ntfmerge, filemode), + d->treename.c_str()); + + if (d->colnames.size() > 0) { + for (auto cn : d->colnames) { + auto col = table->GetColumnByName(cn); + if (col) + ta2tr.AddBranch(col); + } + } else { + ta2tr.AddAllBranches(); + } + ta2tr.Process(); + } + } } - }; + }); }; // end of writerFunction DataProcessorSpec spec{ - "internal-dpl-AOD-writter", - danglingOutputInputs, + "internal-dpl-AOD-writer", + OutputInputs, Outputs{}, AlgorithmSpec(writerFunction), {{"res-file", VariantType::String, "AnalysisResults", {"Name of the output file"}}, diff --git a/Framework/Core/src/DataDescriptorMatcher.cxx b/Framework/Core/src/DataDescriptorMatcher.cxx index dc98e094aff3c..c067233a6d3b2 100644 --- a/Framework/Core/src/DataDescriptorMatcher.cxx +++ b/Framework/Core/src/DataDescriptorMatcher.cxx @@ -182,8 +182,8 @@ bool DataDescriptorMatcher::match(ConcreteDataMatcher const& matcher, VariableCo return this->match(reinterpret_cast(s.data()), context); } -/// @return true if the (sub-)query associated to this matcher will -/// match the provided @a spec, false otherwise. +/// @return true if the (sub-)query associated to this matcher will +/// match the provided @a spec, false otherwise. bool DataDescriptorMatcher::match(ConcreteDataTypeMatcher const& matcher, VariableContext& context) const { header::DataHeader dh; @@ -195,8 +195,7 @@ bool DataDescriptorMatcher::match(ConcreteDataTypeMatcher const& matcher, Variab header::Stack s{dh, dph}; return this->match(reinterpret_cast(s.data()), context); -} - +} bool DataDescriptorMatcher::match(header::DataHeader const& header, VariableContext& context) const { diff --git a/Framework/Core/src/DataDescriptorQueryBuilder.cxx b/Framework/Core/src/DataDescriptorQueryBuilder.cxx index b217807d83a8b..0a3737b2ba971 100644 --- a/Framework/Core/src/DataDescriptorQueryBuilder.cxx +++ b/Framework/Core/src/DataDescriptorQueryBuilder.cxx @@ -14,7 +14,6 @@ #include #include -#include #include #include @@ -236,29 +235,21 @@ std::vector DataDescriptorQueryBuilder::parse(char const* config) DataDescriptorQuery DataDescriptorQueryBuilder::buildFromKeepConfig(std::string const& config) { - static const std::regex specTokenRE(R"re((\w{1,4})/(\w{1,16})/(\d*))re"); - static const std::regex delimiter(","); + static const std::regex delim(","); + std::sregex_token_iterator end; std::sregex_token_iterator iter(config.begin(), config.end(), - delimiter, + delim, -1); - std::sregex_token_iterator end; - std::unique_ptr result; + std::unique_ptr next, result; for (; iter != end; ++iter) { std::smatch m; auto s = iter->str(); - std::regex_match(s, m, specTokenRE); - std::unique_ptr next; - auto newNode = std::make_unique( - DataDescriptorMatcher::Op::And, - OriginValueMatcher{m[1]}, - std::make_unique( - DataDescriptorMatcher::Op::And, - DescriptionValueMatcher{m[2]}, - SubSpecificationTypeValueMatcher{m[3]})); + auto newNode = buildNode(s); + if (result.get() == nullptr) { result = std::move(newNode); } else { @@ -272,5 +263,76 @@ DataDescriptorQuery DataDescriptorQueryBuilder::buildFromKeepConfig(std::string return std::move(DataDescriptorQuery{{}, std::move(result)}); } +DataDescriptorQuery DataDescriptorQueryBuilder::buildFromExtendedKeepConfig(std::string const& config) +{ + static const std::regex delim1(","); + static const std::regex delim2(":"); + + std::sregex_token_iterator end; + std::sregex_token_iterator iter1(config.begin(), + config.end(), + delim1, + -1); + + std::unique_ptr next, result; + + // looping over ','-separated items + for (; iter1 != end; ++iter1) { + auto s = iter1->str(); + + // get first part of item + std::sregex_token_iterator iter2(s.begin(), + s.end(), + delim2, + -1); + if (iter2 == end) + continue; + s = iter2->str(); + + // create the corresponding DataDescriptorMatcher + std::smatch m; + auto newNode = buildNode(s); + + if (result.get() == nullptr) { + result = std::move(newNode); + } else { + next = std::move(std::make_unique(DataDescriptorMatcher::Op::Or, + std::move(result), + std::move(newNode))); + result = std::move(next); + } + } + + return std::move(DataDescriptorQuery{{}, std::move(result)}); +} + +std::unique_ptr DataDescriptorQueryBuilder::buildNode(std::string const& nodeString) +{ + + std::smatch m = getTokens(nodeString); + + std::unique_ptr next; + auto newNode = std::make_unique( + DataDescriptorMatcher::Op::And, + OriginValueMatcher{m[1]}, + std::make_unique( + DataDescriptorMatcher::Op::And, + DescriptionValueMatcher{m[2]}, + SubSpecificationTypeValueMatcher{m[3]})); + + return newNode; +} + +std::smatch DataDescriptorQueryBuilder::getTokens(std::string const& nodeString) +{ + + static const std::regex specTokenRE(R"re((\w{1,4})/(\w{1,16})/(\d*))re"); + std::smatch m; + + std::regex_match(nodeString, m, specTokenRE); + + return m; +} + } // namespace framework } // namespace o2 diff --git a/Framework/Core/src/DataOutputDirector.cxx b/Framework/Core/src/DataOutputDirector.cxx new file mode 100644 index 0000000000000..c26c7ce94d81b --- /dev/null +++ b/Framework/Core/src/DataOutputDirector.cxx @@ -0,0 +1,190 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +#include "Framework/DataOutputDirector.h" + +namespace o2 +{ +namespace framework +{ +namespace data_matcher +{ + +DataOutputDirector::DataOutputDirector() +{ + defaultfname = std::string("AnalysisResults"); +} + +void DataOutputDirector::reset() +{ + dodescrs.clear(); + fnames.clear(); + closeDataOutputFiles(); + fouts.clear(); + fcnts.clear(); + ndod = 0; +}; + +void DataOutputDirector::readString(std::string const& keepString) +{ + // the keep-string keepString consists of ','-separated items + // create for each item a corresponding DataOutputDescriptor + + static const std::regex delim(","); + std::sregex_token_iterator end; + std::sregex_token_iterator iter(keepString.begin(), + keepString.end(), + delim, + -1); + + // loop over ','-separated items + for (; iter != end; ++iter) { + auto s = iter->str(); + + // create a new DataOutputDescriptor and add it to the list + auto dod = new DataOutputDescriptor(s); + if (dod->filename.empty()) + dod->setFilename(defaultfname); + + dodescrs.emplace_back(dod); + fnames.emplace_back(dod->filename); + tnfns.emplace_back(dod->treename + dod->filename); + } + + // the combination [tree name/file name] must be unique + // throw exception if this is not the case + auto it = std::unique(tnfns.begin(), tnfns.end()); + if (it != tnfns.end()) { + printOut(); + LOG(FATAL) << "Dublicate tree names in a file!"; + } + + // make unique/sorted list of fnames + std::sort(fnames.begin(), fnames.end()); + auto last = std::unique(fnames.begin(), fnames.end()); + fnames.erase(last, fnames.end()); + + // prepare list fouts of TFile and fcnts + for (auto fn : fnames) { + fouts.emplace_back(new TFile()); + fcnts.emplace_back(-1); + } + + // number of DataOutputDescriptors + ndod = dodescrs.size(); +} + +// creates a keep string from a InputSpec +std::string SpectoString(InputSpec input) +{ + std::string s; + std::string delim("/"); + + auto matcher = DataSpecUtils::asConcreteDataMatcher(input); + s += matcher.origin.str + delim; + s += matcher.description.str + delim; + s += std::to_string(matcher.subSpec); + + return s; +} + +void DataOutputDirector::readSpecs(std::vector inputs) +{ + + for (auto input : inputs) { + auto s = SpectoString(input); + readString(s); + } +} + +std::vector DataOutputDirector::getDataOutputDescriptors(header::DataHeader dh) +{ + std::vector result; + + // compute list of matching outputs + VariableContext context; + + for (auto dodescr : dodescrs) { + if (dodescr->matcher->match(dh, context)) + result.emplace_back(dodescr); + } + + return result; +} + +std::vector DataOutputDirector::getDataOutputDescriptors(InputSpec spec) +{ + std::vector result; + + // compute list of matching outputs + VariableContext context; + auto concrete = std::get(spec.matcher); + + for (auto dodescr : dodescrs) { + if (dodescr->matcher->match(concrete, context)) + result.emplace_back(dodescr); + } + + return result; +} + +TFile* DataOutputDirector::getDataOutputFile(DataOutputDescriptor* dod, + int ntf, int ntfmerge, + std::string filemode) +{ + // initialisation + TFile* fout = nullptr; + + // search dod->filename in fnames and return corresponding fout + auto it = std::find(fnames.begin(), fnames.end(), dod->filename); + if (it != fnames.end()) { + int ind = std::distance(fnames.begin(), it); + + // check if new version of file needs to be opened + int fcnt = (int)(ntf / ntfmerge); + if ((ntf % ntfmerge) == 0 && fcnt > fcnts[ind]) { + if (fouts[ind]) + fouts[ind]->Close(); + + fcnts[ind] = fcnt; + auto fn = fnames[ind] + "_" + std::to_string(fcnts[ind]) + ".root"; + fouts[ind] = new TFile(fn.c_str(), filemode.c_str()); + } + fout = fouts[ind]; + } + + return fout; +} + +void DataOutputDirector::closeDataOutputFiles() +{ + for (auto fout : fouts) + if (fout) + fout->Close(); +} + +void DataOutputDirector::printOut() +{ + LOG(INFO) << "DataOutputDirector"; + LOG(INFO) << " Default file name: " << defaultfname; + LOG(INFO) << " Number of dods : " << ndod; + LOG(INFO) << " Number of files : " << fnames.size(); + + LOG(INFO) << " dods:"; + for (auto const& ds : dodescrs) + ds->printOut(); + + LOG(INFO) << " File names:"; + for (auto const& fb : fnames) + LOG(INFO) << fb; +} + +} // namespace data_matcher +} // namespace framework +} // namespace o2 diff --git a/Framework/Core/src/DataSamplingConditionCustom.cxx b/Framework/Core/src/DataSamplingConditionCustom.cxx index 690d85bbcb41b..f02d9128df274 100644 --- a/Framework/Core/src/DataSamplingConditionCustom.cxx +++ b/Framework/Core/src/DataSamplingConditionCustom.cxx @@ -81,4 +81,4 @@ std::unique_ptr DataSamplingConditionFactory::createDataS return std::make_unique(); } -} // namespace o2 +} // namespace o2::framework diff --git a/Framework/Core/src/DataSamplingConditionNConsecutive.cxx b/Framework/Core/src/DataSamplingConditionNConsecutive.cxx index ad6e4fc59c47a..4d42df42f4ae0 100644 --- a/Framework/Core/src/DataSamplingConditionNConsecutive.cxx +++ b/Framework/Core/src/DataSamplingConditionNConsecutive.cxx @@ -65,4 +65,4 @@ std::unique_ptr DataSamplingConditionFactory::createDataS return std::make_unique(); } -} // namespace o2 +} // namespace o2::framework diff --git a/Framework/Core/src/WorkflowHelpers.cxx b/Framework/Core/src/WorkflowHelpers.cxx index 6af9ae0ad6f99..4ad686b7a36eb 100644 --- a/Framework/Core/src/WorkflowHelpers.cxx +++ b/Framework/Core/src/WorkflowHelpers.cxx @@ -311,24 +311,48 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); - /// This will create file sinks for dangling outputs of origin ... - /// . AOD - getGlobalAODSink - /// . not AOD - getGlobalFileSink + /// This will create different file sinks + /// . AOD - getGlobalAODSink + /// . dangling, not AOD - getGlobalFileSink /// - // First find all the dangling ouputs - auto danglingOutputsInputs = computeDanglingOutputs(workflow); + // First analyze all ouputs + // outputtypes = isAOD*2 + isdangling*1 + 0 + auto [OutputsInputs, outputtypes] = analyzeOutputs(workflow); - // From that list select the ones of origin AOD ... - // .. and remove them also from the original list - auto danglingOutputsInputsAOD = selectAODs(danglingOutputsInputs); + // file sink for any AOD output + extraSpecs.clear(); + + // select outputs of type AOD + std::vector OutputsInputsAOD; + std::vector isdangling; + for (int ii = 0; ii < OutputsInputs.size(); ii++) { + if ((outputtypes[ii] & 2) == 2) { + OutputsInputsAOD.emplace_back(OutputsInputs[ii]); + isdangling.emplace_back((outputtypes[ii] & 1) == 1); + } + } + + if (OutputsInputsAOD.size() > 0) { + auto fileSink = CommonDataProcessors::getGlobalAODSink(OutputsInputsAOD, + isdangling); + extraSpecs.push_back(fileSink); + } + workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); // file sink for notAOD dangling outputs extraSpecs.clear(); + // select dangling outputs which are not of type AOD + std::vector OutputsInputsDangling; + for (int ii = 0; ii < OutputsInputs.size(); ii++) { + if ((outputtypes[ii] & 1) == 1 && (outputtypes[ii] & 2) == 0) + OutputsInputsDangling.emplace_back(OutputsInputs[ii]); + } + std::vector unmatched; - if (danglingOutputsInputs.size() > 0) { - auto fileSink = CommonDataProcessors::getGlobalFileSink(danglingOutputsInputs, unmatched); - if (unmatched.size() != danglingOutputsInputs.size()) { + if (OutputsInputsDangling.size() > 0) { + auto fileSink = CommonDataProcessors::getGlobalFileSink(OutputsInputsDangling, unmatched); + if (unmatched.size() != OutputsInputsDangling.size()) { extraSpecs.push_back(fileSink); } } @@ -336,15 +360,6 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext extraSpecs.push_back(CommonDataProcessors::getDummySink(unmatched)); } workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); - - // file sink for AOD dangling outputs - extraSpecs.clear(); - - if (danglingOutputsInputsAOD.size() > 0) { - auto fileSink = CommonDataProcessors::getGlobalAODSink(danglingOutputsInputsAOD); - extraSpecs.push_back(fileSink); - } - workflow.insert(workflow.end(), extraSpecs.begin(), extraSpecs.end()); } void WorkflowHelpers::constructGraph(const WorkflowSpec& workflow, @@ -649,23 +664,28 @@ struct DataMatcherId { size_t id; }; -std::vector WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow) +std::tuple, std::vector> WorkflowHelpers::analyzeOutputs(WorkflowSpec const& workflow) { + LOG(INFO) << "Analyzing OutputSpecs"; - std::vector inputs; - std::vector outputs; - std::vector results; + // compute total number of input/output size_t totalInputs = 0; size_t totalOutputs = 0; - for (auto& spec : workflow) { totalInputs += spec.inputs.size(); totalOutputs += spec.outputs.size(); } + std::vector inputs; + std::vector outputs; inputs.reserve(totalInputs); outputs.reserve(totalOutputs); + std::vector results; + std::vector outputtypes; + results.reserve(totalOutputs); + outputtypes.reserve(totalOutputs); + /// Prepare an index to do the iterations quickly. for (size_t wi = 0, we = workflow.size(); wi != we; ++wi) { auto& spec = workflow[wi]; @@ -679,56 +699,72 @@ std::vector WorkflowHelpers::computeDanglingOutputs(WorkflowSpec cons for (size_t oi = 0, oe = outputs.size(); oi != oe; ++oi) { auto& output = outputs[oi]; + auto& outputSpec = workflow[output.workflowId].outputs[output.id]; + + // compute output type + unsigned char outputtype = 0; + + // is AOD? + if (DataSpecUtils::partialMatch(outputSpec, header::DataOrigin("AOD"))) + outputtype += 2; + + // is dangling output? bool matched = false; for (size_t ii = 0, ie = inputs.size(); ii != ie; ++ii) { auto& input = inputs[ii]; // Inputs of the same workflow cannot match outputs - if (output.workflowId == input.workflowId) { + if (output.workflowId == input.workflowId) continue; - } - auto& outputSpec = workflow[output.workflowId].outputs[output.id]; auto& inputSpec = workflow[input.workflowId].inputs[input.id]; if (DataSpecUtils::match(inputSpec, outputSpec)) { matched = true; break; } } + if (!matched) + outputtype += 1; - if (matched == false) { - auto& outputSpec = workflow[output.workflowId].outputs[output.id]; + // update results and outputtypes + auto input = DataSpecUtils::matchingInput(outputSpec); + char buf[64]; + input.binding = (snprintf(buf, 63, "output_%zu_%zu", output.workflowId, output.id), buf); + results.emplace_back(input); - auto input = DataSpecUtils::matchingInput(outputSpec); - char buf[64]; - input.binding = (snprintf(buf, 63, "dangling_%zu_%zu", output.workflowId, output.id), buf); - results.emplace_back(input); - } + outputtypes.emplace_back(outputtype); } - return results; + /* can be used for debuging + int ndang = 0; + int naod = 0; + int ndangother = 0; + for (auto ot : outputtypes) { + if ((ot & 1) == 1) + ndang++; + if ((ot & 2) == 2) + naod++; + if ((ot & 1) == 1 && (ot & 2) == 0) + ndangother++; + } + LOG(INFO) << "Number of outputs " << results.size(); + LOG(INFO) << " dangling " << ndang; + LOG(INFO) << " AOD " << naod; + LOG(INFO) << " dangling, not AOD " << ndangother; + */ + + return std::make_tuple(results, outputtypes); } -std::vector WorkflowHelpers::selectAODs(std::vector& specs) +std::vector WorkflowHelpers::computeDanglingOutputs(WorkflowSpec const& workflow) { - LOG(DEBUG) << "Selecting dangling OutputSpecs of origin AOD - " << specs.size(); - // create result list - std::vector results; - for (auto specit = specs.begin(); specit != specs.end();) { + auto [OutputsInputs, outputtypes] = analyzeOutputs(workflow); - // only add if origin=="AOD" - if (DataSpecUtils::partialMatch(*specit, header::DataOrigin("AOD"))) { - // add it to the AOD list ... - results.emplace_back(*specit); - - // ... and remove it from the original list - specit = specs.erase(specit); - } else { - ++specit; - } + std::vector results; + for (int ii = 0; ii < OutputsInputs.size(); ii++) { + if ((outputtypes[ii] & 1) == 1) + results.emplace_back(OutputsInputs[ii]); } - LOG(DEBUG) << "Number of AODs " << results.size() << " - " << specs.size() << std::endl; - return results; } diff --git a/Framework/Core/src/WorkflowHelpers.h b/Framework/Core/src/WorkflowHelpers.h index 085d0915efc10..d47a94c5c23b3 100644 --- a/Framework/Core/src/WorkflowHelpers.h +++ b/Framework/Core/src/WorkflowHelpers.h @@ -120,6 +120,11 @@ struct TopoIndexInfo { friend std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info); }; +struct OutputObj { + InputSpec spec; + bool isdangling; +}; + /// A set of internal helper classes to manipulate a Workflow struct WorkflowHelpers { /// Topological sort for a graph of @a nodeCount nodes. @@ -168,11 +173,17 @@ struct WorkflowHelpers { const std::vector& edges, const std::vector& index); - /// Given @a workflow it finds the OutputSpec in every module which do not have - /// a corresponding InputSpec. I.e. they are dangling. - /// @return a vector of InputSpec which would have matched said dangling outputs. + /// Given @a workflow it gathers all the OutputSpec and in addition provides + /// the information whether and output is dangling and/or of type AOD + /// An Output is dangling if it does not have a corresponding InputSpec. + /// The type of the output is encoded in an unsigend char whichs values are defined by + /// 0 + isdangling*1 + isAOD*2 + /// @return a vector of InputSpec of all outputs and a vector of unsigned char + /// with the encoded output type + static std::tuple, std::vector> analyzeOutputs(WorkflowSpec const& workflow); + + /// returns only dangling outputs static std::vector computeDanglingOutputs(WorkflowSpec const& workflow); - static std::vector selectAODs(std::vector& outputs); }; } // namespace o2::framework