Skip to content

Commit 11ad855

Browse files
Julian Myrchashahor02
authored andcommitted
o2-eve-export-workflow: reduce amount of created data to given threshold (default: 3MB for each period). After exceeding this value, new files are not created in the period.
1 parent bdb541f commit 11ad855

4 files changed

Lines changed: 90 additions & 8 deletions

File tree

EventVisualisation/Base/include/EventVisualisationBase/DirectoryLoader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ class DirectoryLoader
3434
public:
3535
static std::deque<std::string> load(const std::string& path, const std::string& marker, const std::vector<std::string>& ext);
3636
static std::deque<std::string> load(const std::vector<std::string>& paths, const std::string& marker, const std::vector<std::string>& ext);
37+
static std::vector<std::string> allFolders(const std::string& location);
38+
static bool canCreateNextFile(const std::vector<std::string>& paths, const std::string& marker, const std::vector<std::string>& ext, long long millisec, long capacityAllowed);
3739
static void reduceNumberOfFiles(const std::string& path, const std::deque<std::string>& files, std::size_t filesInFolder);
3840
static void removeOldestFiles(std::string& path, std::vector<std::string>& ext, int remaining);
3941
};

EventVisualisation/Base/src/DirectoryLoader.cxx

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
#include <filesystem>
1818
#include <algorithm>
1919
#include <climits>
20+
#include <map>
21+
#include <iostream>
22+
#include <forward_list>
23+
#include <regex>
2024
#include <fairlogger/Logger.h>
2125

2226
using namespace std;
@@ -39,6 +43,47 @@ deque<string> DirectoryLoader::load(const std::string& path, const std::string&
3943
return result;
4044
}
4145

