@@ -72,14 +72,14 @@ const static std::unordered_map<OutputObjHandlingPolicy, std::string> ROOTfileNa
7272 {OutputObjHandlingPolicy::QAObject, " QAResults.root" }};
7373
7474// =============================================================================
75- DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink (outputObjects const & objmap, const outputTasks& tskmap)
75+ DataProcessorSpec CommonDataProcessors::getOutputObjHistSink (outputObjects const & objmap, outputTasks const & tskmap)
7676{
7777 auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void (ProcessingContext&)> {
7878 auto & callbacks = ic.services ().get <CallbackService>();
7979 auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
8080
8181 auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
82- LOG (DEBUG ) << " Writing merged histograms to file" ;
82+ LOG (DEBUG ) << " Writing merged objects and histograms to file" ;
8383 if (inputObjects->empty ()) {
8484 LOG (ERROR ) << " Output object map is empty!" ;
8585 context.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
@@ -123,149 +123,25 @@ DataProcessorSpec CommonDataProcessors::getHistogramRegistrySink(outputObjects c
123123 };
124124
125125 TDirectory* currentDir = f[route.policy ]->GetDirectory (currentDirectory.c_str ());
126- TList* outputList = (TList*)entry.obj ;
127- outputList->SetOwner (false );
128-
129- // if registry should live in dedicated folder a TNamed object is appended to the list
130- if (outputList->Last ()->IsA () == TNamed::Class ()) {
131- delete outputList->Last ();
132- outputList->RemoveLast ();
133- currentDir = currentDir->mkdir (outputList->GetName (), outputList->GetName (), true );
134- }
135-
136- writeListToFile (outputList, currentDir);
137- outputList->SetOwner ();
138- delete outputList;
139- entry.obj = nullptr ;
140- }
141- }
142- for (auto i = 0u ; i < OutputObjHandlingPolicy::numPolicies; ++i) {
143- if (f[i] != nullptr ) {
144- f[i]->Close ();
145- }
146- }
147- LOG (INFO ) << " All outputs merged in their respective target files" ;
148- context.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
149- };
150-
151- callbacks.set (CallbackService::Id::EndOfStream, endofdatacb);
152- return [inputObjects, objmap, tskmap](ProcessingContext& pc) mutable -> void {
153- auto const & ref = pc.inputs ().get (" y" );
154- if (!ref.header ) {
155- LOG (ERROR ) << " Header not found" ;
156- return ;
157- }
158- if (!ref.payload ) {
159- LOG (ERROR ) << " Payload not found" ;
160- return ;
161- }
162- auto datah = o2::header::get<o2::header::DataHeader*>(ref.header );
163- if (!datah) {
164- LOG (ERROR ) << " No data header in stack" ;
165- return ;
166- }
167-
168- auto objh = o2::header::get<o2::framework::OutputObjHeader*>(ref.header );
169- if (!objh) {
170- LOG (ERROR ) << " No output object header in stack" ;
171- return ;
172- }
173-
174- FairTMessage tm (const_cast <char *>(ref.payload ), static_cast <int >(datah->payloadSize ));
175- InputObject obj;
176- obj.kind = tm.GetClass ();
177- if (obj.kind == nullptr ) {
178- LOG (error) << " Cannot read class info from buffer." ;
179- return ;
180- }
181-
182- auto policy = objh->mPolicy ;
183- auto hash = objh->mTaskHash ;
184-
185- obj.obj = tm.ReadObjectAny (obj.kind );
186- TNamed* named = static_cast <TNamed*>(obj.obj );
187- obj.name = named->GetName ();
188-
189- auto hpos = std::find_if (tskmap.begin (), tskmap.end (), [&](auto && x) { return x.first == hash; });
190- if (hpos == tskmap.end ()) {
191- LOG (ERROR ) << " No task found for hash " << hash;
192- return ;
193- }
194- auto taskname = hpos->second ;
195- auto opos = std::find_if (objmap.begin (), objmap.end (), [&](auto && x) { return x.first == hash; });
196- if (opos == objmap.end ()) {
197- LOG (ERROR ) << " No object list found for task " << taskname << " (hash=" << hash << " )" ;
198- return ;
199- }
200- auto objects = opos->second ;
201- if (std::find (objects.begin (), objects.end (), obj.name ) == objects.end ()) {
202- LOG (ERROR ) << " No object " << obj.name << " in map for task " << taskname;
203- return ;
204- }
205- auto nameHash = compile_time_hash (obj.name .c_str ());
206- InputObjectRoute key{obj.name , nameHash, taskname, hash, policy};
207- auto existing = std::find_if (inputObjects->begin (), inputObjects->end (), [&](auto && x) { return (x.first .uniqueId == nameHash) && (x.first .taskHash == hash); });
208- if (existing == inputObjects->end ()) {
209- inputObjects->push_back (std::make_pair (key, obj));
210- return ;
211- }
212- auto merger = existing->second .kind ->GetMerge ();
213- if (!merger) {
214- LOG (ERROR ) << " Already one unmergeable object found for " << obj.name ;
215- return ;
216- }
217-
218- TList coll;
219- coll.Add (static_cast <TObject*>(obj.obj ));
220- merger (existing->second .obj , &coll, nullptr );
221- };
222- };
223-
224- DataProcessorSpec spec{
225- " internal-dpl-global-analysis-file-sink" ,
226- {InputSpec (" y" , DataSpecUtils::dataDescriptorMatcherFrom (header::DataOrigin{" HIST" }))},
227- Outputs{},
228- AlgorithmSpec (writerFunction),
229- {}};
230-
231- return spec;
232- }
233-
234- DataProcessorSpec CommonDataProcessors::getOutputObjSink (outputObjects const & objmap, outputTasks const & tskmap)
235- {
236- auto writerFunction = [objmap, tskmap](InitContext& ic) -> std::function<void (ProcessingContext&)> {
237- auto & callbacks = ic.services ().get <CallbackService>();
238- auto inputObjects = std::make_shared<std::vector<std::pair<InputObjectRoute, InputObject>>>();
239-
240- auto endofdatacb = [inputObjects](EndOfStreamContext& context) {
241- LOG (DEBUG ) << " Writing merged objects to file" ;
242- if (inputObjects->empty ()) {
243- LOG (ERROR ) << " Output object map is empty!" ;
244- context.services ().get <ControlService>().readyToQuit (QuitRequest::Me);
245- return ;
246- }
247- std::string currentDirectory = " " ;
248- std::string currentFile = " " ;
249- TFile* f[OutputObjHandlingPolicy::numPolicies];
250- for (auto i = 0u ; i < OutputObjHandlingPolicy::numPolicies; ++i) {
251- f[i] = nullptr ;
252- }
253- for (auto & [route, entry] : *inputObjects) {
254- auto file = ROOTfileNames.find (route.policy );
255- if (file != ROOTfileNames.end ()) {
256- auto filename = file->second ;
257- if (f[route.policy ] == nullptr ) {
258- f[route.policy ] = TFile::Open (filename.c_str (), " RECREATE" );
259- }
260- auto nextDirectory = route.directory ;
261- if ((nextDirectory != currentDirectory) || (filename != currentFile)) {
262- if (!f[route.policy ]->FindKey (nextDirectory.c_str ())) {
263- f[route.policy ]->mkdir (nextDirectory.c_str ());
126+ TNamed* named = static_cast <TNamed*>(entry.obj );
127+ if (named->InheritsFrom (TList::Class ())) {
128+ TList* outputList = (TList*)entry.obj ;
129+ outputList->SetOwner (false );
130+
131+ // if registry should live in dedicated folder a TNamed object is appended to the list
132+ if (outputList->Last ()->IsA () == TNamed::Class ()) {
133+ delete outputList->Last ();
134+ outputList->RemoveLast ();
135+ currentDir = currentDir->mkdir (outputList->GetName (), outputList->GetName (), true );
264136 }
265- currentDirectory = nextDirectory;
266- currentFile = filename;
137+
138+ writeListToFile (outputList, currentDir);
139+ outputList->SetOwner ();
140+ delete outputList;
141+ entry.obj = nullptr ;
142+ } else {
143+ currentDir->WriteObjectAny (entry.obj , entry.kind , entry.name .c_str ());
267144 }
268- (f[route.policy ]->GetDirectory (currentDirectory.c_str ()))->WriteObjectAny (entry.obj , entry.kind , entry.name .c_str ());
269145 }
270146 }
271147 for (auto i = 0u ; i < OutputObjHandlingPolicy::numPolicies; ++i) {
0 commit comments