Skip to content

Commit ee6068a

Browse files
authored
[QC-793] Avoid colliding TaskRunner outputs (#1208)
* [QC-793] Avoid colliding TaskRunner outputs * clang-format
1 parent 65e2b5c commit ee6068a

8 files changed

Lines changed: 39 additions & 21 deletions

File tree

Framework/include/QualityControl/TaskRunner.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ class TaskRunner : public framework::Task
9494
/// \brief ID string for all TaskRunner devices
9595
static std::string createTaskRunnerIdString();
9696
/// \brief Unified DataOrigin for Quality Control tasks
97-
static header::DataOrigin createTaskDataOrigin();
97+
static header::DataOrigin createTaskDataOrigin(const std::string& detectorCode);
9898
/// \brief Unified DataDescription naming scheme for all tasks
9999
static header::DataDescription createTaskDataDescription(const std::string& taskName);
100100
/// \brief Unified DataDescription naming scheme for all timers

Framework/src/InfrastructureGenerator.cxx

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ framework::WorkflowSpec InfrastructureGenerator::generateLocalBatchInfrastructur
278278
auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, 1);
279279
workflow.emplace_back(TaskRunnerFactory::create(taskConfig));
280280

281-
fileSinkInputs.emplace_back(taskSpec.taskName, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskSpec.taskName));
281+
fileSinkInputs.emplace_back(taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName));
282282
}
283283
}
284284

@@ -445,7 +445,7 @@ void InfrastructureGenerator::generateLocalTaskLocalProxy(framework::WorkflowSpe
445445
std::string remotePort = std::to_string(taskSpec.remotePort);
446446
std::string proxyName = taskSpec.detectorName + "-" + taskName + "-proxy";
447447
std::string channelName = taskSpec.detectorName + "-" + taskName + "-proxy";
448-
InputSpec proxyInput{ channelName, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskName), static_cast<SubSpec>(id), Lifetime::Sporadic };
448+
InputSpec proxyInput{ channelName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskName), static_cast<SubSpec>(id), Lifetime::Sporadic };
449449
std::string channelConfig = "name=" + channelName + ",type=pub,method=connect,address=tcp://" +
450450
taskSpec.remoteMachine + ":" + remotePort + ",rateLogging=60,transport=zeromq";
451451

@@ -467,7 +467,7 @@ void InfrastructureGenerator::generateLocalTaskRemoteProxy(framework::WorkflowSp
467467
Outputs proxyOutputs;
468468
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
469469
proxyOutputs.emplace_back(
470-
OutputSpec{ { channelName }, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskName), static_cast<SubSpec>(id), Lifetime::Sporadic });
470+
OutputSpec{ { channelName }, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskName), static_cast<SubSpec>(id), Lifetime::Sporadic });
471471
}
472472

