Skip to content

Commit 7a1045d

Browse files
wiechuladavidrohr
authored andcommitted
TPC: Add file writer for synchronous Kr-data taking
1 parent 304fbb8 commit 7a1045d

4 files changed

Lines changed: 392 additions & 4 deletions

File tree

Detectors/TPC/workflow/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ o2_add_library(TPCWorkflow
3131
src/MonitorWorkflowSpec.cxx
3232
src/ProcessingHelpers.cxx
3333
src/TrackAndClusterFilterSpec.cxx
34+
src/FileWriterSpec.cxx
3435
TARGETVARNAME targetName
3536
PUBLIC_LINK_LIBRARIES O2::Framework O2::DataFormatsTPC
3637
O2::DPLUtils O2::TPCReconstruction
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// \file FileWriterSpec.h
13+
/// \brief Writer for calibration data
14+
/// \author Jens Wiechula
15+
16+
#ifndef TPC_FileWriterSpec_H_
17+
#define TPC_FileWriterSpec_H_
18+
19+
#include "Framework/DataProcessorSpec.h"
20+
#include <string>
21+
22+
namespace o2
23+
{
24+
namespace tpc
25+
{
26+
27+
/// create a processor spec
28+
/// read simulated TPC clusters from file and publish
29+
o2::framework::DataProcessorSpec getFileWriterSpec(const std::string inputSpec);
30+
31+
} // end namespace tpc
32+
} // end namespace o2
33+
34+
#endif // TPC_RAWTODIGITSSPEC_H_
Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
/// \file FileWriterSpec.cxx
13+
/// \brief Writer for calibration data
14+
/// \author Jens Wiechula, Jens.Wiechula@ikf.uni-frankfurt.de
15+
//
16+
#include <filesystem>
17+
#include <memory>
18+
#include <vector>
19+
#include <string>
20+
#include "fmt/format.h"
21+
22+
#include "TTree.h"
23+
24+
#include <FairMQDevice.h>
25+
#include "Framework/Task.h"
26+
#include "Framework/RawDeviceService.h"
27+
#include "Framework/ControlService.h"
28+
#include "Framework/ConfigParamRegistry.h"
29+
#include "Framework/Logger.h"
30+
#include "Framework/DataProcessorSpec.h"
31+
#include "Framework/WorkflowSpec.h"
32+
#include "Framework/InputRecordWalker.h"
33+
34+
#include "Headers/DataHeader.h"
35+
#include "DetectorsCommonDataFormats/NameConf.h"
36+
#include "DetectorsCommonDataFormats/FileMetaData.h"
37+
#include "DetectorsCommonDataFormats/DetID.h"
38+
39+
#include "TPCWorkflow/ProcessingHelpers.h"
40+
#include "DataFormatsTPC/Digit.h"
41+
#include "DataFormatsTPC/KrCluster.h"
42+
#include "DataFormatsTPC/TPCSectorHeader.h"
43+
#include "TPCBase/Sector.h"
44+
45+
using namespace o2::framework;
46+
using o2::dataformats::FileMetaData;
47+
using SubSpecificationType = DataAllocator::SubSpecificationType;
48+
using DetID = o2::detectors::DetID;
49+
50+
namespace o2::tpc
51+
{
52+
53+
class FileWriterDevice : public Task
54+
{
55+
public:
56+
FileWriterDevice() = default;
57+
58+
void init(InitContext& ic) final
59+
{
60+
mOutDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("output-dir"));
61+
62+
mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir");
63+
mMaxTFPerFile = ic.options().get<int>("max-tf-per-file");
64+
mMetaFileDir = ic.options().get<std::string>("meta-output-dir");
65+
if (mMetaFileDir != "/dev/null") {
66+
mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir);
67+
mStoreMetaFile = true;
68+
}
69+
}
70+
71+
void run(ProcessingContext& pc) final
72+
{
73+
const std::string NAStr = "NA";
74+
75+
const auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true));
76+
auto oldRun = mRun;
77+
auto oldOrbit = mFirstTForbit;
78+
mRun = processing_helpers::getRunNumber(pc);
79+
mPresentTF = dh->tfCounter;
80+
mFirstTForbit = dh->firstTForbit;
81+
82+
LOGP(info, "run: {}, TF: {}, Orbit: {} ({}), info branches: {}", mRun, mPresentTF, mFirstTForbit, oldOrbit, mInfoBranches.size());
83+
84+
auto oldEnv = mEnvironmentID;
85+
{
86+
auto envN = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", NAStr);
87+
if (envN != NAStr) {
88+
mEnvironmentID = envN;
89+
}
90+
}
91+
if ((oldRun != 0 && oldRun != mRun) || (!oldEnv.empty() && oldEnv != mEnvironmentID)) {
92+
LOGP(WARNING, "RunNumber/Environment changed from {}/{} to {}/{}", oldRun, oldEnv, mRun, mEnvironmentID);
93+
closeTreeAndFile();
94+
}
95+
// check for the LHCPeriod
96+
if (mLHCPeriod.empty()) {
97+
auto LHCPeriodStr = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("LHCPeriod", NAStr);
98+
if (LHCPeriodStr != NAStr) {
99+
mLHCPeriod = LHCPeriodStr;
100+
} else {
101+
const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"};
102+
time_t now = time(nullptr);
103+
auto ltm = gmtime(&now);
104+
mLHCPeriod = months[ltm->tm_mon];
105+
LOG(WARNING) << "LHCPeriod is not available, using current month " << mLHCPeriod;
106+
}
107+
mLHCPeriod += fmt::format("_{}", DetID::getName(DetID::TPC));
108+
}
109+
110+
if (mWrite && (mFirstTForbit != oldOrbit)) {
111+
prepareTreeAndFile(dh);
112+
113+
for (auto br : mInfoBranches) {
114+
br->Fill();
115+
LOGP(info, "Filling branch {}", br->GetName());
116+
}
117+
mTFOrbits.push_back(mFirstTForbit);
118+
++mNTFs;
119+
}
120+
121+
bool hasData = false;
122+
for (auto const& inputRef : InputRecordWalker(pc.inputs())) {
123+
auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(inputRef);
124+
if (sectorHeader == nullptr) {
125+
LOGP(error, "sector header missing on header stack for input on ", inputRef.spec->binding);
126+
continue;
127+
}
128+
129+
const int sector = sectorHeader->sector();
130+
auto inData = pc.inputs().get<std::vector<o2::tpc::KrCluster>>(inputRef);
131+
auto dataPtr = &inData;
132+
133+
if (!mDataBranches[sector]) {
134+
mDataBranches[sector] = mTreeOut->Branch(fmt::format("TPCBoxCluster_{}", sector).data(), &inData);
135+
} else {
136+
mDataBranches[sector]->SetAddress(&dataPtr);
137+
}
138+
mDataBranches[sector]->Fill();
139+
mDataBranches[sector]->ResetAddress();
140+
141+
LOGP(info, "getting data for sector {}", sector);
142+
hasData = true;
143+
}
144+
145+
//if (hasData && mTreeOut) {
146+
//LOGP(info, "fill tree");
147+
//mTreeOut->Fill();
148+
//}
149+
//for (auto br : mDataBranches) {
150+
//if (br) {
151+
//br->ResetAddress();
152+
//}
153+
//}
154+
}
155+
156+
void endOfStream(EndOfStreamContext& ec) final
157+
{
158+
closeTreeAndFile();
159+
}
160+
161+
void stop() final
162+
{
163+
closeTreeAndFile();
164+
}
165+
166+
private:
167+
std::array<TBranch*, Sector::MAXSECTOR> mDataBranches{}; ///< data branch pointers
168+
std::vector<TBranch*> mInfoBranches; ///< common information
169+
std::vector<uint32_t> mTFOrbits{}; ///< 1st orbits of TF accumulated in current file
170+
std::unique_ptr<TFile> mFileOut; ///< output file containin the tree
171+
std::unique_ptr<TTree> mTreeOut; ///< output tree
172+
std::unique_ptr<FileMetaData> mFileMetaData; ///< meta data file for eos and alien file creation
173+
std::string mOutDir{}; ///< file output direcotry
174+
std::string mLHCPeriod{}; ///< LHC period under which to register the data on eos and alien
175+
std::string mMetaFileDir{"/dev/null"}; ///< output directory for meta data file
176+
std::string mCurrentFileName{}; ///< current file name
177+
std::string mCurrentFileNameFull{}; ///< current file name with full directory
178+
std::string mEnvironmentID{}; ///< partition env. id
179+
uint64_t mRun = 0; ///< present run number
180+
uint32_t mPresentTF = 0; ///< present TF number
181+
uint32_t mFirstTForbit = 0; ///< first orbit of present tf
182+
size_t mNTFs = 0; ///< total number of TFs accumulated in the current file
183+
size_t mNFiles = 0; ///< total number of calibration files written
184+
int mMaxTFPerFile = 0; ///< maximum number of TFs per file
185+
bool mStoreMetaFile = false; ///< store the meata data file?
186+
bool mWrite = true; ///< write data
187+
bool mCreateRunEnvDir = true; ///< create the output directory structure?
188+
189+
static constexpr std::string_view TMPFileEnding{".part"};
190+
191+
void prepareTreeAndFile(const o2::header::DataHeader* dh);
192+
void closeTreeAndFile();
193+
};
194+
//___________________________________________________________________
195+
void FileWriterDevice::prepareTreeAndFile(const o2::header::DataHeader* dh)
196+
{
197+
if (!mWrite) {
198+
return;
199+
}
200+
bool needToOpen = false;
201+
if (!mTreeOut) {
202+
needToOpen = true;
203+
} else {
204+
if ((mMaxTFPerFile > 0) && (mNTFs >= mMaxTFPerFile)) {
205+
needToOpen = true;
206+
}
207+
}
208+
if (needToOpen) {
209+
LOGP(info, "opening new file");
210+
closeTreeAndFile();
211+
//auto fname = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter, "tpc_krypton");
212+
auto ctfDir = mOutDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mOutDir;
213+
//if (mChkSize > 0 && (mDirFallBack != "/dev/null")) {
214+
//createLockFile(dh, 0);
215+
//auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage
216+
//if (sz < mChkSize) {
217+
//removeLockFile();
218+
//LOG(WARNING) << "Primary output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one";
219+
//ctfDir = mDirFallBack;
220+
//}
221+
//}
222+
if (mCreateRunEnvDir && !mEnvironmentID.empty()) {
223+
ctfDir += fmt::format("{}_{}/", mEnvironmentID, mRun);
224+
if (!std::filesystem::exists(ctfDir)) {
225+
if (!std::filesystem::create_directories(ctfDir)) {
226+
throw std::runtime_error(fmt::format("Failed to create {} directory", ctfDir));
227+
} else {
228+
LOG(INFO) << "Created {} directory for s output" << ctfDir;
229+
}
230+
}
231+
}
232+
mCurrentFileName = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter, "tpc_krypton");
233+
mCurrentFileNameFull = fmt::format("{}{}", ctfDir, mCurrentFileName);
234+
mFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name
235+
mTreeOut = std::make_unique<TTree>("Clusters", "O2 tree");
236+
mInfoBranches.emplace_back(mTreeOut->Branch("run", &mRun));
237+
mInfoBranches.emplace_back(mTreeOut->Branch("tfCounter", &mPresentTF));
238+
mInfoBranches.emplace_back(mTreeOut->Branch("firstOrbit", &mFirstTForbit));
239+
LOGP(info, "created {} info branches", mInfoBranches.size());
240+
//for (int iSec = 0; iSec < Sector::MAXSECTOR; ++iSec) {
241+
//std::vector<KrCluster> clusters;
242+
//LOGP(info, "creating branch for sector {} on tree {} with name {}", iSec, (void*)mTreeOut.get(), fmt::format("TPCBoxCluster_{}", iSec).data());
243+
//mDataBranches[iSec] = mTreeOut->Branch(fmt::format("TPCBoxCluster_{}", iSec).data(), &clusters);
244+
//mDataBranches[iSec]->ResetAddress();
245+
//}
246+
if (mStoreMetaFile) {
247+
mFileMetaData = std::make_unique<o2::dataformats::FileMetaData>();
248+
}
249+
250+
mNFiles++;
251+
}
252+
}
253+
//___________________________________________________________________
254+
void FileWriterDevice::closeTreeAndFile()
255+
{
256+
if (!mTreeOut) {
257+
return;
258+
}
259+
260+
LOGP(info, "closing file {}", mCurrentFileName);
261+
try {
262+
mFileOut->cd();
263+
mTreeOut->SetEntries();
264+
mTreeOut->Write();
265+
mTreeOut.reset();
266+
mFileOut->Close();
267+
mFileOut.reset();
268+
if (!TMPFileEnding.empty()) {
269+
std::filesystem::rename(o2::utils::Str::concat_string(mCurrentFileNameFull, TMPFileEnding), mCurrentFileNameFull);
270+
}
271+
// write file metaFile data
272+
if (mStoreMetaFile) {
273+
mFileMetaData->fillFileData(mCurrentFileNameFull);
274+
mFileMetaData->run = mRun;
275+
mFileMetaData->LHCPeriod = mLHCPeriod;
276+
mFileMetaData->type = "raw";
277+
auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, mCurrentFileName);
278+
auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, mCurrentFileName);
279+
try {
280+
std::ofstream metaFileOut(metaFileNameTmp);
281+
metaFileOut << *mFileMetaData.get();
282+
metaFileOut << "TFOrbits: ";
283+
for (size_t i = 0; i < mTFOrbits.size(); i++) {
284+
metaFileOut << fmt::format("{}{}", i ? ", " : "", mTFOrbits[i]);
285+
}
286+
metaFileOut << '\n';
287+
metaFileOut.close();
288+
std::filesystem::rename(metaFileNameTmp, metaFileName);
289+
} catch (std::exception const& e) {
290+
LOG(ERROR) << "Failed to store meta data file " << metaFileName << ", reason: " << e.what();
291+
}
292+
mFileMetaData.reset();
293+
}
294+
} catch (std::exception const& e) {
295+
LOG(ERROR) << "Failed to finalize file " << mCurrentFileNameFull << ", reason: " << e.what();
296+
}
297+
mTFOrbits.clear();
298+
mInfoBranches.clear();
299+
std::fill(mDataBranches.begin(), mDataBranches.end(), nullptr);
300+
mNTFs = 0;
301+
//mAccSize = 0;
302+
//removeLockFile();
303+
}
304+
305+
DataProcessorSpec getFileWriterSpec(const std::string inputSpec)
306+
{
307+
return DataProcessorSpec{
308+
"file-writer",
309+
select(inputSpec.data()),
310+
Outputs{},
311+
AlgorithmSpec{adaptFromTask<FileWriterDevice>()},
312+
Options{
313+
{"output-dir", VariantType::String, "none", {" output directory, must exist"}},
314+
{"meta-output-dir", VariantType::String, "/dev/null", {" metadata output directory, must exist (if not /dev/null)"}},
315+
{"max-tf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested TFs per file"}},
316+
{"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}},
317+
}};
318+
}; // end DataProcessorSpec
319+
320+
} // namespace o2::tpc

0 commit comments

Comments
 (0)