Skip to content

Commit 63a9498

Browse files
committed
Use DataTakingContext and TimingInfo provided by the framework
Instead of querying it from the FMQ properties
1 parent c423b11 commit 63a9498

13 files changed

Lines changed: 138 additions & 216 deletions

File tree

DataFormats/Detectors/Common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ o2_add_library(DetectorsCommonDataFormats
2626
O2::FrameworkLogger
2727
O2::Headers
2828
O2::rANS
29+
O2::Framework
2930
O2::CommonUtils)
3031

3132
o2_target_root_dictionary(

DataFormats/Detectors/Common/include/DetectorsCommonDataFormats/FileMetaData.h

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,37 @@
1515
#define _ALICEO2_FILE_METADATA_H
1616

1717
#include <string>
18+
#include <vector>
1819

1920
namespace o2
2021
{
22+
namespace framework
23+
{
24+
class DataTakingContext;
25+
}
2126
namespace dataformats
2227
{
2328

24-
struct FileMetaData {
29+
struct FileMetaData { // https://docs.google.com/document/d/1nH9EZEFBSpuZwOWs3RBcfy-6aRChgAqClBv6G06MjH4/edit
2530
std::string LHCPeriod{}; // 1, LHC data taking period + detector name, in case of individual detector data stream, required
2631
std::string lurl{}; // 3, the local EPN path to the CTF or calibration file, required
27-
std::string type{}; // 4, CTF or calibration; default is CTF, optional
32+
std::string type{}; // 4, raw or calib; default is raw, optional
2833
std::string guid{}; // 7, default is auto-generated, optional
2934
std::string surl{}; // 8, the remote storage path where we store the data file, optional
3035
std::string curl{}; // 9, the Grid catalogue path, optional
3136
std::string md5{}; //10, default the checksum of the lurl file; only filled after a successful transfer, if needed, optional
3237
std::string xxhash{}; //11, default calculated from the lurl file, only filled after a successful transfer, if needed, optional
33-
std::string seName{}; //12, default is taken from the configuration file
34-
std::string seioDaemons{}; //13, default is taken from the configuration file
35-
std::string priority{}; //14, low or high; default is low
36-
long run{}; // 2, run number, required
38+
std::string seName{}; // 12, default is taken from the configuration file, optional
39+
std::string seioDaemons{}; // 13, default is taken from the configuration file, optional
40+
std::string priority{}; // 14, low or high; default is low, optional
41+
std::string detComposition{}; // 17, contains a list of detectors, optional
42+
std::string run{}; // 2, run number, required
3743
long ctime{}; // 5, default the timestamp of the lurl file, optional
3844
size_t size{}; // 6, default the size of the lurl file, optional
39-
45+
int persistent{}; // 16, default is forever, optional
46+
std::vector<uint32_t> tfOrbits; // 15, comma-sep. list of 1st orbits of TFs, optional
4047
bool fillFileData(const std::string& fname);
48+
void setDataTakingContext(const o2::framework::DataTakingContext& dtc);
4149
std::string asString() const;
4250
};
4351

DataFormats/Detectors/Common/src/FileMetaData.cxx

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
/// @brief meta data of the file produced by O2
1313

1414
#include "DetectorsCommonDataFormats/FileMetaData.h"
15+
#include "Framework/DataTakingContext.h"
1516
#include <Framework/Logger.h>
1617
#include <TMD5.h>
1718
#include <filesystem>
@@ -77,9 +78,30 @@ std::string FileMetaData::asString() const
7778
if (!priority.empty()) {
7879
ms += fmt::format("priority: {}\n", priority);
7980
}
81+
if (persistent) {
82+
ms += fmt::format("persistent: {}\n", persistent);
83+
}
84+
if (!detComposition.empty()) {
85+
ms += fmt::format("det_composition: {}\n", detComposition);
86+
}
87+
if (!tfOrbits.empty()) {
88+
ms += fmt::format("TFOrbits: {}", tfOrbits[0]);
89+
for (size_t i = 1; i < tfOrbits.size(); i++) {
90+
ms += fmt::format(",{}", tfOrbits[i]);
91+
}
92+
ms += "\n";
93+
}
94+
8095
return ms;
8196
}
8297

98+
void FileMetaData::setDataTakingContext(const o2::framework::DataTakingContext& dtc)
99+
{
100+
LHCPeriod = dtc.lhcPeriod;
101+
detComposition = dtc.detectors;
102+
run = dtc.runNumber;
103+
}
104+
83105
std::ostream& o2::dataformats::operator<<(std::ostream& stream, const FileMetaData& h)
84106
{
85107
stream << h.asString();

Detectors/CTF/workflow/src/CTFWriterSpec.cxx

Lines changed: 34 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
#include "Framework/InputSpec.h"
1818
#include "Framework/RawDeviceService.h"
1919
#include "Framework/CommonServices.h"
20+
#include "Framework/DataTakingContext.h"
21+
#include "Framework/TimingInfo.h"
2022
#include <fairmq/Device.h>
2123

2224
#include "CTFWorkflow/CTFWriterSpec.h"
@@ -99,16 +101,17 @@ class CTFWriterSpec : public o2::framework::Task
99101
bool isPresent(DetID id) const { return mDets[id]; }
100102

101103
private:
104+
void updateTimeDependentParams(ProcessingContext& pc);
102105
template <typename C>
103106
size_t processDet(o2::framework::ProcessingContext& pc, DetID det, CTFHeader& header, TTree* tree);
104107
template <typename C>
105108
void storeDictionary(DetID det, CTFHeader& header);
106109
void storeDictionaries();
107110
void closeTFTreeAndFile();
108-
void prepareTFTreeAndFile(const o2::header::DataHeader* dh);
111+
void prepareTFTreeAndFile();
109112
size_t estimateCTFSize(ProcessingContext& pc);
110113
size_t getAvailableDiskSpace(const std::string& path, int level);
111-
void createLockFile(const o2::header::DataHeader* dh, int level);
114+
void createLockFile(int level);
112115
void removeLockFile();
113116
void finalize();
114117

@@ -120,10 +123,8 @@ class CTFWriterSpec : public o2::framework::Task
120123
bool mStoreMetaFile = false;
121124
int mVerbosity = 0;
122125
int mSaveDictAfter = 0; // if positive and mWriteCTF==true, save dictionary after each mSaveDictAfter TFs processed
123-
int mFlagMinDet = 1; // append list of detectors to LHC period if their number is <= mFlagMinDet
124126
uint32_t mPrevDictTimeStamp = 0; // timestamp of the previously stored dictionary
125127
uint32_t mDictTimeStamp = 0; // timestamp of the currently stored dictionary
126-
uint64_t mRun = 0;
127128
size_t mMinSize = 0; // if > 0, accumulate CTFs in the same tree until the total size exceeds this minimum
128129
size_t mMaxSize = 0; // if > MinSize, and accumulated size will exceed this value, stop accumulation (even if mMinSize is not reached)
129130
size_t mChkSize = 0; // if > 0 and fallback storage provided, reserve this size per CTF file in production on primary storage
@@ -136,10 +137,9 @@ class CTFWriterSpec : public o2::framework::Task
136137
size_t mNCTFFiles = 0; // total number of CTF files written
137138
int mMaxCTFPerFile = 0; // max CTFs per files to store
138139
std::vector<uint32_t> mTFOrbits{}; // 1st orbits of TF accumulated in current file
139-
140+
o2::framework::DataTakingContext mDataTakingContext{};
141+
o2::framework::TimingInfo mTimingInfo{};
140142
std::string mOutputType{}; // RS FIXME once global/local options clash is solved, --output-type will become device option
141-
std::string mLHCPeriod{};
142-
std::string mEnvironmentID{}; // partition env. id
143143
std::string mDictDir{};
144144
std::string mCTFDir{};
145145
std::string mCTFDirFallBack = "/dev/null";
@@ -172,7 +172,7 @@ const std::string CTFWriterSpec::TMPFileEnding{".part"};
172172

173173
//___________________________________________________________________
174174
CTFWriterSpec::CTFWriterSpec(DetID::mask_t dm, uint64_t r, const std::string& outType, int verbosity)
175-
: mDets(dm), mRun(r), mOutputType(outType), mVerbosity(verbosity)
175+
: mDets(dm), mOutputType(outType), mVerbosity(verbosity)
176176
{
177177
std::for_each(mIsSaturatedFrequencyTable.begin(), mIsSaturatedFrequencyTable.end(), [](auto& bitset) { bitset.reset(); });
178178
mTimer.Stop();
@@ -213,7 +213,6 @@ void CTFWriterSpec::init(InitContext& ic)
213213
mCTFMetaFileDir = o2::utils::Str::rectifyDirectory(mCTFMetaFileDir);
214214
mStoreMetaFile = true;
215215
}
216-
mFlagMinDet = ic.options().get<int>("append-det-to-period");
217216
mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
218217
mMinSize = ic.options().get<int64_t>("min-file-size");
219218
mMaxSize = ic.options().get<int64_t>("max-file-size");
@@ -238,6 +237,17 @@ void CTFWriterSpec::init(InitContext& ic)
238237
}
239238
}
240239

240+
//___________________________________________________________________
241+
void CTFWriterSpec::updateTimeDependentParams(ProcessingContext& pc)
242+
{
243+
static bool initOnceDone = false;
244+
if (!initOnceDone) {
245+
initOnceDone = true;
246+
mDataTakingContext = pc.services().get<DataTakingContext>();
247+
}
248+
mTimingInfo = pc.services().get<o2::framework::TimingInfo>();
249+
}
250+
241251
//___________________________________________________________________
242252
// process data of particular detector
243253
template <typename C>
@@ -351,65 +361,15 @@ void CTFWriterSpec::run(ProcessingContext& pc)
351361
const std::string NAStr = "NA";
352362
auto cput = mTimer.CpuTime();
353363
mTimer.Start(false);
354-
const auto ref = pc.inputs().getFirstValid(true);
355-
const auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(ref);
356-
const auto dph = DataRefUtils::getHeader<DataProcessingHeader*>(ref);
357-
auto oldRun = mRun;
358-
if (dh->runNumber != 0) {
359-
mRun = dh->runNumber;
360-
}
361-
// check runNumber with FMQ property, if set, override DH number
362-
{
363-
auto runNStr = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("runNumber", NAStr);
364-
if (runNStr != NAStr) {
365-
size_t nc = 0;
366-
auto runNProp = std::stol(runNStr, &nc);
367-
if (nc != runNStr.size()) {
368-
LOGP(error, "Property runNumber={} is provided but is not a number, ignoring", runNStr);
369-
} else {
370-
mRun = runNProp;
371-
}
372-
}
373-
}
374-
auto oldEnv = mEnvironmentID;
375-
{
376-
auto envN = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", NAStr);
377-
if (envN != NAStr) {
378-
mEnvironmentID = envN;
379-
}
380-
}
381-
if ((oldRun != 0 && oldRun != mRun) || (!oldEnv.empty() && oldEnv != mEnvironmentID)) {
382-
LOGP(warning, "RunNumber/Environment changed from {}/{} to {}/{}", oldRun, oldEnv, mRun, mEnvironmentID);
383-
closeTFTreeAndFile();
384-
}
385-
// check for the LHCPeriod
386-
if (mLHCPeriod.empty()) {
387-
auto LHCPeriodStr = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("LHCPeriod", NAStr);
388-
if (LHCPeriodStr != NAStr) {
389-
mLHCPeriod = LHCPeriodStr;
390-
} else {
391-
const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
392-
time_t now = time(nullptr);
393-
auto ltm = gmtime(&now);
394-
mLHCPeriod = months[ltm->tm_mon];
395-
LOG(warning) << "LHCPeriod is not available, using current month " << mLHCPeriod;
396-
}
397-
if (mDets.count() <= mFlagMinDet) { // flag participating detectors
398-
for (auto id = DetID::First; id <= DetID::Last; id++) {
399-
if (isPresent(id)) {
400-
mLHCPeriod += fmt::format("_{}", DetID::getName(id));
401-
}
402-
}
403-
}
404-
}
364+
updateTimeDependentParams(pc);
405365

406366
mCurrCTFSize = estimateCTFSize(pc);
407367
if (mWriteCTF) {
408-
prepareTFTreeAndFile(dh);
368+
prepareTFTreeAndFile();
409369
}
410370

411371
// create header
412-
CTFHeader header{mRun, dph->creation, dh->firstTForbit, dh->tfCounter};
372+
CTFHeader header{mTimingInfo.runNumber, mTimingInfo.creation, mTimingInfo.firstTFOrbit, mTimingInfo.tfCounter};
413373
size_t szCTF = 0;
414374
szCTF += processDet<o2::itsmft::CTF>(pc, DetID::ITS, header, mCTFTreeOut.get());
415375
szCTF += processDet<o2::itsmft::CTF>(pc, DetID::MFT, header, mCTFTreeOut.get());
@@ -434,7 +394,7 @@ void CTFWriterSpec::run(ProcessingContext& pc)
434394
szCTF += appendToTree(*mCTFTreeOut.get(), "CTFHeader", header);
435395
mAccCTFSize += szCTF;
436396
mCTFTreeOut->SetEntries(++mNAccCTF);
437-
mTFOrbits.push_back(dh->firstTForbit);
397+
mTFOrbits.push_back(mTimingInfo.firstTFOrbit);
438398
LOG(info) << "TF#" << mNCTF << ": wrote CTF{" << header << "} of size " << szCTF << " to " << mCurrentCTFFileNameFull << " in " << mTimer.CpuTime() - cput << " s";
439399
if (mNAccCTF > 1) {
440400
LOG(info) << "Current CTF tree has " << mNAccCTF << " entries with total size of " << mAccCTFSize << " bytes";
@@ -480,7 +440,7 @@ void CTFWriterSpec::finalize()
480440
}
481441

482442
//___________________________________________________________________
483-
void CTFWriterSpec::prepareTFTreeAndFile(const o2::header::DataHeader* dh)
443+
void CTFWriterSpec::prepareTFTreeAndFile()
484444
{
485445
if (!mWriteCTF) {
486446
return;
@@ -498,25 +458,25 @@ void CTFWriterSpec::prepareTFTreeAndFile(const o2::header::DataHeader* dh)
498458
}
499459
if (needToOpen) {
500460
closeTFTreeAndFile();
501-
auto fname = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter);
461+
auto fname = o2::base::NameConf::getCTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTFOrbit, mTimingInfo.tfCounter);
502462
auto ctfDir = mCTFDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mCTFDir;
503463
if (mChkSize > 0 && (mCTFDirFallBack != "/dev/null")) {
504-
createLockFile(dh, 0);
464+
createLockFile(0);
505465
auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
506466
if (sz < mChkSize) {
507467
removeLockFile();
508468
LOG(warning) << "Primary CTF output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
509469
ctfDir = mCTFDirFallBack;
510470
}
511471
}
512-
if (mCreateRunEnvDir && !mEnvironmentID.empty()) {
513-
ctfDir += fmt::format("{}_{}/", mEnvironmentID, mRun);
472+
if (mCreateRunEnvDir && !mDataTakingContext.envId.empty() && (mDataTakingContext.envId != o2::framework::DataTakingContext::UNKNOWN)) {
473+
ctfDir += fmt::format("{}_{}/", mDataTakingContext.envId, mDataTakingContext.runNumber);
514474
if (!ctfDir.empty()) {
515475
o2::utils::createDirectoriesIfAbsent(ctfDir);
516476
LOGP(info, "Created {} directory for CTFs output", ctfDir);
517477
}
518478
}
519-
mCurrentCTFFileName = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter);
479+
mCurrentCTFFileName = o2::base::NameConf::getCTFFileName(mTimingInfo.runNumber, mTimingInfo.firstTFOrbit, mTimingInfo.tfCounter);
520480
mCurrentCTFFileNameFull = fmt::format("{}{}", ctfDir, mCurrentCTFFileName);
521481
mCTFFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentCTFFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
522482
mCTFTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFTREENAME).c_str(), "O2 CTF tree");
@@ -542,20 +502,15 @@ void CTFWriterSpec::closeTFTreeAndFile()
542502
if (mStoreMetaFile) {
543503
o2::dataformats::FileMetaData ctfMetaData;
544504
ctfMetaData.fillFileData(mCurrentCTFFileNameFull);
545-
ctfMetaData.run = mRun;
546-
ctfMetaData.LHCPeriod = mLHCPeriod;
505+
ctfMetaData.setDataTakingContext(mDataTakingContext);
547506
ctfMetaData.type = "raw";
548507
ctfMetaData.priority = "high";
508+
ctfMetaData.tfOrbits.swap(mTFOrbits);
549509
auto metaFileNameTmp = fmt::format("{}{}.tmp", mCTFMetaFileDir, mCurrentCTFFileName);
550510
auto metaFileName = fmt::format("{}{}.done", mCTFMetaFileDir, mCurrentCTFFileName);
551511
try {
552512
std::ofstream metaFileOut(metaFileNameTmp);
553513
metaFileOut << ctfMetaData;
554-
metaFileOut << "TFOrbits: ";
555-
for (size_t i = 0; i < mTFOrbits.size(); i++) {
556-
metaFileOut << fmt::format("{}{}", i ? ", " : "", mTFOrbits[i]);
557-
}
558-
metaFileOut << '\n';
559514
metaFileOut.close();
560515
std::filesystem::rename(metaFileNameTmp, metaFileName);
561516
} catch (std::exception const& e) {
@@ -584,7 +539,7 @@ void CTFWriterSpec::storeDictionaries()
584539
mDictFileOut.reset(TFile::Open(dictFileName.c_str(), "recreate"));
585540
mDictTreeOut = std::make_unique<TTree>(std::string(o2::base::NameConf::CTFDICT).c_str(), "O2 CTF dictionary");
586541

587-
CTFHeader header{mRun, uint32_t(mNCTF)};
542+
CTFHeader header{mTimingInfo.runNumber, uint32_t(mNCTF)};
588543
storeDictionary<o2::itsmft::CTF>(DetID::ITS, header);
589544
storeDictionary<o2::itsmft::CTF>(DetID::MFT, header);
590545
storeDictionary<o2::tpc::CTF>(DetID::TPC, header);
@@ -625,11 +580,11 @@ void CTFWriterSpec::storeDictionaries()
625580
}
626581

627582
//___________________________________________________________________
628-
void CTFWriterSpec::createLockFile(const o2::header::DataHeader* dh, int level)
583+
void CTFWriterSpec::createLockFile(int level)
629584
{
630585
// create lock file for the CTF to be written to the storage of given level
631586
while (1) {
632-
mLockFileName = fmt::format("{}/ctfs{}-{}_{}_{}_{}.lock", LOCKFileDir, level, o2::utils::Str::getRandomString(8), mRun, dh->firstTForbit, dh->tfCounter);
587+
mLockFileName = fmt::format("{}/ctfs{}-{}_{}_{}_{}.lock", LOCKFileDir, level, o2::utils::Str::getRandomString(8), mTimingInfo.runNumber, mTimingInfo.firstTFOrbit, mTimingInfo.tfCounter);
633588
if (!std::filesystem::exists(mLockFileName)) {
634589
break;
635590
}
@@ -728,7 +683,6 @@ DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, uint64_t run, const std::
728683
{"output-dir", VariantType::String, "none", {"CTF output directory, must exist"}},
729684
{"output-dir-alt", VariantType::String, "/dev/null", {"Alternative CTF output directory, must exist (if not /dev/null)"}},
730685
{"meta-output-dir", VariantType::String, "/dev/null", {"CTF metadata output directory, must exist (if not /dev/null)"}},
731-
{"append-det-to-period", VariantType::Int, 1, {"Append detectors name to LHCPeriod in metadata if their number is does not exceed this"}},
732686
{"min-file-size", VariantType::Int64, 0l, {"accumulate CTFs until given file size reached"}},
733687
{"max-file-size", VariantType::Int64, 0l, {"if > 0, try to avoid exceeding given file size, also used for space check"}},
734688
{"max-ctf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested CTFs per file"}},

Detectors/GlobalTrackingWorkflow/tpcinterpolationworkflow/include/TPCInterpolationWorkflow/TPCResidualAggregatorSpec.h

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,23 +77,10 @@ class ResidualAggregatorDevice : public o2::framework::Task
7777

7878
void run(o2::framework::ProcessingContext& pc) final
7979
{
80-
o2::base::GRPGeomHelper::instance().checkUpdates(pc);
80+
updateTimeDependentParams(pc);
81+
8182
auto data = pc.inputs().get<gsl::span<o2::tpc::TrackResiduals::UnbinnedResid>>("input");
8283
o2::base::TFIDInfoHelper::fillTFIDInfo(pc, mAggregator->getCurrentTFInfo());
83-
if (!isLHCPeriodSet) {
84-
// read the LHC period information only once
85-
const std::string NAStr = "NA";
86-
auto LHCPeriodStr = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("LHCPeriod", NAStr);
87-
if (LHCPeriodStr == NAStr) {
88-
const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
89-
time_t now = time(nullptr);
90-
auto ltm = gmtime(&now);
91-
LHCPeriodStr = months[ltm->tm_mon];
92-
LOG(warning) << "LHCPeriod is not available, using current month " << LHCPeriodStr;
93-
}
94-
mAggregator->setLHCPeriod(LHCPeriodStr);
95-
isLHCPeriodSet = true;
96-
}
9784
LOG(debug) << "Processing TF " << mAggregator->getCurrentTFInfo().tfCounter << " with " << data.size() << " unbinned residuals";
9885
mAggregator->process(data);
9986
}
@@ -106,9 +93,17 @@ class ResidualAggregatorDevice : public o2::framework::Task
10693
}
10794

10895
private:
96+
void updateTimeDependentParams(ProcessingContext& pc)
97+
{
98+
o2::base::GRPGeomHelper::instance().checkUpdates(pc);
99+
static bool initOnceDone = false;
100+
if (!initOnceDone) {
101+
initOnceDone = true;
102+
mAggregator->setDataTakingContext(pc.services().get<DataTakingContext>());
103+
}
104+
}
109105
std::unique_ptr<o2::tpc::ResidualAggregator> mAggregator;
110106
std::shared_ptr<o2::base::GRPGeomRequest> mCCDBRequest;
111-
bool isLHCPeriodSet{false};
112107
};
113108

114109
} // namespace calibration

0 commit comments

Comments
 (0)