1717#include " Framework/InputSpec.h"
1818#include " Framework/RawDeviceService.h"
1919#include " Framework/CommonServices.h"
20+ #include " Framework/DataTakingContext.h"
21+ #include " Framework/TimingInfo.h"
2022#include < fairmq/Device.h>
2123
2224#include " CTFWorkflow/CTFWriterSpec.h"
@@ -99,16 +101,17 @@ class CTFWriterSpec : public o2::framework::Task
99101 bool isPresent (DetID id) const { return mDets [id]; }
100102
101103 private:
104+ void updateTimeDependentParams (ProcessingContext& pc);
102105 template <typename C>
103106 size_t processDet (o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree);
104107 template <typename C>
105108 void storeDictionary (DetID det, CTFHeader& header);
106109 void storeDictionaries ();
107110 void closeTFTreeAndFile ();
108- void prepareTFTreeAndFile (const o2::header::DataHeader* dh );
111+ void prepareTFTreeAndFile ();
109112 size_t estimateCTFSize (ProcessingContext& pc);
110113 size_t getAvailableDiskSpace (const std::string& path, int level);
111- void createLockFile (const o2::header::DataHeader* dh, int level);
114+ void createLockFile (int level);
112115 void removeLockFile ();
113116 void finalize ();
114117
@@ -120,10 +123,8 @@ class CTFWriterSpec : public o2::framework::Task
120123 bool mStoreMetaFile = false ;
121124 int mVerbosity = 0 ;
122125 int mSaveDictAfter = 0 ; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
123- int mFlagMinDet = 1 ; // append list of detectors to LHC period if their number is <= mFlagMinDet
124126 uint32_t mPrevDictTimeStamp = 0 ; // timestamp of the previously stored dictionary
125127 uint32_t mDictTimeStamp = 0 ; // timestamp of the currently stored dictionary
126- uint64_t mRun = 0 ;
127128 size_t mMinSize = 0 ; // if > 0, accumulate CTFs in the same tree until the total size exceeds this minimum
128129 size_t mMaxSize = 0 ; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinSize is not reached)
129130 size_t mChkSize = 0 ; // if > 0 and fallback storage provided, reserve this size per CTF file in production on primary storage
@@ -136,10 +137,9 @@ class CTFWriterSpec : public o2::framework::Task
136137 size_t mNCTFFiles = 0 ; // total number of CTF files written
137138 int mMaxCTFPerFile = 0 ; // max CTFs per files to store
138139 std::vector<uint32_t > mTFOrbits {}; // 1st orbits of TF accumulated in current file
139-
140+ o2::framework::DataTakingContext mDataTakingContext {};
141+ o2::framework::TimingInfo mTimingInfo {};
140142 std::string mOutputType {}; // RS FIXME once global/local options clash is solved, --output-type will become device option
141- std::string mLHCPeriod {};
142- std::string mEnvironmentID {}; // partition env. id
143143 std::string mDictDir {};
144144 std::string mCTFDir {};
145145 std::string mCTFDirFallBack = " /dev/null" ;
@@ -172,7 +172,7 @@ const std::string CTFWriterSpec::TMPFileEnding{".part"};
172172
173173// ___________________________________________________________________
174174CTFWriterSpec::CTFWriterSpec (DetID::mask_t dm, uint64_t r, const std::string& outType, int verbosity)
175- : mDets (dm), mRun (r), mOutputType (outType), mVerbosity (verbosity)
175+ : mDets (dm), mOutputType (outType), mVerbosity (verbosity)
176176{
177177 std::for_each (mIsSaturatedFrequencyTable .begin (), mIsSaturatedFrequencyTable .end (), [](auto & bitset) { bitset.reset (); });
178178 mTimer .Stop ();
@@ -213,7 +213,6 @@ void CTFWriterSpec::init(InitContext& ic)
213213 mCTFMetaFileDir = o2::utils::Str::rectifyDirectory (mCTFMetaFileDir );
214214 mStoreMetaFile = true ;
215215 }
216- mFlagMinDet = ic.options ().get <int >(" append-det-to-period" );
217216 mCreateRunEnvDir = !ic.options ().get <bool >(" ignore-partition-run-dir" );
218217 mMinSize = ic.options ().get <int64_t >(" min-file-size" );
219218 mMaxSize = ic.options ().get <int64_t >(" max-file-size" );
@@ -238,6 +237,17 @@ void CTFWriterSpec::init(InitContext& ic)
238237 }
239238}
240239
240+ // ___________________________________________________________________
241+ void CTFWriterSpec::updateTimeDependentParams (ProcessingContext& pc)
242+ {
243+ static bool initOnceDone = false ;
244+ if (!initOnceDone) {
245+ initOnceDone = true ;
246+ mDataTakingContext = pc.services ().get <DataTakingContext>();
247+ }
248+ mTimingInfo = pc.services ().get <o2::framework::TimingInfo>();
249+ }
250+
241251// ___________________________________________________________________
242252// process data of particular detector
243253template <typename C>
@@ -351,65 +361,15 @@ void CTFWriterSpec::run(ProcessingContext& pc)
351361 const std::string NAStr = " NA" ;
352362 auto cput = mTimer .CpuTime ();
353363 mTimer .Start (false );
354- const auto ref = pc.inputs ().getFirstValid (true );
355- const auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
356- const auto dph = DataRefUtils::getHeader<DataProcessingHeader*>(ref);
357- auto oldRun = mRun ;
358- if (dh->runNumber != 0 ) {
359- mRun = dh->runNumber ;
360- }
361- // check runNumber with FMQ property, if set, override DH number
362- {
363- auto runNStr = pc.services ().get <RawDeviceService>().device ()->fConfig ->GetProperty <std::string>(" runNumber" , NAStr);
364- if (runNStr != NAStr) {
365- size_t nc = 0 ;
366- auto runNProp = std::stol (runNStr, &nc);
367- if (nc != runNStr.size ()) {
368- LOGP (error, " Property runNumber={} is provided but is not a number, ignoring" , runNStr);
369- } else {
370- mRun = runNProp;
371- }
372- }
373- }
374- auto oldEnv = mEnvironmentID ;
375- {
376- auto envN = pc.services ().get <RawDeviceService>().device ()->fConfig ->GetProperty <std::string>(" environment_id" , NAStr);
377- if (envN != NAStr) {
378- mEnvironmentID = envN;
379- }
380- }
381- if ((oldRun != 0 && oldRun != mRun ) || (!oldEnv.empty () && oldEnv != mEnvironmentID )) {
382- LOGP (warning, " RunNumber/Environment changed from {}/{} to {}/{}" , oldRun, oldEnv, mRun , mEnvironmentID );
383- closeTFTreeAndFile ();
384- }
385- // check for the LHCPeriod
386- if (mLHCPeriod .empty ()) {
387- auto LHCPeriodStr = pc.services ().get <RawDeviceService>().device ()->fConfig ->GetProperty <std::string>(" LHCPeriod" , NAStr);
388- if (LHCPeriodStr != NAStr) {
389- mLHCPeriod = LHCPeriodStr;
390- } else {
391- const char * months[12 ] = {" JAN" , " FEB" , " MAR" , " APR" , " MAY" , " JUN" , " JUL" , " AUG" , " SEP" , " OCT" , " NOV" , " DEC" };
392- time_t now = time (nullptr );
393- auto ltm = gmtime (&now);
394- mLHCPeriod = months[ltm->tm_mon ];
395- LOG (warning) << " LHCPeriod is not available, using current month " << mLHCPeriod ;
396- }
397- if (mDets .count () <= mFlagMinDet ) { // flag participating detectors
398- for (auto id = DetID::First; id <= DetID::Last; id++) {
399- if (isPresent (id)) {
400- mLHCPeriod += fmt::format (" _{}" , DetID::getName (id));
401- }
402- }
403- }
404- }
364+ updateTimeDependentParams (pc);
405365
406366 mCurrCTFSize = estimateCTFSize (pc);
407367 if (mWriteCTF ) {
408- prepareTFTreeAndFile (dh );
368+ prepareTFTreeAndFile ();
409369 }
410370
411371 // create header
412- CTFHeader header{mRun , dph-> creation , dh-> firstTForbit , dh-> tfCounter };
372+ CTFHeader header{mTimingInfo . runNumber , mTimingInfo . creation , mTimingInfo . firstTFOrbit , mTimingInfo . tfCounter };
413373 size_t szCTF = 0 ;
414374 szCTF += processDet<o2::itsmft::CTF>(pc, DetID::ITS, header, mCTFTreeOut .get ());
415375 szCTF += processDet<o2::itsmft::CTF>(pc, DetID::MFT, header, mCTFTreeOut .get ());
@@ -434,7 +394,7 @@ void CTFWriterSpec::run(ProcessingContext& pc)
434394 szCTF += appendToTree (*mCTFTreeOut .get (), " CTFHeader" , header);
435395 mAccCTFSize += szCTF;
436396 mCTFTreeOut ->SetEntries (++mNAccCTF );
437- mTFOrbits .push_back (dh-> firstTForbit );
397+ mTFOrbits .push_back (mTimingInfo . firstTFOrbit );
438398 LOG (info) << " TF#" << mNCTF << " : wrote CTF{" << header << " } of size " << szCTF << " to " << mCurrentCTFFileNameFull << " in " << mTimer .CpuTime () - cput << " s" ;
439399 if (mNAccCTF > 1 ) {
440400 LOG (info) << " Current CTF tree has " << mNAccCTF << " entries with total size of " << mAccCTFSize << " bytes" ;
@@ -480,7 +440,7 @@ void CTFWriterSpec::finalize()
480440}
481441
482442// ___________________________________________________________________
483- void CTFWriterSpec::prepareTFTreeAndFile (const o2::header::DataHeader* dh )
443+ void CTFWriterSpec::prepareTFTreeAndFile ()
484444{
485445 if (!mWriteCTF ) {
486446 return ;
@@ -498,25 +458,25 @@ void CTFWriterSpec::prepareTFTreeAndFile(const o2::header::DataHeader* dh)
498458 }
499459 if (needToOpen) {
500460 closeTFTreeAndFile ();
501- auto fname = o2::base::NameConf::getCTFFileName (mRun , dh-> firstTForbit , dh-> tfCounter );
461+ auto fname = o2::base::NameConf::getCTFFileName (mTimingInfo . runNumber , mTimingInfo . firstTFOrbit , mTimingInfo . tfCounter );
502462 auto ctfDir = mCTFDir .empty () ? o2::utils::Str::rectifyDirectory (" ./" ) : mCTFDir ;
503463 if (mChkSize > 0 && (mCTFDirFallBack != " /dev/null" )) {
504- createLockFile (dh, 0 );
464+ createLockFile (0 );
505465 auto sz = getAvailableDiskSpace (ctfDir, 0 ); // check main storage
506466 if (sz < mChkSize ) {
507467 removeLockFile ();
508468 LOG (warning) << " Primary CTF output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one" ;
509469 ctfDir = mCTFDirFallBack ;
510470 }
511471 }
512- if (mCreateRunEnvDir && !mEnvironmentID . empty ()) {
513- ctfDir += fmt::format (" {}_{}/" , mEnvironmentID , mRun );
472+ if (mCreateRunEnvDir && !mDataTakingContext . envId . empty () && ( mDataTakingContext . envId != o2::framework::DataTakingContext::UNKNOWN )) {
473+ ctfDir += fmt::format (" {}_{}/" , mDataTakingContext . envId , mDataTakingContext . runNumber );
514474 if (!ctfDir.empty ()) {
515475 o2::utils::createDirectoriesIfAbsent (ctfDir);
516476 LOGP (info, " Created {} directory for CTFs output" , ctfDir);
517477 }
518478 }
519- mCurrentCTFFileName = o2::base::NameConf::getCTFFileName (mRun , dh-> firstTForbit , dh-> tfCounter );
479+ mCurrentCTFFileName = o2::base::NameConf::getCTFFileName (mTimingInfo . runNumber , mTimingInfo . firstTFOrbit , mTimingInfo . tfCounter );
520480 mCurrentCTFFileNameFull = fmt::format (" {}{}" , ctfDir, mCurrentCTFFileName );
521481 mCTFFileOut .reset (TFile::Open (fmt::format (" {}{}" , mCurrentCTFFileNameFull , TMPFileEnding).c_str (), " recreate" )); // to prevent premature external usage, use temporary name
522482 mCTFTreeOut = std::make_unique<TTree>(std::string (o2::base::NameConf::CTFTREENAME).c_str (), " O2 CTF tree" );
@@ -542,20 +502,15 @@ void CTFWriterSpec::closeTFTreeAndFile()
542502 if (mStoreMetaFile ) {
543503 o2::dataformats::FileMetaData ctfMetaData;
544504 ctfMetaData.fillFileData (mCurrentCTFFileNameFull );
545- ctfMetaData.run = mRun ;
546- ctfMetaData.LHCPeriod = mLHCPeriod ;
505+ ctfMetaData.setDataTakingContext (mDataTakingContext );
547506 ctfMetaData.type = " raw" ;
548507 ctfMetaData.priority = " high" ;
508+ ctfMetaData.tfOrbits .swap (mTFOrbits );
549509 auto metaFileNameTmp = fmt::format (" {}{}.tmp" , mCTFMetaFileDir , mCurrentCTFFileName );
550510 auto metaFileName = fmt::format (" {}{}.done" , mCTFMetaFileDir , mCurrentCTFFileName );
551511 try {
552512 std::ofstream metaFileOut (metaFileNameTmp);
553513 metaFileOut << ctfMetaData;
554- metaFileOut << " TFOrbits: " ;
555- for (size_t i = 0 ; i < mTFOrbits .size (); i++) {
556- metaFileOut << fmt::format (" {}{}" , i ? " , " : " " , mTFOrbits [i]);
557- }
558- metaFileOut << ' \n ' ;
559514 metaFileOut.close ();
560515 std::filesystem::rename (metaFileNameTmp, metaFileName);
561516 } catch (std::exception const & e) {
@@ -584,7 +539,7 @@ void CTFWriterSpec::storeDictionaries()
584539 mDictFileOut .reset (TFile::Open (dictFileName.c_str (), " recreate" ));
585540 mDictTreeOut = std::make_unique<TTree>(std::string (o2::base::NameConf::CTFDICT).c_str (), " O2 CTF dictionary" );
586541
587- CTFHeader header{mRun , uint32_t (mNCTF )};
542+ CTFHeader header{mTimingInfo . runNumber , uint32_t (mNCTF )};
588543 storeDictionary<o2::itsmft::CTF>(DetID::ITS, header);
589544 storeDictionary<o2::itsmft::CTF>(DetID::MFT, header);
590545 storeDictionary<o2::tpc::CTF>(DetID::TPC, header);
@@ -625,11 +580,11 @@ void CTFWriterSpec::storeDictionaries()
625580}
626581
627582// ___________________________________________________________________
628- void CTFWriterSpec::createLockFile (const o2::header::DataHeader* dh, int level)
583+ void CTFWriterSpec::createLockFile (int level)
629584{
630585 // create lock file for the CTF to be written to the storage of given level
631586 while (1 ) {
632- mLockFileName = fmt::format (" {}/ctfs{}-{}_{}_{}_{}.lock" , LOCKFileDir, level, o2::utils::Str::getRandomString (8 ), mRun , dh-> firstTForbit , dh-> tfCounter );
587+ mLockFileName = fmt::format (" {}/ctfs{}-{}_{}_{}_{}.lock" , LOCKFileDir, level, o2::utils::Str::getRandomString (8 ), mTimingInfo . runNumber , mTimingInfo . firstTFOrbit , mTimingInfo . tfCounter );
633588 if (!std::filesystem::exists (mLockFileName )) {
634589 break ;
635590 }
@@ -728,7 +683,6 @@ DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, uint64_t run, const std::
728683 {" output-dir" , VariantType::String, " none" , {" CTF output directory, must exist" }},
729684 {" output-dir-alt" , VariantType::String, " /dev/null" , {" Alternative CTF output directory, must exist (if not /dev/null)" }},
730685 {" meta-output-dir" , VariantType::String, " /dev/null" , {" CTF metadata output directory, must exist (if not /dev/null)" }},
731- {" append-det-to-period" , VariantType::Int, 1 , {" Append detectors name to LHCPeriod in metadata if their number is does not exceed this" }},
732686 {" min-file-size" , VariantType::Int64, 0l , {" accumulate CTFs until given file size reached" }},
733687 {" max-file-size" , VariantType::Int64, 0l , {" if > 0, try to avoid exceeding given file size, also used for space check" }},
734688 {" max-ctf-per-file" , VariantType::Int, 0 , {" if > 0, avoid storing more than requested CTFs per file" }},
0 commit comments