473473
std::string channelConfig = "name=" + channelName + ",type=sub,method=bind,address=tcp://*:" + remotePort +
@@ -490,7 +490,7 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
490490
for (size_t id = 1; id <= numberOfLocalMachines; id++) {
491491
mergerInputs.emplace_back(
492492
InputSpec{ { taskName + std::to_string(id) },
493-
TaskRunner::createTaskDataOrigin(),
493+
TaskRunner::createTaskDataOrigin(detectorName),
494494
TaskRunner::createTaskDataDescription(taskName),
495495
static_cast<SubSpec>(id),
496496
Lifetime::Sporadic });
@@ -500,7 +500,7 @@ void InfrastructureGenerator::generateMergers(framework::WorkflowSpec& workflow,
500500
mergersBuilder.setInfrastructureName(taskName);
501501
mergersBuilder.setInputSpecs(mergerInputs);
502502
mergersBuilder.setOutputSpec(
503-
{ { "main" }, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskName), 0 });
503+
{ { "main" }, TaskRunner::createTaskDataOrigin(detectorName), TaskRunner::createTaskDataDescription(taskName), 0 });
504504
MergerConfig mergerConfig;
505505
// if we are to change the mode to Full, disable reseting tasks after each cycle.
506506
mergerConfig.inputObjectTimespan = { (mergingMode.empty() || mergingMode == "delta") ? InputObjectsTimespan::LastDifference : InputObjectsTimespan::FullHistory };
@@ -528,7 +528,7 @@ void InfrastructureGenerator::generateCheckRunners(framework::WorkflowSpec& work
528528
// todo: avoid code repetition
529529
for (const auto& taskSpec : infrastructureSpec.tasks) {
530530
if (taskSpec.active) {
531-
InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
531+
InputSpec taskOutput{ taskSpec.taskName, TaskRunner::createTaskDataOrigin(taskSpec.detectorName), TaskRunner::createTaskDataDescription(taskSpec.taskName), Lifetime::Sporadic };
532532
tasksOutputMap.insert({ DataSpecUtils::label(taskOutput), taskOutput });
533533
}
534534
}

Framework/src/InfrastructureSpecReader.cxx

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ TaskSpec InfrastructureSpecReader::readSpecEntry<TaskSpec>(std::string taskID, c
9090
ts.moduleName = taskTree.get<std::string>("moduleName");
9191
ts.detectorName = taskTree.get<std::string>("detectorName");
9292
ts.cycleDurationSeconds = taskTree.get<int>("cycleDurationSeconds");
93-
ts.dataSource = readSpecEntry<DataSourceSpec>("", taskTree.get_child("dataSource"), wholeTree);
93+
ts.dataSource = readSpecEntry<DataSourceSpec>(taskID, taskTree.get_child("dataSource"), wholeTree);
9494
ts.active = taskTree.get<bool>("active", ts.active);
9595
ts.maxNumberCycles = taskTree.get<int>("maxNumberCycles", ts.maxNumberCycles);
9696
ts.resetAfterCycles = taskTree.get<size_t>("resetAfterCycles", ts.resetAfterCycles);
@@ -132,7 +132,7 @@ TaskSpec InfrastructureSpecReader::readSpecEntry<TaskSpec>(std::string taskID, c
132132
}
133133

134134
template <>
135-
DataSourceSpec InfrastructureSpecReader::readSpecEntry<DataSourceSpec>(std::string,
135+
DataSourceSpec InfrastructureSpecReader::readSpecEntry<DataSourceSpec>(std::string dataRequestorId,
136136
const boost::property_tree::ptree& dataSourceTree,
137137
const boost::property_tree::ptree& wholeTree)
138138
{
@@ -164,7 +164,11 @@ DataSourceSpec InfrastructureSpecReader::readSpecEntry<DataSourceSpec>(std::stri
164164
}
165165
case DataSourceType::Task: {
166166
dss.name = dataSourceTree.get<std::string>("name");
167-
dss.inputs = { { dss.name, TaskRunner::createTaskDataOrigin(), TaskRunner::createTaskDataDescription(dss.name), 0, Lifetime::Sporadic } };
167+
// dss.name is actually taskID (backwards compatibility). We deduce the taskName as followS:
168+
auto taskName = wholeTree.get<std::string>("qc.tasks." + dss.name + ".taskName", dss.name);
169+
auto detectorName = wholeTree.get<std::string>("qc.tasks." + dss.name + ".detectorName");
170+
171+
dss.inputs = { { dss.name, TaskRunner::createTaskDataOrigin(detectorName), TaskRunner::createTaskDataDescription(taskName), 0, Lifetime::Sporadic } };
168172
if (dataSourceTree.count("MOs") > 0) {
169173
for (const auto& moName : dataSourceTree.get_child("MOs")) {
170174
dss.subInputs.push_back(moName.second.get_value<std::string>());
@@ -230,7 +234,7 @@ CheckSpec InfrastructureSpecReader::readSpecEntry<CheckSpec>(std::string checkID
230234
const auto& dataSourcesTree = checkTree.count("dataSource") > 0 ? checkTree.get_child("dataSource") : checkTree.get_child("dataSources");
231235
for (const auto& [_key, dataSourceTree] : dataSourcesTree) {
232236
(void)_key;
233-
cs.dataSources.push_back(readSpecEntry<DataSourceSpec>("", dataSourceTree, wholeTree));
237+
cs.dataSources.push_back(readSpecEntry<DataSourceSpec>(checkID, dataSourceTree, wholeTree));
234238
}
235239

236240
if (auto policy = checkTree.get_optional<std::string>("policy"); policy.has_value()) {
@@ -261,7 +265,7 @@ AggregatorSpec InfrastructureSpecReader::readSpecEntry<AggregatorSpec>(std::stri
261265
const auto& dataSourcesTree = aggregatorTree.count("dataSource") > 0 ? aggregatorTree.get_child("dataSource") : aggregatorTree.get_child("dataSources");
262266
for (const auto& [_key, dataSourceTree] : dataSourcesTree) {
263267
(void)_key;
264-
as.dataSources.push_back(readSpecEntry<DataSourceSpec>("", dataSourceTree, wholeTree));
268+
as.dataSources.push_back(readSpecEntry<DataSourceSpec>(aggregatorID, dataSourceTree, wholeTree));
265269
}
266270

267271
if (auto policy = aggregatorTree.get_optional<std::string>("policy"); policy.has_value()) {

Framework/src/TaskRunner.cxx

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,23 @@ std::string TaskRunner::createTaskRunnerIdString()
239239
return std::string("qc-task");
240240
}
241241

242-
header::DataOrigin TaskRunner::createTaskDataOrigin()
242+
header::DataOrigin TaskRunner::createTaskDataOrigin(const std::string& detectorCode)
243243
{
244-
return header::DataOrigin{ "QC" };
244+
// We need a unique Data Origin, so we can have QC Tasks with the same names for different detectors.
245+
// However, to avoid colliding with data marked as e.g. TPC/CLUSTERS, we add 'Q' to the data origin, so it is Q<det>.
246+
std::string originStr = "Q";
247+
if (detectorCode.empty()) {
248+
ILOG(Warning, Ops) << "empty detector code for a task data origin, trying to survive with: DET" << ENDM;
249+
originStr += "DET";
250+
} else if (detectorCode.size() > 3) {
251+
ILOG(Warning, Ops) << "too long detector code for a task data origin: " + detectorCode + ", trying to survive with: " + detectorCode.substr(0, 3) << ENDM;
252+
originStr += detectorCode.substr(0, 3);
253+
} else {
254+
originStr += detectorCode;
255+
}
256+
o2::header::DataOrigin origin;
257+
origin.runtimeInit(originStr.c_str());
258+
return origin;
245259
}
246260

247261
header::DataDescription TaskRunner::createTaskDataDescription(const std::string& taskName)

Framework/src/TaskRunnerFactory.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,13 +68,13 @@ TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig
6868
}
6969
auto inputs = taskSpec.dataSource.inputs;
7070
inputs.emplace_back("timer-cycle",
71-
TaskRunner::createTaskDataOrigin(),
71+
TaskRunner::createTaskDataOrigin(taskSpec.detectorName),
7272
TaskRunner::createTimerDataDescription(taskSpec.taskName),
7373
0,
7474
Lifetime::Timer);
7575

7676
OutputSpec monitorObjectsSpec{ { "mo" },
77-
TaskRunner::createTaskDataOrigin(),
77+
TaskRunner::createTaskDataOrigin(taskSpec.detectorName),
7878
TaskRunner::createTaskDataDescription(taskSpec.taskName),
7979
static_cast<header::DataHeader::SubSpecificationType>(parallelTaskID),
8080
Lifetime::Sporadic };

Framework/test/testCheck.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ BOOST_AUTO_TEST_CASE(test_check_specs)
6161
Check check(getCheckConfig(configFilePath, "singleCheck"));
6262

6363
BOOST_REQUIRE_EQUAL(check.getInputs().size(), 1);
64-
BOOST_CHECK_EQUAL(check.getInputs()[0], (InputSpec{ { "mo" }, "QC", "skeletonTask", 0, Lifetime::Sporadic }));
64+
BOOST_CHECK_EQUAL(check.getInputs()[0], (InputSpec{ { "mo" }, "QTST", "skeletonTask", 0, Lifetime::Sporadic }));
6565

6666
BOOST_CHECK_EQUAL(check.getOutputSpec(), (OutputSpec{ "QC", "singleCheck-chk", 0, Lifetime::Sporadic }));
6767
}

Framework/test/testSharedConfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@
145145
"moduleName": "QcSkeleton",
146146
"dataSource": [{
147147
"type": "Task",
148-
"name": "xyzTask"
148+
"name": "xyzTaskID"
149149
}]
150150
},
151151
"checkOnEachSeparately": {

Framework/test/testTaskRunner.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ BOOST_AUTO_TEST_CASE(test_factory)
6868
BOOST_CHECK(taskRunner.inputs[1].lifetime == Lifetime::Timer);
6969

7070
BOOST_REQUIRE_EQUAL(taskRunner.outputs.size(), 1);
71-
BOOST_CHECK_EQUAL(taskRunner.outputs[0], (OutputSpec{ { "mo" }, "QC", "abcTask", 123, Lifetime::Sporadic }));
71+
BOOST_CHECK_EQUAL(taskRunner.outputs[0], (OutputSpec{ { "mo" }, "QXXX", "abcTask", 123, Lifetime::Sporadic }));
7272

7373
BOOST_CHECK(taskRunner.algorithm.onInit != nullptr);
7474

@@ -78,7 +78,7 @@ BOOST_AUTO_TEST_CASE(test_factory)
7878

7979
BOOST_AUTO_TEST_CASE(test_task_runner_static)
8080
{
81-
BOOST_CHECK_EQUAL(TaskRunner::createTaskDataOrigin(), DataOrigin("QC"));
81+
BOOST_CHECK_EQUAL(TaskRunner::createTaskDataOrigin("DET"), DataOrigin("QDET"));
8282
BOOST_CHECK(TaskRunner::createTaskDataDescription("qwertyuiop") == DataDescription("qwertyuiop"));
8383
BOOST_CHECK(TaskRunner::createTaskDataDescription("012345678901234567890") == DataDescription("0123456789012345"));
8484
BOOST_CHECK_THROW(TaskRunner::createTaskDataDescription(""), AliceO2::Common::FatalException);
@@ -97,7 +97,7 @@ BOOST_AUTO_TEST_CASE(test_task_runner)
9797
BOOST_CHECK_EQUAL(qcTask.getInputsSpecs()[0], DataSampling::InputSpecsForPolicy(dataSamplingTree, "tpcclust").at(0));
9898
BOOST_CHECK(qcTask.getInputsSpecs()[1].lifetime == Lifetime::Timer);
9999

100-
BOOST_CHECK_EQUAL(qcTask.getOutputSpec(), (OutputSpec{ { "mo" }, "QC", "abcTask", 0, Lifetime::Sporadic }));
100+
BOOST_CHECK_EQUAL(qcTask.getOutputSpec(), (OutputSpec{ { "mo" }, "QXXX", "abcTask", 0, Lifetime::Sporadic }));
101101

102102
BOOST_REQUIRE_EQUAL(qcTask.getOptions().size(), 3);
103103
BOOST_CHECK_EQUAL(qcTask.getOptions()[0].name, "period-timer-cycle");

0 commit comments

Comments
 (0)