Skip to content

Commit 76164f6

Browse files
committed
Workflow to update RCT with ts of 1st/last TFs seen
1 parent 5adebea commit 76164f6

6 files changed

Lines changed: 222 additions & 5 deletions

File tree

Detectors/CTF/workflow/src/CTFWriterSpec.cxx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,8 @@ void CTFWriterSpec::run(ProcessingContext& pc)
530530
if (mCreateDict && mSaveDictAfter > 0 && (mNCTF % mSaveDictAfter) == 0) {
531531
storeDictionaries();
532532
}
533+
int dummy = 0;
534+
pc.outputs().snapshot({"ctfdone", 0}, dummy);
533535
}
534536

535537
//___________________________________________________________________
@@ -795,7 +797,7 @@ DataProcessorSpec getCTFWriterSpec(DetID::mask_t dets, const std::string& outTyp
795797
return DataProcessorSpec{
796798
"ctf-writer",
797799
inputs,
798-
Outputs{},
800+
Outputs{{OutputLabel{"ctfdone"}, "CTF", "DONE", 0, Lifetime::Timeframe}},
799801
AlgorithmSpec{adaptFromTask<CTFWriterSpec>(dets, outType, verbosity, reportInterval)}, // RS FIXME once global/local options clash is solved, --output-type will become device option
800802
Options{ //{"output-type", VariantType::String, "ctf", {"output types: ctf (per TF) or dict (create dictionaries) or both or none"}},
801803
{"save-ctf-after", VariantType::Int64, 0ll, {"autosave CTF tree with multiple CTFs after every N CTFs if >0 or every -N MBytes if < 0"}},

Detectors/GRP/workflows/CMakeLists.txt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,14 @@ o2_add_executable(grp-lhc-if-file-workflow
3131
O2::GRPCalibration
3232
O2::DetectorsCalibration)
3333

34+
o2_add_executable(workflow
35+
COMPONENT_NAME rct-updater
36+
SOURCES src/rct-updater-workflow.cxx
37+
PUBLIC_LINK_LIBRARIES O2::Framework
38+
O2::CCDB
39+
O2::DetectorsBase
40+
O2::DataFormatsParameters)
41+
3442
o2_add_executable(grp-create
3543
COMPONENT_NAME ecs
3644
SOURCES src/create-grp-ecs.cxx

Detectors/GRP/workflows/src/create-grp-ecs.cxx

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,22 @@ int createGRPECSObject(const std::string& dataPeriod,
134134
} else {
135135
LOGP(alarm, "Upload to {}/{} with validity {}:{} for SOR:{}/EOR:{} FAILED, returned with code {}", ccdbServer, objPath, tstart, tendVal, tstart, tend, retValGLO);
136136
}
137+
if (runType == GRPECSObject::RunType::PHYSICS || runType == GRPECSObject::RunType::COSMICS) { // also create the RCT/Info/RunInformation entry in case the run type is PHYSICS, to be finalized at EOR
138+
char tempChar{};
139+
std::map<std::string, std::string> mdRCT;
140+
mdRCT["SOR"] = std::to_string(tstart);
141+
mdRCT["EOR"] = std::to_string(tend);
142+
mdRCT["SOX"] = std::to_string(tstartCTP);
143+
mdRCT["EOX"] = std::to_string(tendCTP);
144+
long startValRCT = (long)run;
145+
long endValRCT = (long)(run + 1);
146+
retValRCT = api.storeAsBinaryFile(&tempChar, sizeof(tempChar), "tmp.dat", "char", "RCT/Info/RunInformation", mdRCT, startValRCT, endValRCT);
147+
if (retValRCT == 0) {
148+
LOGP(info, "Uploaded initial RCT object to {}/{} with validity {}:{}", ccdbServer, "RCT/Info/RunInformation", startValRCT, endValRCT);
149+
} else {
150+
LOGP(alarm, "Upload of initial RCT object to {}/{} with validity {}:{} FAILED, returned with code {}", ccdbServer, "RCT/Info/RunInformation", startValRCT, endValRCT, retValRCT);
151+
}
152+
}
137153
if (tend > tstart) {
138154
// override SOR version to the same limits
139155
metadata.erase("EOR");
@@ -157,15 +173,14 @@ int createGRPECSObject(const std::string& dataPeriod,
157173
mdRCT["EOX"] = std::to_string(tendCTP);
158174
long startValRCT = (long)run;
159175
long endValRCT = (long)(run + 1);
160-
retValRCT = api.storeAsBinaryFile(&tempChar, sizeof(tempChar), "tmp.dat", "char", "RCT/Info/RunInformation", mdRCT, startValRCT, endValRCT);
176+
retValRCT = api.updateMetadata("RCT/Info/RunInformation", mdRCT, startValRCT);
161177
if (retValRCT == 0) {
162-
LOGP(info, "Uploaded RCT object to {}/{} with validity {}:{}", ccdbServer, "RCT/Info/RunInformation", startValRCT, endValRCT);
178+
LOGP(info, "Updated RCT object to SOR:{}/EOR:{} SOX:{}/EOX:{}", tstart, tend, tstartCTP, tendCTP);
163179
} else {
164-
LOGP(alarm, "Uploaded RCT object to {}/{} with validity {}:{} FAILED, returned with code {}", ccdbServer, "RCT/Info/RunInformation", startValRCT, endValRCT, retValRCT);
180+
LOGP(alarm, "Update of RCT object to SOR:{}/EOR:{} SOX:{}/EOX:{} FAILED, returned with code {}", tstart, tend, tstartCTP, tendCTP, retValRCT);
165181
}
166182
}
167183
}
168-
169184
} else { // write a local file
170185
auto fname = o2::base::NameConf::getGRPECSFileName();
171186
TFile grpF(fname.c_str(), "recreate");
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/Logger.h"
13+
#include "Framework/ControlService.h"
14+
#include "Framework/ConfigParamRegistry.h"
15+
#include "Framework/InputSpec.h"
16+
#include "Framework/Task.h"
17+
#include "CommonUtils/ConfigurableParam.h"
18+
19+
using namespace o2::framework;
20+
21+
void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
22+
{
23+
std::vector<o2::framework::ConfigParamSpec> options{{"configKeyValues", o2::framework::VariantType::String, "", {"Semicolon separated key=value strings ..."}}};
24+
std::swap(workflowOptions, options);
25+
}
26+
27+
#include "DetectorsBase/GRPGeomHelper.h"
28+
#include "CCDB/CcdbApi.h"
29+
#include "DataFormatsParameters/GRPECSObject.h"
30+
31+
namespace o2::rct
32+
{
33+
class RCTUpdaterSpec : public o2::framework::Task
34+
{
35+
public:
36+
RCTUpdaterSpec(std::shared_ptr<o2::base::GRPGeomRequest> gr) : mGGCCDBRequest(gr) {}
37+
~RCTUpdaterSpec() final = default;
38+
39+
void init(InitContext& ic) final
40+
{
41+
o2::base::GRPGeomHelper::instance().setRequest(mGGCCDBRequest);
42+
mUpdateInterval = std::max(0.1f, ic.options().get<float>("update-interval"));
43+
auto ccdb = ic.options().get<std::string>("ccdb-server");
44+
if (!ccdb.empty() && ccdb != "none") {
45+
mCCDBApi = std::make_unique<o2::ccdb::CcdbApi>();
46+
mCCDBApi->init(ic.options().get<std::string>("ccdb-server"));
47+
} else {
48+
LOGP(warn, "No ccdb server provided, no RCT update will be done");
49+
}
50+
}
51+
52+
void run(ProcessingContext& pc) final
53+
{
54+
o2::base::GRPGeomHelper::instance().checkUpdates(pc);
55+
auto tinfo = pc.services().get<o2::framework::TimingInfo>();
56+
if (tinfo.globalRunNumberChanged) { // do we have RCT object?
57+
const auto* grp = o2::base::GRPGeomHelper::instance().getGRPECS();
58+
mNHBFPerTF = grp->getNHBFPerTF();
59+
if (mNHBFPerTF < 1) {
60+
mNHBFPerTF = 32;
61+
}
62+
mRunNumber = tinfo.runNumber;
63+
mUpdateIntervalTF = uint32_t(mUpdateInterval / (mNHBFPerTF * o2::constants::lhc::LHCOrbitMUS * 1e-6)); // convert update interval in seconds to interval in TFs
64+
LOGP(info, "Will update RCT after {} TFs of {} HBFs ({}s was requested)", mUpdateIntervalTF, mNHBFPerTF, mUpdateInterval);
65+
mOrbitReset = o2::base::GRPGeomHelper::instance().getOrbitResetTimeMS();
66+
mMinOrbit = 0xffffffff;
67+
mMaxOrbit = 0;
68+
if (grp->getRunType() == o2::parameters::GRPECS::PHYSICS || grp->getRunType() == o2::parameters::GRPECS::COSMICS) {
69+
mEnabled = true;
70+
} else {
71+
LOGP(warning, "Run {} type is {}, disabling RCT update", mRunNumber, o2::parameters::GRPECS::RunTypeNames[mRunNumber]);
72+
mEnabled = false;
73+
}
74+
if (mEnabled) {
75+
if (mCCDBApi) {
76+
auto md = mCCDBApi->retrieveHeaders("RCT/Info/RunInformation", {}, grp->getRun());
77+
if (md.empty()) {
78+
mEnabled = false;
79+
LOGP(alarm, "RCT object is missing for {} run {}, disabling RCT updater", o2::parameters::GRPECS::RunTypeNames[grp->getRunType()], grp->getRun());
80+
}
81+
}
82+
}
83+
}
84+
if (mEnabled) {
85+
if (tinfo.firstTForbit < mMinOrbit) {
86+
mMinOrbit = tinfo.firstTForbit;
87+
}
88+
if (tinfo.firstTForbit > mMaxOrbit) {
89+
mMaxOrbit = tinfo.firstTForbit;
90+
}
91+
if (tinfo.tfCounter > mLastTFUpdate + mUpdateIntervalTF) { // need to update
92+
mLastTFUpdate = tinfo.tfCounter;
93+
updateRCT();
94+
}
95+
}
96+
}
97+
98+
void endOfStream(framework::EndOfStreamContext& ec) final
99+
{
100+
if (mEnabled) {
101+
updateRCT();
102+
mEnabled = false;
103+
}
104+
}
105+
106+
void stop() final
107+
{
108+
if (mEnabled) {
109+
updateRCT();
110+
mEnabled = false;
111+
}
112+
}
113+
114+
void finaliseCCDB(framework::ConcreteDataMatcher& matcher, void* obj) final
115+
{
116+
if (o2::base::GRPGeomHelper::instance().finaliseCCDB(matcher, obj)) {
117+
return;
118+
}
119+
}
120+
121+
void updateRCT()
122+
{
123+
std::map<std::string, std::string> mdRCT;
124+
if (mMinOrbit > mMaxOrbit) {
125+
return;
126+
}
127+
mdRCT["STF"] = std::to_string(long(mMinOrbit * o2::constants::lhc::LHCOrbitMUS * 1e-3) + mOrbitReset);
128+
mdRCT["ETF"] = std::to_string(long((mMaxOrbit + mNHBFPerTF - 1) * o2::constants::lhc::LHCOrbitMUS * 1e-3) + mOrbitReset);
129+
long startValRCT = (long)mRunNumber;
130+
long endValRCT = (long)(mRunNumber + 1);
131+
if (mCCDBApi) {
132+
int retValRCT = mCCDBApi->updateMetadata("RCT/Info/RunInformation", mdRCT, startValRCT);
133+
if (retValRCT == 0) {
134+
LOGP(info, "Updated RCT object for run {} with TF start:{} end:{}", mRunNumber, mdRCT["STF"], mdRCT["ETF"]);
135+
} else {
136+
LOGP(alarm, "Update of RCT object for run {} with TF start:{} end:{} FAILED, returned with code {}", mRunNumber, mdRCT["STF"], mdRCT["ETF"], retValRCT);
137+
}
138+
} else {
139+
LOGP(info, "CCDB update disabled, TF timestamp range is {}:{}", mdRCT["STF"], mdRCT["ETF"]);
140+
}
141+
}
142+
143+
private:
144+
bool mEnabled = true;
145+
float mUpdateInterval = 1.;
146+
int mUpdateIntervalTF = 1;
147+
uint32_t mMinOrbit = 0xffffffff;
148+
uint32_t mMaxOrbit = 0;
149+
uint32_t mLastTFUpdate = 0;
150+
long mOrbitReset = 0;
151+
int mRunNumber = 0;
152+
int mNHBFPerTF = 32;
153+
std::shared_ptr<o2::base::GRPGeomRequest> mGGCCDBRequest;
154+
std::unique_ptr<o2::ccdb::CcdbApi> mCCDBApi;
155+
};
156+
} // namespace o2::rct
157+
158+
// ------------------------------------------------------------------
159+
#include "Framework/runDataProcessing.h"
160+
#include "Framework/DataProcessorSpec.h"
161+
162+
WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
163+
{
164+
WorkflowSpec specs;
165+
o2::conf::ConfigurableParam::updateFromString(configcontext.options().get<std::string>("configKeyValues"));
166+
std::vector<InputSpec> inputs{{"ctfdone", "CTF", "DONE", 0, Lifetime::Timeframe}};
167+
auto ggRequest = std::make_shared<o2::base::GRPGeomRequest>(true, // orbitResetTime
168+
true, // GRPECS=true
169+
false, // GRPLHCIF
170+
false, // GRPMagField
171+
false, // askMatLUT
172+
o2::base::GRPGeomRequest::None, // geometry
173+
inputs,
174+
true); // query only once all objects except mag.field
175+
specs.push_back(DataProcessorSpec{
176+
"rct-updater",
177+
inputs,
178+
{},
179+
AlgorithmSpec{adaptFromTask<o2::rct::RCTUpdaterSpec>(ggRequest)},
180+
Options{
181+
{"update-interval", VariantType::Float, 1.f, {"update every ... seconds"}},
182+
{"ccdb-server", VariantType::String, "http://ccdb-test.cern.ch:8080", {"CCDB to update"}}}});
183+
return specs;
184+
}

prodtests/full-system-test/aggregator-workflow.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ if [[ "${GEN_TOPO_VERBOSE:-}" == "1" ]]; then
7676
echo "CALIB_FT0_TIMEOFFSET = $CALIB_FT0_TIMEOFFSET" 1>&2
7777
echo "CALIB_ITS_DEADMAP_TIME = $CALIB_ITS_DEADMAP_TIME" 1>&2
7878
echo "CALIB_MFT_DEADMAP_TIME = $CALIB_MFT_DEADMAP_TIME" 1>&2
79+
echo "CALIB_RCT_UPDATER = ${CALIB_RCT_UPDATER:-}" 1>&2
7980
fi
8081

8182
# beamtype dependent settings
@@ -191,6 +192,10 @@ fi
191192

192193
# calibrations for AGGREGATOR_TASKS == BARREL_TF
193194
if [[ $AGGREGATOR_TASKS == BARREL_TF ]] || [[ $AGGREGATOR_TASKS == ALL ]]; then
195+
# RCT updater
196+
if [[ ${CALIB_RCT_UPDATER:-} == 1 ]]; then
197+
add_W o2-rct-updater-workflow "--ccdb-server $CCDB_POPULATOR_UPLOAD_PATH"
198+
fi
194199
# PrimaryVertex
195200
if [[ $CALIB_PRIMVTX_MEANVTX == 1 ]]; then
196201
: ${TFPERSLOTS_MEANVTX:=55000}

prodtests/full-system-test/dpl-workflow.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ fi
3030
: ${CTF_FREE_DISK_WAIT:="10"} # if disk on EPNs is close to full, wait X seconds before retrying to write
3131
: ${CTF_MAX_FREE_DISK_WAIT:="600"} # if not enough disk space after this time throw error
3232

33+
export CALIB_RCT_UPDATER=0
34+
3335
# entropy encoding/decoding mode, '' is equivalent to '--ans-version compat' (compatible with < 09/2023 data),
3436
# use '--ans-version 1.0 --ctf-dict none' for the new per-TF dictionary mode
3537
: ${RANS_OPT:="--ans-version 1.0 --ctf-dict none"}
@@ -601,6 +603,7 @@ if has_processing_step ENTROPY_ENCODER && [[ ! -z "$WORKFLOW_DETECTORS_CTF" ]] &
601603
if [[ $CREATECTFDICT == 1 ]] && [[ $EXTINPUT == 1 ]]; then CONFIG_CTF+=" --save-dict-after $SAVE_CTFDICT_NTIMEFRAMES"; fi
602604
[[ $EPNSYNCMODE == 1 ]] && CONFIG_CTF+=" --require-free-disk 53687091200 --wait-for-free-disk $CTF_FREE_DISK_WAIT --max-wait-for-free-disk $CTF_MAX_FREE_DISK_WAIT"
603605
add_W o2-ctf-writer-workflow "$CONFIG_CTF"
606+
[[ $SYNCMODE == 1 ]] && export CALIB_RCT_UPDATER=1
604607
fi
605608

606609
# ---------------------------------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)