46+
bool DirectoryLoader::canCreateNextFile(const std::vector<std::string>& paths, const std::string& marker, const std::vector<std::string>& ext, long long millisec, long capacityAllowed)
47+
{
48+
deque<string> result;
49+
std::map<std::string, std::string> fullPath;
50+
for (const auto& path : paths) {
51+
try {
52+
for (const auto& entry : std::filesystem::directory_iterator(path)) {
53+
if (std::find(ext.begin(), ext.end(), entry.path().extension()) != ext.end()) {
54+
result.push_back(entry.path().filename());
55+
fullPath[entry.path().filename()] = entry.path();
56+
}
57+
}
58+
} catch (std::filesystem::filesystem_error const& ex) {
59+
LOGF(info, "filesystem problem: %s", ex.what());
60+
}
61+
}
62+
63+
// comparison with safety if marker not in the filename (-1+1 gives 0)
64+
std::ranges::sort(result.begin(), result.end(),
65+
[marker](const std::string& a, const std::string& b) {
66+
return a.substr(a.find_first_of(marker) + 1) > b.substr(b.find_first_of(marker) + 1);
67+
});
68+
unsigned long accumulatedSize = 0L;
69+
const std::regex delimiter{"_"};
70+
for (auto const& file : result) {
71+
std::vector<std::string> c(std::sregex_token_iterator(file.begin(), file.end(), delimiter, -1), {});
72+
if (std::stoll(c[1]) < millisec) {
73+
break;
74+
}
75+
try {
76+
accumulatedSize += filesystem::file_size(fullPath[file]);
77+
} catch (std::filesystem::filesystem_error const& ex) {
78+
LOGF(info, "problem scanning folder: %s", ex.what());
79+
}
80+
if (accumulatedSize > capacityAllowed) {
81+
return false;
82+
}
83+
}
84+
return true;
85+
}
86+
4287
deque<string> DirectoryLoader::load(const std::vector<std::string>& paths, const std::string& marker, const std::vector<std::string>& ext)
4388
{
4489
deque<string> result;
@@ -51,13 +96,23 @@ deque<string> DirectoryLoader::load(const std::vector<std::string>& paths, const
5196
}
5297
// comparison with safety if marker not in the filename (-1+1 gives 0)
5398
std::sort(result.begin(), result.end(),
54-
[marker](std::string a, std::string b) {
99+
[marker](const std::string& a, const std::string& b) {
55100
return a.substr(a.find_first_of(marker) + 1) < b.substr(b.find_first_of(marker) + 1);
56101
});
57102

58103
return result;
59104
}
60105

106+
std::vector<std::string> DirectoryLoader::allFolders(const std::string& location)
107+
{
108+
auto const pos = location.find_last_of('_');
109+
std::vector<std::string> folders;
110+
folders.push_back(location.substr(0, pos) + "_PHYSICS");
111+
folders.push_back(location.substr(0, pos) + "_COSMICS");
112+
folders.push_back(location.substr(0, pos) + "_SYNTHETIC");
113+
return folders;
114+
}
115+
61116
void DirectoryLoader::reduceNumberOfFiles(const std::string& path, const std::deque<std::string>& files, std::size_t filesInFolder)
62117
{
63118
if (filesInFolder == -1) {
@@ -107,7 +162,7 @@ std::string DirectoryLoader::getLatestFile(std::string& path, std::vector<std::s
107162
void DirectoryLoader::removeOldestFiles(std::string& path, std::vector<std::string>& ext, const int remaining)
108163
{
109164
while (getNumberOfFiles(path, ext) > remaining) {
110-
LOGF(info, "removing oldest file in folder: ", path, " : ", getLatestFile(path, ext));
165+
LOGF(info, "removing oldest file in folder: %s : %s", path, getLatestFile(path, ext));
111166
filesystem::remove(path + "/" + getLatestFile(path, ext));
112167
}
113168
}

EventVisualisation/Workflow/include/EveWorkflow/O2DPLDisplay.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,14 @@ class O2DPLDisplaySpec : public o2::framework::Task
5858
std::shared_ptr<o2::base::GRPGeomRequest> gr,
5959
std::shared_ptr<o2::emcal::CalibLoader> emcCalibLoader,
6060
const std::string& jsonPath, const std::string& ext,
61-
std::chrono::milliseconds timeInterval, int numberOfFiles, int numberOfTracks,
61+
std::chrono::milliseconds timeInterval, int numberOfFiles, int numberOfTracks, int numberOfBytes,
6262
bool eveHostNameMatch, int minITSTracks, int minTracks, bool filterITSROF, bool filterTime,
6363
const EveWorkflowHelper::Bracket& timeBracket, bool removeTPCEta,
64-
const EveWorkflowHelper::Bracket& etaBracket, bool trackSorting, int onlyNthEvent, bool primaryVertex, int maxPrimaryVertices, bool primaryVertexTriggers, float primaryVertexMinZ, float primaryVertexMaxZ, float primaryVertexMinX, float primaryVertexMaxX, float primaryVertexMinY, float primaryVertexMaxY, float maxEMCALCellTime, float minEMCALCellEnergy)
65-
: mDisableWrite(disableWrite), mUseMC(useMC), mTrkMask(trkMask), mClMask(clMask), mDataRequest(dataRequest), mGGCCDBRequest(gr), mEMCALCalibLoader(emcCalibLoader), mJsonPath(jsonPath), mExt(ext), mTimeInterval(timeInterval), mNumberOfFiles(numberOfFiles), mNumberOfTracks(numberOfTracks), mEveHostNameMatch(eveHostNameMatch), mMinITSTracks(minITSTracks), mMinTracks(minTracks), mFilterITSROF(filterITSROF), mFilterTime(filterTime), mTimeBracket(timeBracket), mRemoveTPCEta(removeTPCEta), mEtaBracket(etaBracket), mTrackSorting(trackSorting), mOnlyNthEvent(onlyNthEvent), mPrimaryVertexMode(primaryVertex), mMaxPrimaryVertices(maxPrimaryVertices), mPrimaryVertexTriggers(primaryVertexTriggers), mPrimaryVertexMinZ(primaryVertexMinZ), mPrimaryVertexMaxZ(primaryVertexMaxZ), mPrimaryVertexMinX(primaryVertexMinX), mPrimaryVertexMaxX(primaryVertexMaxX), mPrimaryVertexMinY(primaryVertexMinY), mPrimaryVertexMaxY(primaryVertexMaxY), mEMCALMaxCellTime(maxEMCALCellTime), mEMCALMinCellEnergy(minEMCALCellEnergy), mRunType(o2::parameters::GRPECS::NONE)
64+
const EveWorkflowHelper::Bracket& etaBracket, bool trackSorting, int onlyNthEvent,
65+
bool primaryVertex, int maxPrimaryVertices, bool primaryVertexTriggers,
66+
float primaryVertexMinZ, float primaryVertexMaxZ, float primaryVertexMinX, float primaryVertexMaxX, float primaryVertexMinY, float primaryVertexMaxY,
67+
float maxEMCALCellTime, float minEMCALCellEnergy)
68+
: mDisableWrite(disableWrite), mUseMC(useMC), mTrkMask(trkMask), mClMask(clMask), mDataRequest(dataRequest), mGGCCDBRequest(gr), mEMCALCalibLoader(emcCalibLoader), mJsonPath(jsonPath), mExt(ext), mTimeInterval(timeInterval), mNumberOfFiles(numberOfFiles), mNumberOfTracks(numberOfTracks), mNumberOfBytes(numberOfBytes), mEveHostNameMatch(eveHostNameMatch), mMinITSTracks(minITSTracks), mMinTracks(minTracks), mFilterITSROF(filterITSROF), mFilterTime(filterTime), mTimeBracket(timeBracket), mRemoveTPCEta(removeTPCEta), mEtaBracket(etaBracket), mTrackSorting(trackSorting), mOnlyNthEvent(onlyNthEvent), mPrimaryVertexMode(primaryVertex), mMaxPrimaryVertices(maxPrimaryVertices), mPrimaryVertexTriggers(primaryVertexTriggers), mPrimaryVertexMinZ(primaryVertexMinZ), mPrimaryVertexMaxZ(primaryVertexMaxZ), mPrimaryVertexMinX(primaryVertexMinX), mPrimaryVertexMaxX(primaryVertexMaxX), mPrimaryVertexMinY(primaryVertexMinY), mPrimaryVertexMaxY(primaryVertexMaxY), mEMCALMaxCellTime(maxEMCALCellTime), mEMCALMinCellEnergy(minEMCALCellEnergy), mRunType(o2::parameters::GRPECS::NONE)
6669

6770
{
6871
this->mTimeStamp = std::chrono::high_resolution_clock::now() - timeInterval; // first run meets condition
@@ -92,6 +95,7 @@ class O2DPLDisplaySpec : public o2::framework::Task
9295
std::chrono::milliseconds mTimeInterval; // minimal interval between files in milliseconds
9396
int mNumberOfFiles; // maximum number of files in folder - newer replaces older
9497
int mNumberOfTracks; // maximum number of track in single file (0 means no limit)
98+
int mNumberOfBytes; // number of bytes stored in period which causes stopping saving a new file
9599
bool mTrackSorting; // perform sorting tracks by track time before applying filters
96100
int mOnlyNthEvent; // process only every nth event.
97101
int mMaxPrimaryVertices; // max number of primary vertices to draw per time frame

EventVisualisation/Workflow/src/O2DPLDisplay.cxx

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,14 @@
3434
#include "Framework/ConfigParamSpec.h"
3535
#include "DataFormatsMCH/TrackMCH.h"
3636
#include "DataFormatsMCH/ROFRecord.h"
37+
#include <EventVisualisationBase/DirectoryLoader.h>
3738
#include "DataFormatsMCH/Cluster.h"
3839
#include <unistd.h>
3940

41+
using std::chrono::duration_cast;
42+
using std::chrono::milliseconds;
43+
using std::chrono::system_clock;
44+
4045
using namespace o2::event_visualisation;
4146
using namespace o2::framework;
4247
using namespace o2::dataformats;
@@ -59,6 +64,7 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
5964
{"eve-dds-collection-index", VariantType::Int, -1, {"number of dpl collection allowed to produce files (-1 means no limit)"}},
6065
{"number-of_files", VariantType::Int, 150, {"maximum number of json files in folder"}},
6166
{"number-of_tracks", VariantType::Int, -1, {"maximum number of track stored in json file (-1 means no limit)"}},
67+
{"number-of_bytes", VariantType::Int, 3000000, {"number of bytes stored in time interval which stops producing new data file (-1 means no limit)"}},
6268
{"time-interval", VariantType::Int, 5000, {"time interval in milliseconds between stored files"}},
6369
{"disable-mc", VariantType::Bool, false, {"disable visualization of MC data"}},
6470
{"disable-write", VariantType::Bool, false, {"disable writing output files"}},
@@ -118,7 +124,7 @@ void O2DPLDisplaySpec::run(ProcessingContext& pc)
118124
if (elapsed < this->mTimeInterval) {
119125
return; // skip this run - it is too often
120126
}
121-
this->mTimeStamp = currentTime;
127+
this->mTimeStamp = currentTime; // next run AFTER period counted from last run, even if there will be not any save
122128
o2::globaltracking::RecoContainer recoCont;
123129
recoCont.collectData(pc, *mDataRequest);
124130
updateTimeDependentParams(pc); // Make sure that this is called after the RecoContainer collect data, since some condition objects are fetched there
@@ -158,12 +164,24 @@ void O2DPLDisplaySpec::run(ProcessingContext& pc)
158164
const auto& tinfo = pc.services().get<o2::framework::TimingInfo>();
159165

160166
std::size_t filesSaved = 0;
167+
const std::vector<std::string> dirs = o2::event_visualisation::DirectoryLoader::allFolders(this->mJsonPath);
168+
const std::string marker = "_";
169+
const std::vector<std::string> exts = {
170+
".json", ".root", ".eve"};
161171
auto processData = [&](const auto& dataMap) {
162172
for (const auto& keyVal : dataMap) {
163173
if (filesSaved >= mMaxPrimaryVertices) {
164174
break;
165175
}
166-
176+
if (this->mNumberOfBytes != -1) {
177+
auto periodStart =
178+
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() - this->mTimeInterval.count();
179+
if (!DirectoryLoader::canCreateNextFile(
180+
dirs, marker, exts, periodStart, this->mNumberOfBytes)) {
181+
LOGF(info, "Already too much data (> %d) to transfer in this period - event will not be not saved ...", this->mNumberOfBytes);
182+
break;
183+
}
184+
}
167185
const auto pv = keyVal.first;
168186
bool save = false;
169187
if (mPrimaryVertexMode) {
@@ -202,6 +220,8 @@ void O2DPLDisplaySpec::run(ProcessingContext& pc)
202220
helper.mEvent.setPrimaryVertex(pv);
203221
helper.save(this->mJsonPath, this->mExt, this->mNumberOfFiles, this->mTrkMask, this->mClMask, tinfo.runNumber, tinfo.creation);
204222
filesSaved++;
223+
currentTime = std::chrono::high_resolution_clock::now(); // time AFTER save
224+
this->mTimeStamp = currentTime; // next run AFTER period counted from last save
205225
}
206226

207227
helper.clear();
@@ -318,6 +338,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
318338
std::chrono::milliseconds timeInterval(cfgc.options().get<int>("time-interval"));
319339
int numberOfFiles = cfgc.options().get<int>("number-of_files");
320340
int numberOfTracks = cfgc.options().get<int>("number-of_tracks");
341+
int numberOfBytes = cfgc.options().get<int>("number-of_bytes");
321342

322343
GlobalTrackID::mask_t srcTrk = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-tracks"));
323344
GlobalTrackID::mask_t srcCl = GlobalTrackID::getSourcesMask(cfgc.options().get<std::string>("display-clusters"));
@@ -430,7 +451,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc)
430451
"o2-eve-export",
431452
dataRequest->inputs,
432453
{},
433-
AlgorithmSpec{adaptFromTask<O2DPLDisplaySpec>(disableWrite, useMC, srcTrk, srcCl, dataRequest, ggRequest, emcalCalibLoader, jsonFolder, ext, timeInterval, numberOfFiles, numberOfTracks, eveHostNameMatch, minITSTracks, minTracks, filterITSROF, filterTime, timeBracket, removeTPCEta, etaBracket, tracksSorting, onlyNthEvent, primaryVertexMode, maxPrimaryVertices, primaryVertexTriggers, primaryVertexMinZ, primaryVertexMaxZ, primaryVertexMinX, primaryVertexMaxX, primaryVertexMinY, primaryVertexMaxY, maxEMCALCellTime, minEMCALCellEnergy)}});
454+
AlgorithmSpec{adaptFromTask<O2DPLDisplaySpec>(disableWrite, useMC, srcTrk, srcCl, dataRequest, ggRequest, emcalCalibLoader, jsonFolder, ext, timeInterval, numberOfFiles, numberOfTracks, numberOfBytes, eveHostNameMatch, minITSTracks, minTracks, filterITSROF, filterTime, timeBracket, removeTPCEta, etaBracket, tracksSorting, onlyNthEvent, primaryVertexMode, maxPrimaryVertices, primaryVertexTriggers, primaryVertexMinZ, primaryVertexMaxZ, primaryVertexMinX, primaryVertexMaxX, primaryVertexMinY, primaryVertexMaxY, maxEMCALCellTime, minEMCALCellEnergy)}});
434455

435456
// configure dpl timer to inject correct firstTForbit: start from the 1st orbit of TF containing 1st sampled orbit
436457
o2::raw::HBFUtilsInitializer hbfIni(cfgc, specs);

0 commit comments

Comments
 (0)