|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | + |
| 12 | +/// \file FileWriterSpec.cxx |
| 13 | +/// \brief Writer for calibration data |
| 14 | +/// \author Jens Wiechula, Jens.Wiechula@ikf.uni-frankfurt.de |
| 15 | +// |
| 16 | +#include <filesystem> |
| 17 | +#include <memory> |
| 18 | +#include <vector> |
| 19 | +#include <string> |
| 20 | +#include "fmt/format.h" |
| 21 | + |
| 22 | +#include "TTree.h" |
| 23 | + |
| 24 | +#include <FairMQDevice.h> |
| 25 | +#include "Framework/Task.h" |
| 26 | +#include "Framework/RawDeviceService.h" |
| 27 | +#include "Framework/ControlService.h" |
| 28 | +#include "Framework/ConfigParamRegistry.h" |
| 29 | +#include "Framework/Logger.h" |
| 30 | +#include "Framework/DataProcessorSpec.h" |
| 31 | +#include "Framework/WorkflowSpec.h" |
| 32 | +#include "Framework/InputRecordWalker.h" |
| 33 | + |
| 34 | +#include "Headers/DataHeader.h" |
| 35 | +#include "DetectorsCommonDataFormats/NameConf.h" |
| 36 | +#include "DetectorsCommonDataFormats/FileMetaData.h" |
| 37 | +#include "DetectorsCommonDataFormats/DetID.h" |
| 38 | + |
| 39 | +#include "TPCWorkflow/ProcessingHelpers.h" |
| 40 | +#include "DataFormatsTPC/Digit.h" |
| 41 | +#include "DataFormatsTPC/KrCluster.h" |
| 42 | +#include "DataFormatsTPC/TPCSectorHeader.h" |
| 43 | +#include "TPCBase/Sector.h" |
| 44 | + |
| 45 | +using namespace o2::framework; |
| 46 | +using o2::dataformats::FileMetaData; |
| 47 | +using SubSpecificationType = DataAllocator::SubSpecificationType; |
| 48 | +using DetID = o2::detectors::DetID; |
| 49 | + |
| 50 | +namespace o2::tpc |
| 51 | +{ |
| 52 | + |
| 53 | +class FileWriterDevice : public Task |
| 54 | +{ |
| 55 | + public: |
| 56 | + FileWriterDevice() = default; |
| 57 | + |
| 58 | + void init(InitContext& ic) final |
| 59 | + { |
| 60 | + mOutDir = o2::utils::Str::rectifyDirectory(ic.options().get<std::string>("output-dir")); |
| 61 | + |
| 62 | + mCreateRunEnvDir = !ic.options().get<bool>("ignore-partition-run-dir"); |
| 63 | + mMaxTFPerFile = ic.options().get<int>("max-tf-per-file"); |
| 64 | + mMetaFileDir = ic.options().get<std::string>("meta-output-dir"); |
| 65 | + if (mMetaFileDir != "/dev/null") { |
| 66 | + mMetaFileDir = o2::utils::Str::rectifyDirectory(mMetaFileDir); |
| 67 | + mStoreMetaFile = true; |
| 68 | + } |
| 69 | + } |
| 70 | + |
| 71 | + void run(ProcessingContext& pc) final |
| 72 | + { |
| 73 | + const std::string NAStr = "NA"; |
| 74 | + |
| 75 | + const auto dh = DataRefUtils::getHeader<o2::header::DataHeader*>(pc.inputs().getFirstValid(true)); |
| 76 | + auto oldRun = mRun; |
| 77 | + auto oldOrbit = mFirstTForbit; |
| 78 | + mRun = processing_helpers::getRunNumber(pc); |
| 79 | + mPresentTF = dh->tfCounter; |
| 80 | + mFirstTForbit = dh->firstTForbit; |
| 81 | + |
| 82 | + LOGP(info, "run: {}, TF: {}, Orbit: {} ({}), info branches: {}", mRun, mPresentTF, mFirstTForbit, oldOrbit, mInfoBranches.size()); |
| 83 | + |
| 84 | + auto oldEnv = mEnvironmentID; |
| 85 | + { |
| 86 | + auto envN = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("environment_id", NAStr); |
| 87 | + if (envN != NAStr) { |
| 88 | + mEnvironmentID = envN; |
| 89 | + } |
| 90 | + } |
| 91 | + if ((oldRun != 0 && oldRun != mRun) || (!oldEnv.empty() && oldEnv != mEnvironmentID)) { |
| 92 | + LOGP(WARNING, "RunNumber/Environment changed from {}/{} to {}/{}", oldRun, oldEnv, mRun, mEnvironmentID); |
| 93 | + closeTreeAndFile(); |
| 94 | + } |
| 95 | + // check for the LHCPeriod |
| 96 | + if (mLHCPeriod.empty()) { |
| 97 | + auto LHCPeriodStr = pc.services().get<RawDeviceService>().device()->fConfig->GetProperty<std::string>("LHCPeriod", NAStr); |
| 98 | + if (LHCPeriodStr != NAStr) { |
| 99 | + mLHCPeriod = LHCPeriodStr; |
| 100 | + } else { |
| 101 | + const char* months[12] = {"JAN", "FEB", "MAR", "APR", "MAY", "JUN", "JUL", "AUG", "SEP", "OCT", "NOV", "DEC"}; |
| 102 | + time_t now = time(nullptr); |
| 103 | + auto ltm = gmtime(&now); |
| 104 | + mLHCPeriod = months[ltm->tm_mon]; |
| 105 | + LOG(WARNING) << "LHCPeriod is not available, using current month " << mLHCPeriod; |
| 106 | + } |
| 107 | + mLHCPeriod += fmt::format("_{}", DetID::getName(DetID::TPC)); |
| 108 | + } |
| 109 | + |
| 110 | + if (mWrite && (mFirstTForbit != oldOrbit)) { |
| 111 | + prepareTreeAndFile(dh); |
| 112 | + |
| 113 | + for (auto br : mInfoBranches) { |
| 114 | + br->Fill(); |
| 115 | + LOGP(info, "Filling branch {}", br->GetName()); |
| 116 | + } |
| 117 | + mTFOrbits.push_back(mFirstTForbit); |
| 118 | + ++mNTFs; |
| 119 | + } |
| 120 | + |
| 121 | + bool hasData = false; |
| 122 | + for (auto const& inputRef : InputRecordWalker(pc.inputs())) { |
| 123 | + auto const* sectorHeader = DataRefUtils::getHeader<TPCSectorHeader*>(inputRef); |
| 124 | + if (sectorHeader == nullptr) { |
| 125 | + LOGP(error, "sector header missing on header stack for input on ", inputRef.spec->binding); |
| 126 | + continue; |
| 127 | + } |
| 128 | + |
| 129 | + const int sector = sectorHeader->sector(); |
| 130 | + auto inData = pc.inputs().get<std::vector<o2::tpc::KrCluster>>(inputRef); |
| 131 | + auto dataPtr = &inData; |
| 132 | + |
| 133 | + if (!mDataBranches[sector]) { |
| 134 | + mDataBranches[sector] = mTreeOut->Branch(fmt::format("TPCBoxCluster_{}", sector).data(), &inData); |
| 135 | + } else { |
| 136 | + mDataBranches[sector]->SetAddress(&dataPtr); |
| 137 | + } |
| 138 | + mDataBranches[sector]->Fill(); |
| 139 | + mDataBranches[sector]->ResetAddress(); |
| 140 | + |
| 141 | + LOGP(info, "getting data for sector {}", sector); |
| 142 | + hasData = true; |
| 143 | + } |
| 144 | + |
| 145 | + //if (hasData && mTreeOut) { |
| 146 | + //LOGP(info, "fill tree"); |
| 147 | + //mTreeOut->Fill(); |
| 148 | + //} |
| 149 | + //for (auto br : mDataBranches) { |
| 150 | + //if (br) { |
| 151 | + //br->ResetAddress(); |
| 152 | + //} |
| 153 | + //} |
| 154 | + } |
| 155 | + |
| 156 | + void endOfStream(EndOfStreamContext& ec) final |
| 157 | + { |
| 158 | + closeTreeAndFile(); |
| 159 | + } |
| 160 | + |
| 161 | + void stop() final |
| 162 | + { |
| 163 | + closeTreeAndFile(); |
| 164 | + } |
| 165 | + |
| 166 | + private: |
| 167 | + std::array<TBranch*, Sector::MAXSECTOR> mDataBranches{}; ///< data branch pointers |
| 168 | + std::vector<TBranch*> mInfoBranches; ///< common information |
| 169 | + std::vector<uint32_t> mTFOrbits{}; ///< 1st orbits of TF accumulated in current file |
| 170 | + std::unique_ptr<TFile> mFileOut; ///< output file containin the tree |
| 171 | + std::unique_ptr<TTree> mTreeOut; ///< output tree |
| 172 | + std::unique_ptr<FileMetaData> mFileMetaData; ///< meta data file for eos and alien file creation |
| 173 | + std::string mOutDir{}; ///< file output direcotry |
| 174 | + std::string mLHCPeriod{}; ///< LHC period under which to register the data on eos and alien |
| 175 | + std::string mMetaFileDir{"/dev/null"}; ///< output directory for meta data file |
| 176 | + std::string mCurrentFileName{}; ///< current file name |
| 177 | + std::string mCurrentFileNameFull{}; ///< current file name with full directory |
| 178 | + std::string mEnvironmentID{}; ///< partition env. id |
| 179 | + uint64_t mRun = 0; ///< present run number |
| 180 | + uint32_t mPresentTF = 0; ///< present TF number |
| 181 | + uint32_t mFirstTForbit = 0; ///< first orbit of present tf |
| 182 | + size_t mNTFs = 0; ///< total number of TFs accumulated in the current file |
| 183 | + size_t mNFiles = 0; ///< total number of calibration files written |
| 184 | + int mMaxTFPerFile = 0; ///< maximum number of TFs per file |
| 185 | + bool mStoreMetaFile = false; ///< store the meata data file? |
| 186 | + bool mWrite = true; ///< write data |
| 187 | + bool mCreateRunEnvDir = true; ///< create the output directory structure? |
| 188 | + |
| 189 | + static constexpr std::string_view TMPFileEnding{".part"}; |
| 190 | + |
| 191 | + void prepareTreeAndFile(const o2::header::DataHeader* dh); |
| 192 | + void closeTreeAndFile(); |
| 193 | +}; |
| 194 | +//___________________________________________________________________ |
| 195 | +void FileWriterDevice::prepareTreeAndFile(const o2::header::DataHeader* dh) |
| 196 | +{ |
| 197 | + if (!mWrite) { |
| 198 | + return; |
| 199 | + } |
| 200 | + bool needToOpen = false; |
| 201 | + if (!mTreeOut) { |
| 202 | + needToOpen = true; |
| 203 | + } else { |
| 204 | + if ((mMaxTFPerFile > 0) && (mNTFs >= mMaxTFPerFile)) { |
| 205 | + needToOpen = true; |
| 206 | + } |
| 207 | + } |
| 208 | + if (needToOpen) { |
| 209 | + LOGP(info, "opening new file"); |
| 210 | + closeTreeAndFile(); |
| 211 | + //auto fname = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter, "tpc_krypton"); |
| 212 | + auto ctfDir = mOutDir.empty() ? o2::utils::Str::rectifyDirectory("./") : mOutDir; |
| 213 | + //if (mChkSize > 0 && (mDirFallBack != "/dev/null")) { |
| 214 | + //createLockFile(dh, 0); |
| 215 | + //auto sz = getAvailableDiskSpace(ctfDir, 0); // check main storage |
| 216 | + //if (sz < mChkSize) { |
| 217 | + //removeLockFile(); |
| 218 | + //LOG(WARNING) << "Primary output device has available size " << sz << " while " << mChkSize << " is requested: will write on secondary one"; |
| 219 | + //ctfDir = mDirFallBack; |
| 220 | + //} |
| 221 | + //} |
| 222 | + if (mCreateRunEnvDir && !mEnvironmentID.empty()) { |
| 223 | + ctfDir += fmt::format("{}_{}/", mEnvironmentID, mRun); |
| 224 | + if (!std::filesystem::exists(ctfDir)) { |
| 225 | + if (!std::filesystem::create_directories(ctfDir)) { |
| 226 | + throw std::runtime_error(fmt::format("Failed to create {} directory", ctfDir)); |
| 227 | + } else { |
| 228 | + LOG(INFO) << "Created {} directory for s output" << ctfDir; |
| 229 | + } |
| 230 | + } |
| 231 | + } |
| 232 | + mCurrentFileName = o2::base::NameConf::getCTFFileName(mRun, dh->firstTForbit, dh->tfCounter, "tpc_krypton"); |
| 233 | + mCurrentFileNameFull = fmt::format("{}{}", ctfDir, mCurrentFileName); |
| 234 | + mFileOut.reset(TFile::Open(fmt::format("{}{}", mCurrentFileNameFull, TMPFileEnding).c_str(), "recreate")); // to prevent premature external usage, use temporary name |
| 235 | + mTreeOut = std::make_unique<TTree>("Clusters", "O2 tree"); |
| 236 | + mInfoBranches.emplace_back(mTreeOut->Branch("run", &mRun)); |
| 237 | + mInfoBranches.emplace_back(mTreeOut->Branch("tfCounter", &mPresentTF)); |
| 238 | + mInfoBranches.emplace_back(mTreeOut->Branch("firstOrbit", &mFirstTForbit)); |
| 239 | + LOGP(info, "created {} info branches", mInfoBranches.size()); |
| 240 | + //for (int iSec = 0; iSec < Sector::MAXSECTOR; ++iSec) { |
| 241 | + //std::vector<KrCluster> clusters; |
| 242 | + //LOGP(info, "creating branch for sector {} on tree {} with name {}", iSec, (void*)mTreeOut.get(), fmt::format("TPCBoxCluster_{}", iSec).data()); |
| 243 | + //mDataBranches[iSec] = mTreeOut->Branch(fmt::format("TPCBoxCluster_{}", iSec).data(), &clusters); |
| 244 | + //mDataBranches[iSec]->ResetAddress(); |
| 245 | + //} |
| 246 | + if (mStoreMetaFile) { |
| 247 | + mFileMetaData = std::make_unique<o2::dataformats::FileMetaData>(); |
| 248 | + } |
| 249 | + |
| 250 | + mNFiles++; |
| 251 | + } |
| 252 | +} |
| 253 | +//___________________________________________________________________ |
| 254 | +void FileWriterDevice::closeTreeAndFile() |
| 255 | +{ |
| 256 | + if (!mTreeOut) { |
| 257 | + return; |
| 258 | + } |
| 259 | + |
| 260 | + LOGP(info, "closing file {}", mCurrentFileName); |
| 261 | + try { |
| 262 | + mFileOut->cd(); |
| 263 | + mTreeOut->SetEntries(); |
| 264 | + mTreeOut->Write(); |
| 265 | + mTreeOut.reset(); |
| 266 | + mFileOut->Close(); |
| 267 | + mFileOut.reset(); |
| 268 | + if (!TMPFileEnding.empty()) { |
| 269 | + std::filesystem::rename(o2::utils::Str::concat_string(mCurrentFileNameFull, TMPFileEnding), mCurrentFileNameFull); |
| 270 | + } |
| 271 | + // write file metaFile data |
| 272 | + if (mStoreMetaFile) { |
| 273 | + mFileMetaData->fillFileData(mCurrentFileNameFull); |
| 274 | + mFileMetaData->run = mRun; |
| 275 | + mFileMetaData->LHCPeriod = mLHCPeriod; |
| 276 | + mFileMetaData->type = "raw"; |
| 277 | + auto metaFileNameTmp = fmt::format("{}{}.tmp", mMetaFileDir, mCurrentFileName); |
| 278 | + auto metaFileName = fmt::format("{}{}.done", mMetaFileDir, mCurrentFileName); |
| 279 | + try { |
| 280 | + std::ofstream metaFileOut(metaFileNameTmp); |
| 281 | + metaFileOut << *mFileMetaData.get(); |
| 282 | + metaFileOut << "TFOrbits: "; |
| 283 | + for (size_t i = 0; i < mTFOrbits.size(); i++) { |
| 284 | + metaFileOut << fmt::format("{}{}", i ? ", " : "", mTFOrbits[i]); |
| 285 | + } |
| 286 | + metaFileOut << '\n'; |
| 287 | + metaFileOut.close(); |
| 288 | + std::filesystem::rename(metaFileNameTmp, metaFileName); |
| 289 | + } catch (std::exception const& e) { |
| 290 | + LOG(ERROR) << "Failed to store meta data file " << metaFileName << ", reason: " << e.what(); |
| 291 | + } |
| 292 | + mFileMetaData.reset(); |
| 293 | + } |
| 294 | + } catch (std::exception const& e) { |
| 295 | + LOG(ERROR) << "Failed to finalize file " << mCurrentFileNameFull << ", reason: " << e.what(); |
| 296 | + } |
| 297 | + mTFOrbits.clear(); |
| 298 | + mInfoBranches.clear(); |
| 299 | + std::fill(mDataBranches.begin(), mDataBranches.end(), nullptr); |
| 300 | + mNTFs = 0; |
| 301 | + //mAccSize = 0; |
| 302 | + //removeLockFile(); |
| 303 | +} |
| 304 | + |
| 305 | +DataProcessorSpec getFileWriterSpec(const std::string inputSpec) |
| 306 | +{ |
| 307 | + return DataProcessorSpec{ |
| 308 | + "file-writer", |
| 309 | + select(inputSpec.data()), |
| 310 | + Outputs{}, |
| 311 | + AlgorithmSpec{adaptFromTask<FileWriterDevice>()}, |
| 312 | + Options{ |
| 313 | + {"output-dir", VariantType::String, "none", {" output directory, must exist"}}, |
| 314 | + {"meta-output-dir", VariantType::String, "/dev/null", {" metadata output directory, must exist (if not /dev/null)"}}, |
| 315 | + {"max-tf-per-file", VariantType::Int, 0, {"if > 0, avoid storing more than requested TFs per file"}}, |
| 316 | + {"ignore-partition-run-dir", VariantType::Bool, false, {"Do not creare partition-run directory in output-dir"}}, |
| 317 | + }}; |
| 318 | +}; // end DataProcessorSpec |
| 319 | + |
| 320 | +} // namespace o2::tpc |
0 commit comments