From a364dbefc159fe2b18db98048d1bc5f9f621445b Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 22 Jun 2021 11:55:40 +0200 Subject: [PATCH 1/5] [QC-443] Refactor configuration of TaskRunner and lay foundation for others Now TaskRunner only gets TaskRunnerConfig with all the information it needs in order to run. It will be able to update the configuration by retrieving it from DPL and letting TaskRunnerFactory provide updated TaskConfig. --- Framework/CMakeLists.txt | 5 +- Framework/include/QualityControl/CommonSpec.h | 43 ++++ .../include/QualityControl/DataSourceSpec.h | 56 +++++ .../QualityControl/InfrastructureGenerator.h | 4 +- .../QualityControl/InfrastructureSpec.h | 35 ++++ .../QualityControl/InfrastructureSpecReader.h | 49 +++++ .../include/QualityControl/ObjectsManager.h | 1 - .../include/QualityControl/TaskFactory.h | 4 +- Framework/include/QualityControl/TaskRunner.h | 26 +-- .../{TaskConfig.h => TaskRunnerConfig.h} | 24 ++- .../QualityControl/TaskRunnerFactory.h | 17 +- Framework/include/QualityControl/TaskSpec.h | 74 +++++++ .../include/QualityControl/runnerUtils.h | 17 ++ Framework/src/DataSourceSpec.cxx | 30 +++ Framework/src/InfrastructureGenerator.cxx | 194 ++++++++---------- Framework/src/InfrastructureSpecReader.cxx | 194 ++++++++++++++++++ Framework/src/TaskFactory.cxx | 2 +- Framework/src/TaskRunner.cxx | 126 ++---------- Framework/src/TaskRunnerFactory.cxx | 73 ++++++- Framework/test/testObjectsManager.cxx | 20 +- Framework/test/testPublisher.cxx | 8 +- Framework/test/testSharedConfig.json | 11 +- Framework/test/testTaskInterface.cxx | 7 +- Framework/test/testTaskRunner.cxx | 29 ++- Framework/test/testWorkflow.json | 1 + Modules/Daq/test/testQcDaq.cxx | 2 +- Modules/Example/test/testFactory.cxx | 4 +- Modules/Example/test/testQcExample.cxx | 2 +- Modules/Skeleton/test/testQcSkeleton.cxx | 2 +- 29 files changed, 769 insertions(+), 291 deletions(-) create mode 100644 Framework/include/QualityControl/CommonSpec.h create mode 100644 Framework/include/QualityControl/DataSourceSpec.h create mode 100644 Framework/include/QualityControl/InfrastructureSpec.h create mode 100644 Framework/include/QualityControl/InfrastructureSpecReader.h rename Framework/include/QualityControl/{TaskConfig.h => TaskRunnerConfig.h} (68%) create mode 100644 Framework/include/QualityControl/TaskSpec.h create mode 100644 Framework/src/DataSourceSpec.cxx create mode 100644 Framework/src/InfrastructureSpecReader.cxx diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index da1b1f6cc4..7075d8c55e 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -53,6 +53,7 @@ add_library(O2QualityControl src/TaskInterface.cxx src/RepositoryBenchmark.cxx src/InfrastructureGenerator.cxx + src/InfrastructureSpecReader.cxx src/Check.cxx src/Aggregator.cxx src/ServiceDiscovery.cxx @@ -73,7 +74,9 @@ add_library(O2QualityControl src/UpdatePolicyManager.cxx src/AdvancedWorkflow.cxx src/QualitiesToTRFCollectionConverter.cxx - src/Calculators.cxx) + src/Calculators.cxx + src/DataSourceSpec.cxx + ) target_include_directories( O2QualityControl diff --git a/Framework/include/QualityControl/CommonSpec.h b/Framework/include/QualityControl/CommonSpec.h new file mode 100644 index 0000000000..8586ec18c0 --- /dev/null +++ b/Framework/include/QualityControl/CommonSpec.h @@ -0,0 +1,43 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_COMMONSPEC_H +#define QUALITYCONTROL_COMMONSPEC_H + +/// +/// \file CommonSpec.h +/// \author Piotr Konopka +/// + +#include +#include +#include + +namespace o2::quality_control::core +{ + +struct CommonSpec { + CommonSpec() = default; + + std::unordered_map database; + int activityNumber; + int activityType; + std::string monitoringUrl = "infologger:///debug?qc"; + std::string consulUrl; + std::string conditionDBUrl = "http://ccdb-test.cern.ch:8080"; + bool infologgerFilterDiscardDebug = false; + int infologgerDiscardLevel = 21; + + std::string configurationSource; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_COMMONSPEC_H diff --git a/Framework/include/QualityControl/DataSourceSpec.h b/Framework/include/QualityControl/DataSourceSpec.h new file mode 100644 index 0000000000..3e21fa21db --- /dev/null +++ b/Framework/include/QualityControl/DataSourceSpec.h @@ -0,0 +1,56 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_DATASOURCESPEC_H +#define QUALITYCONTROL_DATASOURCESPEC_H + +/// +/// \file DataSourceSpec.h +/// \author Piotr Konopka +/// + +#include +#include +#include +#include + +namespace o2::quality_control::core +{ + +enum class DataSourceType { + DataSamplingPolicy, + Direct, + Task, + Check, + Aggregator, + PostProcessingTask, + ExternalTask, + Invalid +}; + +// this should allow us to represent all data sources which come from DPL (and maybe CCDB). +struct DataSourceSpec { + explicit DataSourceSpec(DataSourceType type = DataSourceType::Invalid, std::unordered_map params = {}); + + // todo: use c++20 concepts when available + template )>> + bool isOneOf(Args... dataSourceType) const + { + return (... || (dataSourceType == type)); + } + + DataSourceType type; + std::unordered_map typeSpecificParams; + std::vector inputs; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_DATASOURCESPEC_H diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index eacf11fc02..9250cd1e65 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -71,9 +71,9 @@ class InfrastructureGenerator /// configuration to be 'local'. /// /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") - /// \param host - name of the machine + /// \param targetHost - name of the machine /// \return generated local QC workflow - static framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string host); + static framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string targetHost); /// \brief Generates the local part of the QC infrastructure for a specified host. /// diff --git a/Framework/include/QualityControl/InfrastructureSpec.h b/Framework/include/QualityControl/InfrastructureSpec.h new file mode 100644 index 0000000000..facea241f6 --- /dev/null +++ b/Framework/include/QualityControl/InfrastructureSpec.h @@ -0,0 +1,35 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_INFRASTRUCTURESPEC_H +#define QUALITYCONTROL_INFRASTRUCTURESPEC_H + +/// +/// \file InfrastructureSpec.h +/// \author Piotr Konopka +/// + +#include "QualityControl/CommonSpec.h" +#include "QualityControl/TaskSpec.h" + +#include + +namespace o2::quality_control::core +{ + +struct InfrastructureSpec { + CommonSpec common; + std::vector tasks; + // todo: add other actors +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_INFRASTRUCTURESPEC_H diff --git a/Framework/include/QualityControl/InfrastructureSpecReader.h b/Framework/include/QualityControl/InfrastructureSpecReader.h new file mode 100644 index 0000000000..50d0d575ed --- /dev/null +++ b/Framework/include/QualityControl/InfrastructureSpecReader.h @@ -0,0 +1,49 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_INFRASTRUCTURESPECREADER_H +#define QUALITYCONTROL_INFRASTRUCTURESPECREADER_H + +/// +/// \file InfrastructureSpecReader.h +/// \author Piotr Konopka +/// + +#include "QualityControl/InfrastructureSpec.h" +#include "QualityControl/TaskSpec.h" +#include "QualityControl/CommonSpec.h" +#include "QualityControl/DataSourceSpec.h" +#include + +namespace o2::quality_control::core +{ + +// If have to increase the performance of reading, +// we can probably improve it by writing a proper parser like for WorkflowSerializationHelpers in O2 +// Also, move operators could be implemented. + +class InfrastructureSpecReader +{ + public: + /// \brief Reads the full QC configuration file. + // todo remove configurationSource when it is possible + static InfrastructureSpec readInfrastructureSpec(const boost::property_tree::ptree&, const std::string& configurationSource); + + // readers for separate parts + static CommonSpec readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource); + static TaskSpec readTaskSpec(std::string taskName, const boost::property_tree::ptree& taskSpec, const std::string& configurationSource); + static DataSourceSpec readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec, const std::string& configurationSource); + + static std::string validateDetectorName(std::string name); +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_INFRASTRUCTURESPECREADER_H diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index d7b1fcfec4..4ba9e54768 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -20,7 +20,6 @@ // QC #include "QualityControl/MonitorObject.h" #include "QualityControl/MonitorObjectCollection.h" -#include "QualityControl/TaskConfig.h" // stl #include #include diff --git a/Framework/include/QualityControl/TaskFactory.h b/Framework/include/QualityControl/TaskFactory.h index 76b82cbe27..386c6424a0 100644 --- a/Framework/include/QualityControl/TaskFactory.h +++ b/Framework/include/QualityControl/TaskFactory.h @@ -20,7 +20,7 @@ // STL #include // QC -#include "QualityControl/TaskConfig.h" +#include "QualityControl/TaskRunnerConfig.h" #include "QualityControl/TaskInterface.h" namespace o2::quality_control::core @@ -43,7 +43,7 @@ class TaskFactory /// The TaskInterface actual class is decided based on the parameters passed. /// \todo make it static ? /// \author Barthelemy von Haller - TaskInterface* create(TaskConfig& taskConfig, std::shared_ptr objectsManager); + TaskInterface* create(TaskRunnerConfig& taskConfig, std::shared_ptr objectsManager); }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index c9dc4a17da..9434fc3ccb 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -30,7 +30,7 @@ #include #include // QC -#include "QualityControl/TaskConfig.h" +#include "QualityControl/TaskRunnerConfig.h" #include "QualityControl/TaskInterface.h" namespace o2::configuration @@ -75,7 +75,7 @@ class TaskRunner : public framework::Task /// \param taskName - name of the task, which exists in tasks list in the configuration file /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies - TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id = 0); + TaskRunner(const TaskRunnerConfig& config); ~TaskRunner() override = default; /// \brief TaskRunner's init callback @@ -86,13 +86,10 @@ class TaskRunner : public framework::Task /// \brief TaskRunner's completion policy callback static framework::CompletionPolicy::CompletionOp completionPolicyCallback(o2::framework::InputSpan const& inputs); - std::string getDeviceName() { return mDeviceName; }; - const framework::Inputs& getInputsSpecs() { return mInputSpecs; }; - const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; }; - const framework::Options getOptions() { return mOptions; }; - - /// \brief Makes TaskRunner invoke TaskInterface::reset() each n cycles. n = 0 means never. - void setResetAfterCycles(size_t n = 0); + std::string getDeviceName() { return mTaskConfig.deviceName; }; + const framework::Inputs& getInputsSpecs() const { return mTaskConfig.inputSpecs; }; + const framework::OutputSpec getOutputSpec() { return mTaskConfig.moSpec; }; + const framework::Options getOptions() { return mTaskConfig.options; }; /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); @@ -124,25 +121,16 @@ class TaskRunner : public framework::Task void saveToFile(); private: - std::string mDeviceName; - TaskConfig mTaskConfig; - std::shared_ptr mConfigFile; // used in init only + TaskRunnerConfig mTaskConfig; std::shared_ptr mCollector; std::shared_ptr mTask; - size_t mResetAfterCycles = 0; std::shared_ptr mObjectsManager; int mRunNumber; - std::string validateDetectorName(std::string name) const; boost::property_tree::ptree getTaskConfigTree() const; void updateMonitoringStats(framework::ProcessingContext& pCtx); void computeRunNumber(const framework::ServiceRegistry& services); - // consider moving these to TaskConfig - framework::Inputs mInputSpecs; - framework::OutputSpec mMonitorObjectsSpec; - framework::Options mOptions; - bool mCycleOn = false; bool mNoMoreCycles = false; int mCycleNumber = 0; diff --git a/Framework/include/QualityControl/TaskConfig.h b/Framework/include/QualityControl/TaskRunnerConfig.h similarity index 68% rename from Framework/include/QualityControl/TaskConfig.h rename to Framework/include/QualityControl/TaskRunnerConfig.h index bb8763dfd6..672be59934 100644 --- a/Framework/include/QualityControl/TaskConfig.h +++ b/Framework/include/QualityControl/TaskRunnerConfig.h @@ -10,7 +10,7 @@ // or submit itself to any jurisdiction. /// -/// \file TaskConfig.h +/// \file TaskRunnerConfig.h /// \author Barthelemy von Haller /// @@ -19,23 +19,37 @@ #include #include +#include + +#include namespace o2::quality_control::core { /// \brief Container for the configuration of a Task -struct TaskConfig { +struct TaskRunnerConfig { + std::string deviceName; std::string taskName; std::string moduleName; std::string className; int cycleDurationSeconds; int maxNumberCycles; - std::string consulUrl; - std::string conditionUrl = ""; + std::string consulUrl{}; + std::string conditionUrl{}; + std::string monitoringUrl{}; + framework::Inputs inputSpecs{}; + framework::OutputSpec moSpec{ "XXX", "INVALID" }; + framework::Options options{}; std::unordered_map customParameters = {}; std::string detectorName = "MISC"; // intended to be the 3 letters code int parallelTaskID = 0; // ID to differentiate parallel local Tasks from one another. 0 means this is the only one. - std::string saveToFile = ""; + std::string saveToFile{}; + int resetAfterCycles = 0; + bool infologgerFilterDiscardDebug = false; + int infologgerDiscardLevel = 21; + int activityType = 0; + int defaultRunNumber = 0; + std::string configurationSource{}; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index 3de1a56bb5..326d50a671 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -21,6 +21,8 @@ #include #include +#include "QualityControl/CommonSpec.h" +#include "QualityControl/TaskSpec.h" namespace o2::framework { @@ -30,6 +32,8 @@ class CompletionPolicy; namespace o2::quality_control::core { +struct TaskRunnerConfig; + /// \brief Factory in charge of creating DataProcessorSpec of QC task class TaskRunnerFactory { @@ -37,14 +41,13 @@ class TaskRunnerFactory TaskRunnerFactory() = default; virtual ~TaskRunnerFactory() = default; - /// \brief Creator of tasks + /// \brief Creates TaskRunner /// - /// \param taskName - name of the task, which exists in tasks list in the configuration file - /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") - /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies - /// \param resetAfterPublish - should taskRunner reset the user's task after each MO publication - static o2::framework::DataProcessorSpec - create(std::string taskName, std::string configurationSource, size_t id = 0, size_t resetAfterCycles = 0); + /// \param taskConfig + static o2::framework::DataProcessorSpec create(const TaskRunnerConfig&); + + /// \brief Knows how to create TaskConfig from Specs + static TaskRunnerConfig extractConfig(const CommonSpec&, const TaskSpec&, std::optional id = std::nullopt, std::optional resetAfterCycles = std::nullopt); /// \brief Provides necessary customization of the TaskRunners. /// diff --git a/Framework/include/QualityControl/TaskSpec.h b/Framework/include/QualityControl/TaskSpec.h new file mode 100644 index 0000000000..2205ac10a6 --- /dev/null +++ b/Framework/include/QualityControl/TaskSpec.h @@ -0,0 +1,74 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_TASKSPEC_H +#define QUALITYCONTROL_TASKSPEC_H + +/// +/// \file TaskSpec.h +/// \author Piotr Konopka +/// + +#include +#include + +#include "QualityControl/DataSourceSpec.h" + +namespace o2::quality_control::core +{ + +enum class TaskLocationSpec { + Local, + Remote +}; + +/// \brief Specification of a Task, which should map the JSON configuration structure. +struct TaskSpec { + // default, invalid spec + TaskSpec() = default; + + // minimal valid spec + TaskSpec(std::string taskName, std::string className, std::string moduleName, std::string detectorName, + int cycleDurationSeconds, DataSourceSpec dataSource) + : taskName(std::move(taskName)), + className(std::move(className)), + moduleName(std::move(moduleName)), + detectorName(std::move(detectorName)), + cycleDurationSeconds(cycleDurationSeconds), + dataSource(std::move(dataSource)) + { + } + + // basic + std::string taskName = "Invalid"; + std::string className = "Invalid"; + std::string moduleName = "Invalid"; + std::string detectorName = "Invalid"; + int cycleDurationSeconds = -1; + DataSourceSpec dataSource; + // advanced + bool active = true; + int maxNumberCycles = -1; + size_t resetAfterCycles = 0; + std::string saveObjectsToFile; + std::unordered_map customParameters = {}; + // multinode setups + TaskLocationSpec location = TaskLocationSpec::Remote; + std::vector localMachines = {}; + std::string remoteMachine = "any"; + uint16_t remotePort = 36543; + std::string localControl = "aliecs"; + std::string mergingMode = "delta"; // todo as enum? + int mergerCycleMultiplier = 1; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_TASKSPEC_H diff --git a/Framework/include/QualityControl/runnerUtils.h b/Framework/include/QualityControl/runnerUtils.h index f841cc6614..43181c4868 100644 --- a/Framework/include/QualityControl/runnerUtils.h +++ b/Framework/include/QualityControl/runnerUtils.h @@ -81,6 +81,23 @@ inline int computeRunNumber(const framework::ServiceRegistry& services, const bo return run; } +inline int computeRunNumber(const framework::ServiceRegistry& services, int defaultRunNumber = 0) +{ // Determine run number + int run = 0; + try { + auto temp = services.get().device()->fConfig->GetProperty("runNumber", "unspecified"); + ILOG(Info, Devel) << "Got this property runNumber from RawDeviceService: '" << temp << "'" << ENDM; + run = stoi(temp); + ILOG(Info, Support) << " Run number found in options: " << run << ENDM; + } catch (std::invalid_argument& ia) { + ILOG(Info, Support) << " Run number not found in options or is not a number, \n" + " using the one from the config file or 0 as a last resort." + << ENDM; + } + run = run > 0 /* found it in service */ ? run : defaultRunNumber; + return run; +} + } // namespace o2::quality_control::core #endif //QUALITYCONTROL_RUNNERUTILS_H diff --git a/Framework/src/DataSourceSpec.cxx b/Framework/src/DataSourceSpec.cxx new file mode 100644 index 0000000000..6c9d9f847a --- /dev/null +++ b/Framework/src/DataSourceSpec.cxx @@ -0,0 +1,30 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file DataSourceSpec.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/DataSourceSpec.h" +#include + +namespace o2::quality_control::core +{ + +// todo make DataSourceUtils + +DataSourceSpec::DataSourceSpec(DataSourceType type, std::unordered_map params) + : type(type), typeSpecificParams(std::move(params)) +{ + // todo: validation? +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 8047a4368a..46cbb4473f 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -25,6 +25,9 @@ #include "QualityControl/PostProcessingDevice.h" #include "QualityControl/Version.h" #include "QualityControl/QcInfoLogger.h" +#include "QualityControl/TaskSpec.h" +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/InfrastructureSpec.h" #include #include @@ -50,7 +53,6 @@ using SubSpec = o2::header::DataHeader::SubSpecificationType; namespace o2::quality_control::core { -const char* defaultRemotePort = "36543"; uint16_t defaultPolicyPort = 42349; struct DataSamplingPolicySpec { @@ -65,19 +67,21 @@ struct DataSamplingPolicySpec { framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(std::string configurationSource) { - WorkflowSpec workflow; - auto config = ConfigurationFactory::getConfiguration(configurationSource); printVersion(); - if (config->getRecursive("qc").count("tasks")) { - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (taskConfig.get("active", true)) { - // The "resetAfterCycles" parameters should be handled differently for standalone/remote and local tasks, - // thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of - // the context we run QC. - size_t resetAfterCycles = taskConfig.get("resetAfterCycles", 0); - workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles)); - } + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + // todo: report the number of tasks/checks/etc once all are read there. + + WorkflowSpec workflow; + + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (taskSpec.active) { + // The "resetAfterCycles" parameters should be handled differently for standalone/remote and local tasks, + // thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of + // the context we run QC. + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); } } auto checkRunnerOutputs = generateCheckRunners(workflow, configurationSource); @@ -93,73 +97,59 @@ void InfrastructureGenerator::generateStandaloneInfrastructure(framework::Workfl workflow.insert(std::end(workflow), std::begin(qcInfrastructure), std::end(qcInfrastructure)); } -WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string configurationSource, std::string host) +WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string configurationSource, std::string targetHost) { - WorkflowSpec workflow; - TaskRunnerFactory taskRunnerFactory; - std::set samplingPoliciesUsed; + printVersion(); auto config = ConfigurationFactory::getConfiguration(configurationSource); - printVersion(); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + + WorkflowSpec workflow; + std::set samplingPoliciesUsed; - if (config->getRecursive("qc").count("tasks") == 0) { + if (infrastructureSpec.tasks.empty()) { return workflow; } - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (!taskConfig.get("active")) { - ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM; + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (!taskSpec.active) { + ILOG(Info, Devel) << "Task " << taskSpec.taskName << " is disabled, ignoring." << ENDM; continue; } - if (taskConfig.get("location") == "local") { - if (taskConfig.get_child("localMachines").empty()) { - throw std::runtime_error("No local machines specified for task " + taskName + " in its configuration"); + if (taskSpec.location == TaskLocationSpec::Local) { + if (taskSpec.localMachines.empty()) { + throw std::runtime_error("No local machines specified for task " + taskSpec.taskName + " in its configuration"); } size_t id = 1; - for (const auto& machine : taskConfig.get_child("localMachines")) { + for (const auto& machine : taskSpec.localMachines) { // We spawn a task and proxy only if we are on the right machine. - if (machine.second.get("") == host) { + if (machine == targetHost) { // If we use delta mergers, then the moving window is implemented by the last Merger layer. // The QC Tasks should always send a delta covering one cycle. - int resetAfterCycles = taskConfig.get("mergingMode", "delta") == "delta" ? 1 : taskConfig.get("resetAfterCycles", 0); + int resetAfterCycles = taskSpec.mergingMode == "delta" ? 1 : (int)taskSpec.resetAfterCycles; + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, id, resetAfterCycles); // Generate QC Task Runner - workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, resetAfterCycles)); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); // Generate an output proxy // These should be removed when we are able to declare dangling output in normal DPL devices - auto remoteMachine = taskConfig.get_optional("remoteMachine"); - if (!remoteMachine.has_value()) { - ILOG(Warning, Devel) - << "No remote machine was specified for a multinode QC setup." - " This is fine if running with AliECS, but it will fail in standalone mode." - << ENDM; - } - auto remotePort = taskConfig.get_optional("remotePort"); - if (!remotePort.has_value()) { - ILOG(Warning, Devel) - << "No remote port was specified for a multinode QC setup." - " This is fine if running with AliECS, but it might fail in standalone mode." - << ENDM; - } - generateLocalTaskLocalProxy(workflow, id, taskName, remoteMachine.value_or("any"), - remotePort.value_or(defaultRemotePort), - taskConfig.get("localControl", "aliecs")); + generateLocalTaskLocalProxy(workflow, id, taskSpec.taskName, taskSpec.remoteMachine, std::to_string(taskSpec.remotePort), taskSpec.localControl); break; } id++; } - } else // if (taskConfig.get("location") == "remote") + } else // TaskLocationSpec::Remote { // Collecting Data Sampling Policies - auto dataSourceTree = taskConfig.get_child("dataSource"); - std::string type = dataSourceTree.get("type"); - if (type == "dataSamplingPolicy") { - samplingPoliciesUsed.insert({ dataSourceTree.get("name"), taskConfig.get("localControl", "aliecs") }); - } else if (type == "direct") { - throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources"); - } else { - throw std::runtime_error("Configuration error: dataSource type unknown : " + type); + switch (taskSpec.dataSource.type) { + case DataSourceType::DataSamplingPolicy: + samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + break; + case DataSourceType::Direct: + throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); + default: + throw std::runtime_error("Configuration error: unsupported dataSource for remote QC Tasks"); } } } @@ -172,7 +162,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co std::vector machines = DataSampling::MachinesForPolicy(config.get(), policyName); for (const auto& machine : machines) { - if (machine == host) { + if (machine == targetHost) { generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control); } } @@ -189,65 +179,53 @@ void InfrastructureGenerator::generateLocalInfrastructure(framework::WorkflowSpe o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructure(std::string configurationSource) { + printVersion(); + + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + WorkflowSpec workflow; std::set samplingPoliciesUsed; - auto config = ConfigurationFactory::getConfiguration(configurationSource); - printVersion(); - if (config->getRecursive("qc").count("tasks")) { - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (!taskConfig.get("active", true)) { - ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM; - continue; - } + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (!taskSpec.active) { + ILOG(Info, Devel) << "Task " << taskSpec.taskName << " is disabled, ignoring." << ENDM; + continue; + } - if (taskConfig.get("location") == "local") { - // if tasks are LOCAL, generate input proxies + mergers + checkers - - size_t numberOfLocalMachines = taskConfig.get_child("localMachines").size() > 1 ? taskConfig.get_child("localMachines").size() : 1; - // Generate an input proxy - // These should be removed when we are able to declare dangling inputs in normal DPL devices - auto remotePort = taskConfig.get_optional("remotePort"); - if (!remotePort.has_value()) { - ILOG(Warning, Devel) << "No remote port was specified for a multinode QC setup." - " This is fine if running with AliECS, but it might fail in standalone mode." - << ENDM; - } - generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort), - taskConfig.get("localControl", "aliecs")); - - auto mergingMode = taskConfig.get("mergingMode", "delta"); - // In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks. - size_t resetAfterCycles = mergingMode == "delta" ? taskConfig.get("resetAfterCycles", 0) : 0; - auto cycleDurationSeconds = taskConfig.get("cycleDurationSeconds") * taskConfig.get("mergerCycleMultiplier", 1); - auto monitoringUrl = config->get("qc.config.monitoring.url"); - - generateMergers(workflow, taskName, numberOfLocalMachines, cycleDurationSeconds, mergingMode, resetAfterCycles, monitoringUrl); - - } else if (taskConfig.get("location") == "remote") { - - // -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers - // (for the time being we don't foresee parallel tasks on QC servers, so no mergers here) - - // fixme: ideally we should check if we are on the right remote machine, but now we support only n -> 1 setups, - // so there is no point. Also, I expect that we should be able to generate one big topology or its parts - // and we would place it among QC servers using AliECS, not by configuration files. - - // Collecting Data Sampling Policies - auto dataSourceTree = taskConfig.get_child("dataSource"); - std::string type = dataSourceTree.get("type"); - if (type == "dataSamplingPolicy") { - samplingPoliciesUsed.insert({ dataSourceTree.get("name"), taskConfig.get("localControl", "aliecs") }); - } else if (type == "direct") { - throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources"); - } else { - throw std::runtime_error("Configuration error: dataSource type unknown : " + type); - } + if (taskSpec.location == TaskLocationSpec::Local) { + // if tasks are LOCAL, generate input proxies + mergers + checkers + + size_t numberOfLocalMachines = taskSpec.localMachines.size() > 1 ? taskSpec.localMachines.size() : 1; + // Generate an input proxy + // These should be removed when we are able to declare dangling inputs in normal DPL devices + generateLocalTaskRemoteProxy(workflow, taskSpec.taskName, numberOfLocalMachines, std::to_string(taskSpec.remotePort), taskSpec.localControl); + + // In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks. + size_t resetAfterCycles = taskSpec.mergingMode == "delta" ? taskSpec.resetAfterCycles : 0; + auto cycleDurationSeconds = taskSpec.cycleDurationSeconds * taskSpec.mergerCycleMultiplier; + + generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationSeconds, taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl); + + } else if (taskSpec.location == TaskLocationSpec::Remote) { - auto resetAfterCycles = taskConfig.get("resetAfterCycles", 0); - // Creating the remote task - workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles)); + // -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers + // (for the time being we don't foresee parallel tasks on QC servers, so no mergers here) + + // Collecting Data Sampling Policies + switch (taskSpec.dataSource.type) { + case DataSourceType::DataSamplingPolicy: + samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + break; + case DataSourceType::Direct: + throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); + default: + throw std::runtime_error("Configuration error: unsupported dataSource for remote QC Tasks"); } + + // Creating the remote task + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); } } diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx new file mode 100644 index 0000000000..3db71022e0 --- /dev/null +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -0,0 +1,194 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file InfrastructureSpecReader.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/QcInfoLogger.h" + +#include +#include +#include + +using namespace o2::utilities; +using namespace o2::framework; + +namespace o2::quality_control::core +{ + +InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + InfrastructureSpec spec; + const auto& qcTree = config.get_child("qc"); + if (qcTree.find("config") != qcTree.not_found()) { + spec.common = readCommonSpec(qcTree.get_child("config"), configurationSource); + } else { + ILOG(Error) << "The \"config\" section in the provided QC config file is missing." << ENDM; + } + if (qcTree.find("tasks") != qcTree.not_found()) { + const auto& tasksTree = qcTree.get_child("tasks"); + spec.tasks.reserve(tasksTree.size()); + for (const auto& [taskName, taskConfig] : tasksTree) { + spec.tasks.push_back(readTaskSpec(taskName, taskConfig, configurationSource)); + } + } + return spec; +} + +CommonSpec InfrastructureSpecReader::readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + CommonSpec gc; + for (const auto& [key, value] : config.get_child("database")) { + gc.database.emplace(key, value.get_value()); + } + gc.activityNumber = config.get("Activity.number", gc.activityNumber); + gc.activityType = config.get("Activity.type", gc.activityType); + gc.monitoringUrl = config.get("monitoring.url", gc.monitoringUrl); + gc.consulUrl = config.get("consul.url", gc.consulUrl); + gc.conditionDBUrl = config.get("conditionDB.url", gc.conditionDBUrl); + gc.infologgerFilterDiscardDebug = config.get("infologger.filterDiscardDebug", gc.infologgerFilterDiscardDebug); + gc.infologgerDiscardLevel = config.get("infologger.filterDiscardLevel", gc.infologgerDiscardLevel); + + gc.configurationSource = configurationSource; + + return gc; +} + +TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + static std::unordered_map const taskLocationFromString = { + { "local", TaskLocationSpec::Local }, + { "remote", TaskLocationSpec::Remote } + }; + + TaskSpec ts; + + ts.taskName = taskName; + ts.className = config.get("className"); + ts.moduleName = config.get("moduleName"); + ts.detectorName = config.get("detectorName"); + ts.cycleDurationSeconds = config.get("cycleDurationSeconds"); + ts.dataSource = readDataSourceSpec(config.get_child("dataSource"), configurationSource); + + ts.active = config.get("active", ts.active); + ts.maxNumberCycles = config.get("maxNumberCycles", ts.maxNumberCycles); + ts.resetAfterCycles = config.get("resetAfterCycles", ts.resetAfterCycles); + ts.saveObjectsToFile = config.get("saveObjectsToFile", ts.saveObjectsToFile); + if (config.count("taskParameters") > 0) { + for (const auto& [key, value] : config.get_child("taskParameters")) { + ts.customParameters.emplace(key, value.get_value()); + } + } + + bool multinodeSetup = config.find("location") != config.not_found(); + ts.location = taskLocationFromString.at(config.get("location", "remote")); + if (config.count("localMachines") > 0) { + for (const auto& [key, value] : config.get_child("localMachines")) { + ts.localMachines.emplace_back(value.get_value()); + } + } + if (multinodeSetup && config.count("remoteMachine") > 0) { + ILOG(Warning, Trace) + << "No remote machine was specified for a multinode QC setup." + " This is fine if running with AliECS, but it will fail in standalone mode." + << ENDM; + } + ts.remoteMachine = config.get("remoteMachine", ts.remoteMachine); + if (multinodeSetup && config.count("remotePort") > 0) { + ILOG(Warning, Trace) + << "No remote port was specified for a multinode QC setup." + " This is fine if running with AliECS, but it might fail in standalone mode." + << ENDM; + } + ts.remotePort = config.get("remotePort", ts.remotePort); + ts.localControl = config.get("localControl", ts.localControl); + ts.mergingMode = config.get("mergingMode", ts.mergingMode); + ts.mergerCycleMultiplier = config.get("mergerCycleMultiplier", ts.mergerCycleMultiplier); + + return ts; +} + +DataSourceSpec InfrastructureSpecReader::readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec, + const std::string& configurationSource) +{ + static std::unordered_map const dataSourceTypeFromString = { + // fixme: the convention is inconsistent and it should be fixed in coordination with configuration files + { "dataSamplingPolicy", DataSourceType::DataSamplingPolicy }, + { "direct", DataSourceType::Direct }, + { "Task", DataSourceType::Task }, + { "Check", DataSourceType::Check }, + { "Aggregator", DataSourceType::Aggregator }, + { "PostProcessing", DataSourceType::PostProcessingTask }, + { "ExternalTask", DataSourceType::ExternalTask } + }; + + DataSourceSpec dss; + dss.type = dataSourceTypeFromString.at(dataSourceSpec.get("type")); + + switch (dss.type) { + case DataSourceType::DataSamplingPolicy: { + auto name = dataSourceSpec.get("name"); + dss.typeSpecificParams.insert({ "name", name }); + dss.inputs = DataSampling::InputSpecsForPolicy(configurationSource, name); //fixme: add a method which takes a ptree, then i can remove configurationSource + break; + } + case DataSourceType::Direct: { + dss.typeSpecificParams.insert({ "query", dataSourceSpec.get("query") }); + auto inputsQuery = dataSourceSpec.get("query"); + dss.inputs = DataDescriptorQueryBuilder::parse(inputsQuery.c_str()); + break; + } + case DataSourceType::Task: // todo all below + case DataSourceType::PostProcessingTask: + case DataSourceType::Check: + case DataSourceType::Aggregator: + dss.typeSpecificParams.insert({ "name", dataSourceSpec.get("name") }); + break; + case DataSourceType::ExternalTask: + dss.typeSpecificParams.insert({ "name", dataSourceSpec.get("name") }); + dss.typeSpecificParams.insert({ "query", dataSourceSpec.get("query") }); + break; + case DataSourceType::Invalid: + // todo: throw? + break; + } + + return dss; +} + +std::string InfrastructureSpecReader::validateDetectorName(std::string name) +{ + // name must be a detector code from DetID or one of the few allowed general names + int nDetectors = 16; + const char* detNames[16] = // once we can use DetID, remove this hard-coded list + { "ITS", "TPC", "TRD", "TOF", "PHS", "CPV", "EMC", "HMP", "MFT", "MCH", "MID", "ZDC", "FT0", "FV0", "FDD", "ACO" }; + std::vector permitted = { "MISC", "DAQ", "GENERAL", "TST", "BMK", "CTP", "TRG", "DCS", "REC" }; + for (auto i = 0; i < nDetectors; i++) { + permitted.emplace_back(detNames[i]); + // permitted.push_back(o2::detectors::DetID::getName(i)); + } + auto it = std::find(permitted.begin(), permitted.end(), name); + + if (it == permitted.end()) { + std::string permittedString; + for (const auto& i : permitted) + permittedString += i + ' '; + ILOG(Error, Support) << "Invalid detector name : " << name << "\n" + << " Placeholder 'MISC' will be used instead\n" + << " Note: list of permitted detector names :" << permittedString << ENDM; + return "MISC"; + } + return name; +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TaskFactory.cxx b/Framework/src/TaskFactory.cxx index fea65dabb7..4ca07b9aa0 100644 --- a/Framework/src/TaskFactory.cxx +++ b/Framework/src/TaskFactory.cxx @@ -22,7 +22,7 @@ namespace o2::quality_control::core { -TaskInterface* TaskFactory::create(TaskConfig& taskConfig, std::shared_ptr objectsManager) +TaskInterface* TaskFactory::create(TaskRunnerConfig& taskConfig, std::shared_ptr objectsManager) { TaskInterface* result = root_class_factory::create(taskConfig.moduleName, taskConfig.className); result->setName(taskConfig.taskName); diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 9f524ee9f4..f06b5bf10e 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -59,23 +59,10 @@ using namespace o2::utilities; using namespace std::chrono; using namespace AliceO2::Common; -TaskRunner::TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id) - : mDeviceName(createTaskRunnerIdString() + "-" + taskName), - mRunNumber(0), - mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id) +TaskRunner::TaskRunner(const TaskRunnerConfig& config) + : mTaskConfig(config), + mRunNumber(0) { - // setup configuration - try { - mTaskConfig.taskName = taskName; - mTaskConfig.parallelTaskID = id; - mConfigFile = ConfigurationFactory::getConfiguration(configurationSource); - loadTopologyConfig(); - } catch (...) { - // catch the configuration exception and print it to avoid losing it - ILOG(Fatal, Ops) << "Unexpected exception during configuration:\n" - << current_diagnostic(true) << ENDM; - throw; - } } void TaskRunner::init(InitContext& iCtx) @@ -86,7 +73,10 @@ void TaskRunner::init(InitContext& iCtx) } catch (const RuntimeErrorRef& err) { ILOG(Error) << "Could not find the DPL InfoLogger Context." << ENDM; } - ILOG_INST.init("task/" + mTaskConfig.taskName, mConfigFile->getRecursive(), ilContext); + ILOG_INST.init("task/" + mTaskConfig.taskName, + mTaskConfig.infologgerFilterDiscardDebug, + mTaskConfig.infologgerDiscardLevel, + ilContext); ILOG(Info, Support) << "Initializing TaskRunner" << ENDM; try { @@ -104,8 +94,7 @@ void TaskRunner::init(InitContext& iCtx) iCtx.services().get().set(CallbackService::Id::Reset, [this]() { reset(); }); // setup monitoring - auto monitoringUrl = mConfigFile->get("qc.config.monitoring.url", "infologger:///debug?qc"); - mCollector = MonitoringFactory::Get(monitoringUrl); + mCollector = MonitoringFactory::Get(mTaskConfig.monitoringUrl); mCollector->addGlobalTag(tags::Key::Subsystem, tags::Value::QC); mCollector->addGlobalTag("TaskName", mTaskConfig.taskName); @@ -146,7 +135,7 @@ void TaskRunner::run(ProcessingContext& pCtx) if (timerReady) { finishCycle(pCtx.outputs()); - if (mResetAfterCycles > 0 && (mCycleNumber % mResetAfterCycles == 0)) { + if (mTaskConfig.resetAfterCycles > 0 && (mCycleNumber % mTaskConfig.resetAfterCycles == 0)) { mTask->reset(); } if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) { @@ -195,11 +184,6 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor return action; } -void TaskRunner::setResetAfterCycles(size_t n) -{ - mResetAfterCycles = n; -} - std::string TaskRunner::createTaskRunnerIdString() { return std::string("QC-TASK-RUNNER"); @@ -229,7 +213,7 @@ void TaskRunner::endOfStream(framework::EndOfStreamContext& eosContext) void TaskRunner::start(const ServiceRegistry& services) { - o2::quality_control::core::computeRunNumber(services, mConfigFile->getRecursive()); + mRunNumber = o2::quality_control::core::computeRunNumber(services, mTaskConfig.defaultRunNumber); try { startOfActivity(); @@ -306,64 +290,11 @@ std::tuple TaskRunner::validateInputs return { dataReady, timerReady }; } -void TaskRunner::loadTopologyConfig() -{ - auto taskConfigTree = getTaskConfigTree(); - auto policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); - std::shared_ptr config = policiesFilePath.empty() ? mConfigFile : ConfigurationFactory::getConfiguration(policiesFilePath); - auto dataSourceTree = taskConfigTree.get_child("dataSource"); - auto type = dataSourceTree.get("type"); - - if (type == "dataSamplingPolicy") { - auto policyName = dataSourceTree.get("name"); - ILOG(Info, Support) << "policyName : " << policyName << ENDM; - mInputSpecs = DataSampling::InputSpecsForPolicy(config.get(), policyName); - } else if (type == "direct") { - auto inputsQuery = dataSourceTree.get("query"); - mInputSpecs = DataDescriptorQueryBuilder::parse(inputsQuery.c_str()); - } else { - std::string message = std::string("Configuration error : dataSource type unknown : ") + type; - BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message)); - } - - mInputSpecs.emplace_back(InputSpec{ "timer-cycle", createTaskDataOrigin(), createTaskDataDescription("TIMER-" + mTaskConfig.taskName), 0, Lifetime::Timer }); - - // needed to avoid having looping at the maximum speed - mTaskConfig.cycleDurationSeconds = taskConfigTree.get("cycleDurationSeconds", 10); - mOptions.push_back({ "period-timer-cycle", framework::VariantType::Int, static_cast(mTaskConfig.cycleDurationSeconds * 1000000), { "timer period" } }); - mOptions.push_back({ "runNumber", framework::VariantType::String, { "Run number" } }); -} - -boost::property_tree::ptree TaskRunner::getTaskConfigTree() const -{ - auto tasksConfigList = mConfigFile->getRecursive("qc.tasks"); - auto taskConfigTree = tasksConfigList.find(mTaskConfig.taskName); - if (taskConfigTree == tasksConfigList.not_found()) { - std::string message = "No configuration found for task \"" + mTaskConfig.taskName + "\""; - BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message)); - } - return taskConfigTree->second; -} - -void TaskRunner::loadTaskConfig() +void TaskRunner::loadTaskConfig() // todo consider renaming { ILOG(Info, Support) << "Loading configuration" << ENDM; - auto taskConfigTree = getTaskConfigTree(); - string test = taskConfigTree.get("detectorName", "MISC"); - mTaskConfig.detectorName = validateDetectorName(taskConfigTree.get("detectorName", "MISC")); ILOG_INST.setDetector(mTaskConfig.detectorName); - mTaskConfig.moduleName = taskConfigTree.get("moduleName"); - mTaskConfig.className = taskConfigTree.get("className"); - mTaskConfig.maxNumberCycles = taskConfigTree.get("maxNumberCycles", -1); - mTaskConfig.consulUrl = mConfigFile->get("qc.config.consul.url", ""); - mTaskConfig.conditionUrl = mConfigFile->get("qc.config.conditionDB.url", "http://ccdb-test.cern.ch:8080"); - mTaskConfig.saveToFile = taskConfigTree.get("saveObjectsToFile", ""); - try { - mTaskConfig.customParameters = mConfigFile->getRecursiveMap("qc.tasks." + mTaskConfig.taskName + ".taskParameters"); - } catch (...) { - ILOG(Debug, Support) << "No custom parameters for " << mTaskConfig.taskName << ENDM; - } ILOG(Info, Support) << "Configuration loaded : " << ENDM; ILOG(Info, Support) << ">> Task name : " << mTaskConfig.taskName << ENDM; @@ -374,31 +305,6 @@ void TaskRunner::loadTaskConfig() ILOG(Info, Support) << ">> Save to file : " << mTaskConfig.saveToFile << ENDM; } -std::string TaskRunner::validateDetectorName(std::string name) const -{ - // name must be a detector code from DetID or one of the few allowed general names - int nDetectors = 16; - const char* detNames[16] = // once we can use DetID, remove this hard-coded list - { "ITS", "TPC", "TRD", "TOF", "PHS", "CPV", "EMC", "HMP", "MFT", "MCH", "MID", "ZDC", "FT0", "FV0", "FDD", "ACO" }; - vector permitted = { "MISC", "DAQ", "GENERAL", "TST", "BMK", "CTP", "TRG", "DCS", "REC" }; - for (auto i = 0; i < nDetectors; i++) { - permitted.push_back(detNames[i]); - // permitted.push_back(o2::detectors::DetID::getName(i)); - } - auto it = std::find(permitted.begin(), permitted.end(), name); - - if (it == permitted.end()) { - std::string permittedString; - for (auto i : permitted) - permittedString += i + ' '; - ILOG(Error, Support) << "Invalid detector name : " << name << "\n" - << " Placeholder 'MISC' will be used instead\n" - << " Note: list of permitted detector names :" << permittedString << ENDM; - return "MISC"; - } - return name; -} - void TaskRunner::startOfActivity() { // stats @@ -406,8 +312,7 @@ void TaskRunner::startOfActivity() mTotalNumberObjectsPublished = 0; // Start activity in module's stask and update objectsManager - Activity activity(mRunNumber, - mConfigFile->get("qc.config.Activity.type")); + Activity activity(mRunNumber, mTaskConfig.activityType); ILOG(Info, Ops) << "Starting run " << mRunNumber << ENDM; mCollector->setRunNumber(mRunNumber); mTask->startOfActivity(activity); @@ -417,8 +322,7 @@ void TaskRunner::startOfActivity() void TaskRunner::endOfActivity() { - Activity activity(mRunNumber, - mConfigFile->get("qc.config.Activity.type")); + Activity activity(mRunNumber, mTaskConfig.activityType); ILOG(Info, Ops) << "Stopping run " << mRunNumber << ENDM; mTask->endOfActivity(activity); mObjectsManager->removeAllFromServiceDiscovery(); @@ -504,7 +408,7 @@ int TaskRunner::publish(DataAllocator& outputs) ILOG(Info, Support) << "Publishing " << mObjectsManager->getNumberPublishedObjects() << " MonitorObjects" << ENDM; AliceO2::Common::Timer publicationDurationTimer; - auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher(mMonitorObjectsSpec); + auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher(mTaskConfig.moSpec); // getNonOwningArray creates a TObjArray containing the monitoring objects, but not // owning them. The array is created by new and must be cleaned up by the caller std::unique_ptr array(mObjectsManager->getNonOwningArray()); @@ -514,7 +418,7 @@ int TaskRunner::publish(DataAllocator& outputs) Output{ concreteOutput.origin, concreteOutput.description, concreteOutput.subSpec, - mMonitorObjectsSpec.lifetime }, + mTaskConfig.moSpec.lifetime }, *array); mLastPublicationDuration = publicationDurationTimer.getTime(); diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index 45df00f857..b1c854d3e6 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -16,9 +16,12 @@ #include "QualityControl/TaskRunnerFactory.h" #include "QualityControl/TaskRunner.h" +#include "QualityControl/TaskRunnerConfig.h" +#include "QualityControl/InfrastructureSpecReader.h" #include #include +#include namespace o2::quality_control::core { @@ -26,24 +29,74 @@ namespace o2::quality_control::core using namespace o2::framework; o2::framework::DataProcessorSpec - TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, size_t resetAfterCycles) + TaskRunnerFactory::create(const TaskRunnerConfig& taskConfig) { - TaskRunner qcTask{ taskName, configurationSource, id }; - qcTask.setResetAfterCycles(resetAfterCycles); + TaskRunner qcTask{ taskConfig }; DataProcessorSpec newTask{ - qcTask.getDeviceName(), - qcTask.getInputsSpecs(), - Outputs{ qcTask.getOutputSpec() }, - AlgorithmSpec{}, - qcTask.getOptions() + taskConfig.deviceName, + taskConfig.inputSpecs, + { taskConfig.moSpec }, + adaptFromTask(std::move(qcTask)), + taskConfig.options }; - // this needs to be moved at the end - newTask.algorithm = adaptFromTask(std::move(qcTask)); return newTask; } +TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig, const TaskSpec& taskSpec, std::optional id, std::optional resetAfterCycles) +{ + std::string deviceName{ TaskRunner::createTaskRunnerIdString() + "-" + taskSpec.taskName }; + + int parallelTaskID = id.value_or(0); + + // todo validate data source + if (!taskSpec.dataSource.isOneOf(DataSourceType::DataSamplingPolicy, DataSourceType::Direct)) { + throw std::runtime_error("This data source of the task '" + taskSpec.taskName + "' is not supported."); + } + auto inputs = taskSpec.dataSource.inputs; + inputs.emplace_back("timer-cycle", + TaskRunner::createTaskDataOrigin(), + TaskRunner::createTaskDataDescription("TIMER-" + taskSpec.taskName), + 0, + Lifetime::Timer); + + OutputSpec monitorObjectsSpec{ { "mo" }, + TaskRunner::createTaskDataOrigin(), + TaskRunner::createTaskDataDescription(taskSpec.taskName), + static_cast(parallelTaskID) }; + + Options options{ + { "period-timer-cycle", framework::VariantType::Int, static_cast(taskSpec.cycleDurationSeconds * 1000000), { "timer period" } }, + { "runNumber", framework::VariantType::String, { "Run number" } } + }; + + return { + deviceName, + taskSpec.taskName, + taskSpec.moduleName, + taskSpec.className, + taskSpec.cycleDurationSeconds, + taskSpec.maxNumberCycles, + globalConfig.consulUrl, + globalConfig.conditionDBUrl, + globalConfig.monitoringUrl, + inputs, + monitorObjectsSpec, + options, + taskSpec.customParameters, + InfrastructureSpecReader::validateDetectorName(taskSpec.detectorName), + parallelTaskID, + taskSpec.saveObjectsToFile, + resetAfterCycles.value_or(taskSpec.resetAfterCycles), + globalConfig.infologgerFilterDiscardDebug, + globalConfig.infologgerDiscardLevel, + globalConfig.activityType, + globalConfig.activityNumber, + globalConfig.configurationSource + }; +} + void TaskRunnerFactory::customizeInfrastructure(std::vector& policies) { auto matcher = [](framework::DeviceSpec const& device) { diff --git a/Framework/test/testObjectsManager.cxx b/Framework/test/testObjectsManager.cxx index 985c636ad3..56b0a718e8 100644 --- a/Framework/test/testObjectsManager.cxx +++ b/Framework/test/testObjectsManager.cxx @@ -31,9 +31,15 @@ using namespace AliceO2::Common; namespace o2::quality_control::core { +struct Config { + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; +}; + BOOST_AUTO_TEST_CASE(invalid_url_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "bad-url:1234"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -41,7 +47,7 @@ BOOST_AUTO_TEST_CASE(invalid_url_test) BOOST_AUTO_TEST_CASE(duplicate_object_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -52,7 +58,7 @@ BOOST_AUTO_TEST_CASE(duplicate_object_test) BOOST_AUTO_TEST_CASE(is_being_published_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -65,7 +71,7 @@ BOOST_AUTO_TEST_CASE(is_being_published_test) BOOST_AUTO_TEST_CASE(unpublish_test) { - TaskConfig config; + Config config; config.taskName = "test"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); TObjString s("content"); @@ -83,7 +89,7 @@ BOOST_AUTO_TEST_CASE(unpublish_test) BOOST_AUTO_TEST_CASE(getters_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -113,7 +119,7 @@ BOOST_AUTO_TEST_CASE(getters_test) BOOST_AUTO_TEST_CASE(metadata_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -129,7 +135,7 @@ BOOST_AUTO_TEST_CASE(metadata_test) BOOST_AUTO_TEST_CASE(drawOptions_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); diff --git a/Framework/test/testPublisher.cxx b/Framework/test/testPublisher.cxx index e34b391615..87fa2cf5d6 100644 --- a/Framework/test/testPublisher.cxx +++ b/Framework/test/testPublisher.cxx @@ -30,11 +30,13 @@ using namespace AliceO2::Common; namespace o2::quality_control::core { +// fixme: unify with testObjectManager? BOOST_AUTO_TEST_CASE(publisher_test) { - TaskConfig config; - config.taskName = "test"; - ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; + ObjectsManager objectsManager(taskName, detectorName, consulUrl, 0, true); TObjString s("content"); objectsManager.startPublishing(&s); diff --git a/Framework/test/testSharedConfig.json b/Framework/test/testSharedConfig.json index 5d61a819b8..ab22f9bc52 100644 --- a/Framework/test/testSharedConfig.json +++ b/Framework/test/testSharedConfig.json @@ -23,6 +23,7 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", + "detectorName": "TST", "dataSource": { "type": "dataSamplingPolicy", "name": "tpcclust" @@ -71,7 +72,15 @@ "location": "remote" }, "defTask": { - "active": "false" + "active": "false", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "detectorName": "ITS", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "tpcclust" + }, + "cycleDurationSeconds": "10" } }, "checks": { diff --git a/Framework/test/testTaskInterface.cxx b/Framework/test/testTaskInterface.cxx index c0d88849fa..aa803630a7 100644 --- a/Framework/test/testTaskInterface.cxx +++ b/Framework/test/testTaskInterface.cxx @@ -134,7 +134,7 @@ class TestTask : public TaskInterface BOOST_AUTO_TEST_CASE(test_invoke_all_methods) { // This is maximum that we can do until we are able to test the DPL algorithms in isolation. - TaskConfig taskConfig; + TaskRunnerConfig taskConfig; ObjectsManager* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.detectorName, taskConfig.consulUrl, 0, true); test::TestTask testTask(objectsManager); @@ -169,7 +169,8 @@ BOOST_AUTO_TEST_CASE(test_invoke_all_methods) BOOST_AUTO_TEST_CASE(test_task_factory) { - TaskConfig config{ + TaskRunnerConfig config{ + "SkeletonTaskRunner", "skeletonTask", "QcSkeleton", "o2::quality_control_modules::skeleton::SkeletonTask", @@ -200,7 +201,7 @@ BOOST_AUTO_TEST_CASE(retrieveCondition) api.storeAsTFileAny(&bad, "qc/TST/conditions", meta); // retrieve it - TaskConfig taskConfig; + TaskRunnerConfig taskConfig; ObjectsManager* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.detectorName, taskConfig.consulUrl, 0, true); test::TestTask testTask(objectsManager); testTask.loadCcdb("ccdb-test.cern.ch:8080"); diff --git a/Framework/test/testTaskRunner.cxx b/Framework/test/testTaskRunner.cxx index 0da9f27e4f..20d815f10a 100644 --- a/Framework/test/testTaskRunner.cxx +++ b/Framework/test/testTaskRunner.cxx @@ -19,6 +19,9 @@ #include "QualityControl/TaskRunner.h" #include #include +#include "QualityControl/InfrastructureSpecReader.h" +#include "Configuration/ConfigurationFactory.h" +#include "Configuration/ConfigurationInterface.h" #define BOOST_TEST_MODULE TaskRunner test #define BOOST_TEST_MAIN @@ -31,13 +34,28 @@ using namespace std; using namespace o2::framework; using namespace o2::header; using namespace o2::utilities; +using namespace o2::configuration; + +TaskRunnerConfig getTaskConfig(const std::string& configFilePath, const std::string& taskName) +{ + auto config = ConfigurationFactory::getConfiguration(configFilePath); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configFilePath); + + auto taskSpec = std::find_if(infrastructureSpec.tasks.begin(), infrastructureSpec.tasks.end(), [&taskName](const auto& taskSpec) { + return taskSpec.taskName == taskName; + }); + if (taskSpec != infrastructureSpec.tasks.end()) { + return TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpec); + } else { + throw std::runtime_error("task " + taskName + " not found in the config file"); + } +} BOOST_AUTO_TEST_CASE(test_factory) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunnerFactory taskRunnerFactory; - DataProcessorSpec taskRunner = taskRunnerFactory.create("abcTask", configFilePath, 123); + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask")); BOOST_CHECK_EQUAL(taskRunner.name, "QC-TASK-RUNNER-abcTask"); @@ -67,7 +85,7 @@ BOOST_AUTO_TEST_CASE(test_task_runner) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "abcTask", configFilePath, 0 }; + TaskRunner qcTask{ getTaskConfig(configFilePath, "abcTask") }; BOOST_CHECK_EQUAL(qcTask.getDeviceName(), "QC-TASK-RUNNER-abcTask"); @@ -88,7 +106,7 @@ BOOST_AUTO_TEST_CASE(test_task_wrong_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "abcTask", configFilePath, 0 }; + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask")); // cout << "It should print an error message" << endl; } @@ -96,6 +114,7 @@ BOOST_AUTO_TEST_CASE(test_task_good_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "xyzTask", configFilePath, 0 }; + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "xyzTask")); + // cout << "no error message" << endl; } \ No newline at end of file diff --git a/Framework/test/testWorkflow.json b/Framework/test/testWorkflow.json index 806c89e91e..1106f50498 100644 --- a/Framework/test/testWorkflow.json +++ b/Framework/test/testWorkflow.json @@ -19,6 +19,7 @@ "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", "cycleDurationSeconds": "5", + "detectorName": "TST", "maxNumberCycles": "-1", "dataSource": { "type": "dataSamplingPolicy", diff --git a/Modules/Daq/test/testQcDaq.cxx b/Modules/Daq/test/testQcDaq.cxx index 8c9ae51e60..904d2b633d 100644 --- a/Modules/Daq/test/testQcDaq.cxx +++ b/Modules/Daq/test/testQcDaq.cxx @@ -22,7 +22,7 @@ namespace o2::quality_control_modules::daq BOOST_AUTO_TEST_CASE(instantiate_task) { DaqTask task; - TaskConfig config; + TaskRunnerConfig config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcDaqTest"; config.detectorName = "DAQ"; diff --git a/Modules/Example/test/testFactory.cxx b/Modules/Example/test/testFactory.cxx index 783316826c..b9e4464685 100644 --- a/Modules/Example/test/testFactory.cxx +++ b/Modules/Example/test/testFactory.cxx @@ -25,7 +25,7 @@ namespace o2::quality_control_modules::example BOOST_AUTO_TEST_CASE(Task_Factory) { TaskFactory factory; - TaskConfig config; + TaskRunnerConfig config; config.taskName = "task"; config.moduleName = "QcCommon"; config.className = "o2::quality_control_modules::example::ExampleTask"; @@ -44,7 +44,7 @@ bool is_critical(AliceO2::Common::FatalException const&) { return true; } BOOST_AUTO_TEST_CASE(Task_Factory_failures, *utf::depends_on("Task_Factory") /* make sure we don't run both tests at the same time */) { TaskFactory factory; - TaskConfig config; + TaskRunnerConfig config; auto manager = make_shared(config.taskName, config.detectorName, config.consulUrl, 0, true); config.taskName = "task"; diff --git a/Modules/Example/test/testQcExample.cxx b/Modules/Example/test/testQcExample.cxx index 17a40e91ec..01045ab917 100644 --- a/Modules/Example/test/testQcExample.cxx +++ b/Modules/Example/test/testQcExample.cxx @@ -23,7 +23,7 @@ namespace o2::quality_control_modules::example BOOST_AUTO_TEST_CASE(insantiate_task) { ExampleTask task; - TaskConfig config; + TaskRunnerConfig config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcExampleTest"; config.detectorName = "TST"; diff --git a/Modules/Skeleton/test/testQcSkeleton.cxx b/Modules/Skeleton/test/testQcSkeleton.cxx index 5a1698af47..65ad315a99 100644 --- a/Modules/Skeleton/test/testQcSkeleton.cxx +++ b/Modules/Skeleton/test/testQcSkeleton.cxx @@ -34,7 +34,7 @@ namespace o2::quality_control_modules::skeleton BOOST_AUTO_TEST_CASE(instantiate_task) { SkeletonTask task; - TaskConfig config; + TaskRunnerConfig config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcSkeletonTest"; config.detectorName = "TST"; From b1ba884cbdba7666a4fc3e39b0859dc7aae01ef3 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 20 Jul 2021 09:42:50 +0200 Subject: [PATCH 2/5] test fixes --- Framework/include/QualityControl/TaskRunner.h | 9 +++------ Modules/Daq/test/testQcDaq.cxx | 8 +++++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 9434fc3ccb..4ace9fa9ba 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -86,10 +86,10 @@ class TaskRunner : public framework::Task /// \brief TaskRunner's completion policy callback static framework::CompletionPolicy::CompletionOp completionPolicyCallback(o2::framework::InputSpan const& inputs); - std::string getDeviceName() { return mTaskConfig.deviceName; }; + std::string getDeviceName() const { return mTaskConfig.deviceName; }; const framework::Inputs& getInputsSpecs() const { return mTaskConfig.inputSpecs; }; - const framework::OutputSpec getOutputSpec() { return mTaskConfig.moSpec; }; - const framework::Options getOptions() { return mTaskConfig.options; }; + const framework::OutputSpec& getOutputSpec() const { return mTaskConfig.moSpec; }; + const framework::Options& getOptions() const { return mTaskConfig.options; }; /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); @@ -111,7 +111,6 @@ class TaskRunner : public framework::Task std::tuple validateInputs(const framework::InputRecord&); void loadTaskConfig(); - void loadTopologyConfig(); void startOfActivity(); void endOfActivity(); void startCycle(); @@ -127,9 +126,7 @@ class TaskRunner : public framework::Task std::shared_ptr mObjectsManager; int mRunNumber; - boost::property_tree::ptree getTaskConfigTree() const; void updateMonitoringStats(framework::ProcessingContext& pCtx); - void computeRunNumber(const framework::ServiceRegistry& services); bool mCycleOn = false; bool mNoMoreCycles = false; diff --git a/Modules/Daq/test/testQcDaq.cxx b/Modules/Daq/test/testQcDaq.cxx index 904d2b633d..1630cb6985 100644 --- a/Modules/Daq/test/testQcDaq.cxx +++ b/Modules/Daq/test/testQcDaq.cxx @@ -19,10 +19,16 @@ using namespace std; namespace o2::quality_control_modules::daq { +struct Config { + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; +}; + BOOST_AUTO_TEST_CASE(instantiate_task) { DaqTask task; - TaskRunnerConfig config; + Config config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcDaqTest"; config.detectorName = "DAQ"; From b32762e4135eec06bd28a62b647bb12ab7c44702 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 20 Jul 2021 11:17:19 +0200 Subject: [PATCH 3/5] Fix testTaskRunner --- Framework/include/QualityControl/TaskRunner.h | 2 -- Framework/test/testTaskRunner.cxx | 12 ++++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index 4ace9fa9ba..b2380d1aa7 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -18,8 +18,6 @@ #ifndef QC_CORE_TASKRUNNER_H #define QC_CORE_TASKRUNNER_H -#include - // O2 #include #include diff --git a/Framework/test/testTaskRunner.cxx b/Framework/test/testTaskRunner.cxx index 20d815f10a..a4cc34c14a 100644 --- a/Framework/test/testTaskRunner.cxx +++ b/Framework/test/testTaskRunner.cxx @@ -36,7 +36,7 @@ using namespace o2::header; using namespace o2::utilities; using namespace o2::configuration; -TaskRunnerConfig getTaskConfig(const std::string& configFilePath, const std::string& taskName) +TaskRunnerConfig getTaskConfig(const std::string& configFilePath, const std::string& taskName, size_t id) { auto config = ConfigurationFactory::getConfiguration(configFilePath); auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configFilePath); @@ -45,7 +45,7 @@ TaskRunnerConfig getTaskConfig(const std::string& configFilePath, const std::str return taskSpec.taskName == taskName; }); if (taskSpec != infrastructureSpec.tasks.end()) { - return TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpec); + return TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpec, id); } else { throw std::runtime_error("task " + taskName + " not found in the config file"); } @@ -55,7 +55,7 @@ BOOST_AUTO_TEST_CASE(test_factory) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask")); + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask", 123)); BOOST_CHECK_EQUAL(taskRunner.name, "QC-TASK-RUNNER-abcTask"); @@ -85,7 +85,7 @@ BOOST_AUTO_TEST_CASE(test_task_runner) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ getTaskConfig(configFilePath, "abcTask") }; + TaskRunner qcTask{ getTaskConfig(configFilePath, "abcTask", 0) }; BOOST_CHECK_EQUAL(qcTask.getDeviceName(), "QC-TASK-RUNNER-abcTask"); @@ -106,7 +106,7 @@ BOOST_AUTO_TEST_CASE(test_task_wrong_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask")); + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask", 0)); // cout << "It should print an error message" << endl; } @@ -114,7 +114,7 @@ BOOST_AUTO_TEST_CASE(test_task_good_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "xyzTask")); + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "xyzTask", 0)); // cout << "no error message" << endl; } \ No newline at end of file From bccc3d0455786736719c141732c9d69e62b18991 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 20 Jul 2021 11:38:27 +0200 Subject: [PATCH 4/5] Review fixes --- Framework/CMakeLists.txt | 3 +-- Framework/src/InfrastructureSpecReader.cxx | 22 +++++++++++----------- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 7075d8c55e..659c3f7c5c 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -75,8 +75,7 @@ add_library(O2QualityControl src/AdvancedWorkflow.cxx src/QualitiesToTRFCollectionConverter.cxx src/Calculators.cxx - src/DataSourceSpec.cxx - ) + src/DataSourceSpec.cxx) target_include_directories( O2QualityControl diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx index 3db71022e0..3d7a838bdb 100644 --- a/Framework/src/InfrastructureSpecReader.cxx +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -47,21 +47,21 @@ InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost: CommonSpec InfrastructureSpecReader::readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource) { - CommonSpec gc; + CommonSpec spec; for (const auto& [key, value] : config.get_child("database")) { - gc.database.emplace(key, value.get_value()); + spec.database.emplace(key, value.get_value()); } - gc.activityNumber = config.get("Activity.number", gc.activityNumber); - gc.activityType = config.get("Activity.type", gc.activityType); - gc.monitoringUrl = config.get("monitoring.url", gc.monitoringUrl); - gc.consulUrl = config.get("consul.url", gc.consulUrl); - gc.conditionDBUrl = config.get("conditionDB.url", gc.conditionDBUrl); - gc.infologgerFilterDiscardDebug = config.get("infologger.filterDiscardDebug", gc.infologgerFilterDiscardDebug); - gc.infologgerDiscardLevel = config.get("infologger.filterDiscardLevel", gc.infologgerDiscardLevel); + spec.activityNumber = config.get("Activity.number", spec.activityNumber); + spec.activityType = config.get("Activity.type", spec.activityType); + spec.monitoringUrl = config.get("monitoring.url", spec.monitoringUrl); + spec.consulUrl = config.get("consul.url", spec.consulUrl); + spec.conditionDBUrl = config.get("conditionDB.url", spec.conditionDBUrl); + spec.infologgerFilterDiscardDebug = config.get("infologger.filterDiscardDebug", spec.infologgerFilterDiscardDebug); + spec.infologgerDiscardLevel = config.get("infologger.filterDiscardLevel", spec.infologgerDiscardLevel); - gc.configurationSource = configurationSource; + spec.configurationSource = configurationSource; - return gc; + return spec; } TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boost::property_tree::ptree& config, const std::string& configurationSource) From b9e9f3449a9cd09b2a76d44ad2fd99ffdc4cf7d9 Mon Sep 17 00:00:00 2001 From: Piotr Konopka Date: Tue, 20 Jul 2021 14:52:18 +0200 Subject: [PATCH 5/5] Add missing detector name in CheckWorkflow test --- Framework/test/testCheckWorkflow.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Framework/test/testCheckWorkflow.json b/Framework/test/testCheckWorkflow.json index 6f05ab8670..d187bafa92 100644 --- a/Framework/test/testCheckWorkflow.json +++ b/Framework/test/testCheckWorkflow.json @@ -18,6 +18,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": { @@ -32,6 +33,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": { @@ -46,6 +48,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": {