diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index da1b1f6cc4..659c3f7c5c 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -53,6 +53,7 @@ add_library(O2QualityControl src/TaskInterface.cxx src/RepositoryBenchmark.cxx src/InfrastructureGenerator.cxx + src/InfrastructureSpecReader.cxx src/Check.cxx src/Aggregator.cxx src/ServiceDiscovery.cxx @@ -73,7 +74,8 @@ add_library(O2QualityControl src/UpdatePolicyManager.cxx src/AdvancedWorkflow.cxx src/QualitiesToTRFCollectionConverter.cxx - src/Calculators.cxx) + src/Calculators.cxx + src/DataSourceSpec.cxx) target_include_directories( O2QualityControl diff --git a/Framework/include/QualityControl/CommonSpec.h b/Framework/include/QualityControl/CommonSpec.h new file mode 100644 index 0000000000..8586ec18c0 --- /dev/null +++ b/Framework/include/QualityControl/CommonSpec.h @@ -0,0 +1,43 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_COMMONSPEC_H +#define QUALITYCONTROL_COMMONSPEC_H + +/// +/// \file CommonSpec.h +/// \author Piotr Konopka +/// + +#include +#include +#include + +namespace o2::quality_control::core +{ + +struct CommonSpec { + CommonSpec() = default; + + std::unordered_map database; + int activityNumber; + int activityType; + std::string monitoringUrl = "infologger:///debug?qc"; + std::string consulUrl; + std::string conditionDBUrl = "http://ccdb-test.cern.ch:8080"; + bool infologgerFilterDiscardDebug = false; + int infologgerDiscardLevel = 21; + + std::string configurationSource; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_COMMONSPEC_H diff --git a/Framework/include/QualityControl/DataSourceSpec.h b/Framework/include/QualityControl/DataSourceSpec.h new file mode 100644 index 0000000000..3e21fa21db --- /dev/null +++ b/Framework/include/QualityControl/DataSourceSpec.h @@ -0,0 +1,56 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_DATASOURCESPEC_H +#define QUALITYCONTROL_DATASOURCESPEC_H + +/// +/// \file DataSourceSpec.h +/// \author Piotr Konopka +/// + +#include +#include +#include +#include + +namespace o2::quality_control::core +{ + +enum class DataSourceType { + DataSamplingPolicy, + Direct, + Task, + Check, + Aggregator, + PostProcessingTask, + ExternalTask, + Invalid +}; + +// this should allow us to represent all data sources which come from DPL (and maybe CCDB). +struct DataSourceSpec { + explicit DataSourceSpec(DataSourceType type = DataSourceType::Invalid, std::unordered_map params = {}); + + // todo: use c++20 concepts when available + template )>> + bool isOneOf(Args... dataSourceType) const + { + return (... || (dataSourceType == type)); + } + + DataSourceType type; + std::unordered_map typeSpecificParams; + std::vector inputs; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_DATASOURCESPEC_H diff --git a/Framework/include/QualityControl/InfrastructureGenerator.h b/Framework/include/QualityControl/InfrastructureGenerator.h index eacf11fc02..9250cd1e65 100644 --- a/Framework/include/QualityControl/InfrastructureGenerator.h +++ b/Framework/include/QualityControl/InfrastructureGenerator.h @@ -71,9 +71,9 @@ class InfrastructureGenerator /// configuration to be 'local'. /// /// \param configurationSource - full path to configuration file, preceded with the backend (f.e. "json://") - /// \param host - name of the machine + /// \param targetHost - name of the machine /// \return generated local QC workflow - static framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string host); + static framework::WorkflowSpec generateLocalInfrastructure(std::string configurationSource, std::string targetHost); /// \brief Generates the local part of the QC infrastructure for a specified host. /// diff --git a/Framework/include/QualityControl/InfrastructureSpec.h b/Framework/include/QualityControl/InfrastructureSpec.h new file mode 100644 index 0000000000..facea241f6 --- /dev/null +++ b/Framework/include/QualityControl/InfrastructureSpec.h @@ -0,0 +1,35 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_INFRASTRUCTURESPEC_H +#define QUALITYCONTROL_INFRASTRUCTURESPEC_H + +/// +/// \file InfrastructureSpec.h +/// \author Piotr Konopka +/// + +#include "QualityControl/CommonSpec.h" +#include "QualityControl/TaskSpec.h" + +#include + +namespace o2::quality_control::core +{ + +struct InfrastructureSpec { + CommonSpec common; + std::vector tasks; + // todo: add other actors +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_INFRASTRUCTURESPEC_H diff --git a/Framework/include/QualityControl/InfrastructureSpecReader.h b/Framework/include/QualityControl/InfrastructureSpecReader.h new file mode 100644 index 0000000000..50d0d575ed --- /dev/null +++ b/Framework/include/QualityControl/InfrastructureSpecReader.h @@ -0,0 +1,49 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_INFRASTRUCTURESPECREADER_H +#define QUALITYCONTROL_INFRASTRUCTURESPECREADER_H + +/// +/// \file InfrastructureSpecReader.h +/// \author Piotr Konopka +/// + +#include "QualityControl/InfrastructureSpec.h" +#include "QualityControl/TaskSpec.h" +#include "QualityControl/CommonSpec.h" +#include "QualityControl/DataSourceSpec.h" +#include + +namespace o2::quality_control::core +{ + +// If have to increase the performance of reading, +// we can probably improve it by writing a proper parser like for WorkflowSerializationHelpers in O2 +// Also, move operators could be implemented. + +class InfrastructureSpecReader +{ + public: + /// \brief Reads the full QC configuration file. + // todo remove configurationSource when it is possible + static InfrastructureSpec readInfrastructureSpec(const boost::property_tree::ptree&, const std::string& configurationSource); + + // readers for separate parts + static CommonSpec readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource); + static TaskSpec readTaskSpec(std::string taskName, const boost::property_tree::ptree& taskSpec, const std::string& configurationSource); + static DataSourceSpec readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec, const std::string& configurationSource); + + static std::string validateDetectorName(std::string name); +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_INFRASTRUCTURESPECREADER_H diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index d7b1fcfec4..4ba9e54768 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -20,7 +20,6 @@ // QC #include "QualityControl/MonitorObject.h" #include "QualityControl/MonitorObjectCollection.h" -#include "QualityControl/TaskConfig.h" // stl #include #include diff --git a/Framework/include/QualityControl/TaskFactory.h b/Framework/include/QualityControl/TaskFactory.h index 76b82cbe27..386c6424a0 100644 --- a/Framework/include/QualityControl/TaskFactory.h +++ b/Framework/include/QualityControl/TaskFactory.h @@ -20,7 +20,7 @@ // STL #include // QC -#include "QualityControl/TaskConfig.h" +#include "QualityControl/TaskRunnerConfig.h" #include "QualityControl/TaskInterface.h" namespace o2::quality_control::core @@ -43,7 +43,7 @@ class TaskFactory /// The TaskInterface actual class is decided based on the parameters passed. /// \todo make it static ? /// \author Barthelemy von Haller - TaskInterface* create(TaskConfig& taskConfig, std::shared_ptr objectsManager); + TaskInterface* create(TaskRunnerConfig& taskConfig, std::shared_ptr objectsManager); }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunner.h b/Framework/include/QualityControl/TaskRunner.h index c9dc4a17da..b2380d1aa7 100644 --- a/Framework/include/QualityControl/TaskRunner.h +++ b/Framework/include/QualityControl/TaskRunner.h @@ -18,8 +18,6 @@ #ifndef QC_CORE_TASKRUNNER_H #define QC_CORE_TASKRUNNER_H -#include - // O2 #include #include @@ -30,7 +28,7 @@ #include #include // QC -#include "QualityControl/TaskConfig.h" +#include "QualityControl/TaskRunnerConfig.h" #include "QualityControl/TaskInterface.h" namespace o2::configuration @@ -75,7 +73,7 @@ class TaskRunner : public framework::Task /// \param taskName - name of the task, which exists in tasks list in the configuration file /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies - TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id = 0); + TaskRunner(const TaskRunnerConfig& config); ~TaskRunner() override = default; /// \brief TaskRunner's init callback @@ -86,13 +84,10 @@ class TaskRunner : public framework::Task /// \brief TaskRunner's completion policy callback static framework::CompletionPolicy::CompletionOp completionPolicyCallback(o2::framework::InputSpan const& inputs); - std::string getDeviceName() { return mDeviceName; }; - const framework::Inputs& getInputsSpecs() { return mInputSpecs; }; - const framework::OutputSpec getOutputSpec() { return mMonitorObjectsSpec; }; - const framework::Options getOptions() { return mOptions; }; - - /// \brief Makes TaskRunner invoke TaskInterface::reset() each n cycles. n = 0 means never. - void setResetAfterCycles(size_t n = 0); + std::string getDeviceName() const { return mTaskConfig.deviceName; }; + const framework::Inputs& getInputsSpecs() const { return mTaskConfig.inputSpecs; }; + const framework::OutputSpec& getOutputSpec() const { return mTaskConfig.moSpec; }; + const framework::Options& getOptions() const { return mTaskConfig.options; }; /// \brief ID string for all TaskRunner devices static std::string createTaskRunnerIdString(); @@ -114,7 +109,6 @@ class TaskRunner : public framework::Task std::tuple validateInputs(const framework::InputRecord&); void loadTaskConfig(); - void loadTopologyConfig(); void startOfActivity(); void endOfActivity(); void startCycle(); @@ -124,24 +118,13 @@ class TaskRunner : public framework::Task void saveToFile(); private: - std::string mDeviceName; - TaskConfig mTaskConfig; - std::shared_ptr mConfigFile; // used in init only + TaskRunnerConfig mTaskConfig; std::shared_ptr mCollector; std::shared_ptr mTask; - size_t mResetAfterCycles = 0; std::shared_ptr mObjectsManager; int mRunNumber; - std::string validateDetectorName(std::string name) const; - boost::property_tree::ptree getTaskConfigTree() const; void updateMonitoringStats(framework::ProcessingContext& pCtx); - void computeRunNumber(const framework::ServiceRegistry& services); - - // consider moving these to TaskConfig - framework::Inputs mInputSpecs; - framework::OutputSpec mMonitorObjectsSpec; - framework::Options mOptions; bool mCycleOn = false; bool mNoMoreCycles = false; diff --git a/Framework/include/QualityControl/TaskConfig.h b/Framework/include/QualityControl/TaskRunnerConfig.h similarity index 68% rename from Framework/include/QualityControl/TaskConfig.h rename to Framework/include/QualityControl/TaskRunnerConfig.h index bb8763dfd6..672be59934 100644 --- a/Framework/include/QualityControl/TaskConfig.h +++ b/Framework/include/QualityControl/TaskRunnerConfig.h @@ -10,7 +10,7 @@ // or submit itself to any jurisdiction. /// -/// \file TaskConfig.h +/// \file TaskRunnerConfig.h /// \author Barthelemy von Haller /// @@ -19,23 +19,37 @@ #include #include +#include + +#include namespace o2::quality_control::core { /// \brief Container for the configuration of a Task -struct TaskConfig { +struct TaskRunnerConfig { + std::string deviceName; std::string taskName; std::string moduleName; std::string className; int cycleDurationSeconds; int maxNumberCycles; - std::string consulUrl; - std::string conditionUrl = ""; + std::string consulUrl{}; + std::string conditionUrl{}; + std::string monitoringUrl{}; + framework::Inputs inputSpecs{}; + framework::OutputSpec moSpec{ "XXX", "INVALID" }; + framework::Options options{}; std::unordered_map customParameters = {}; std::string detectorName = "MISC"; // intended to be the 3 letters code int parallelTaskID = 0; // ID to differentiate parallel local Tasks from one another. 0 means this is the only one. - std::string saveToFile = ""; + std::string saveToFile{}; + int resetAfterCycles = 0; + bool infologgerFilterDiscardDebug = false; + int infologgerDiscardLevel = 21; + int activityType = 0; + int defaultRunNumber = 0; + std::string configurationSource{}; }; } // namespace o2::quality_control::core diff --git a/Framework/include/QualityControl/TaskRunnerFactory.h b/Framework/include/QualityControl/TaskRunnerFactory.h index 3de1a56bb5..326d50a671 100644 --- a/Framework/include/QualityControl/TaskRunnerFactory.h +++ b/Framework/include/QualityControl/TaskRunnerFactory.h @@ -21,6 +21,8 @@ #include #include +#include "QualityControl/CommonSpec.h" +#include "QualityControl/TaskSpec.h" namespace o2::framework { @@ -30,6 +32,8 @@ class CompletionPolicy; namespace o2::quality_control::core { +struct TaskRunnerConfig; + /// \brief Factory in charge of creating DataProcessorSpec of QC task class TaskRunnerFactory { @@ -37,14 +41,13 @@ class TaskRunnerFactory TaskRunnerFactory() = default; virtual ~TaskRunnerFactory() = default; - /// \brief Creator of tasks + /// \brief Creates TaskRunner /// - /// \param taskName - name of the task, which exists in tasks list in the configuration file - /// \param configurationSource - absolute path to configuration file, preceded with backend (f.e. "json://") - /// \param id - subSpecification for taskRunner's OutputSpec, useful to avoid outputs collisions one more complex topologies - /// \param resetAfterPublish - should taskRunner reset the user's task after each MO publication - static o2::framework::DataProcessorSpec - create(std::string taskName, std::string configurationSource, size_t id = 0, size_t resetAfterCycles = 0); + /// \param taskConfig + static o2::framework::DataProcessorSpec create(const TaskRunnerConfig&); + + /// \brief Knows how to create TaskConfig from Specs + static TaskRunnerConfig extractConfig(const CommonSpec&, const TaskSpec&, std::optional id = std::nullopt, std::optional resetAfterCycles = std::nullopt); /// \brief Provides necessary customization of the TaskRunners. /// diff --git a/Framework/include/QualityControl/TaskSpec.h b/Framework/include/QualityControl/TaskSpec.h new file mode 100644 index 0000000000..2205ac10a6 --- /dev/null +++ b/Framework/include/QualityControl/TaskSpec.h @@ -0,0 +1,74 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#ifndef QUALITYCONTROL_TASKSPEC_H +#define QUALITYCONTROL_TASKSPEC_H + +/// +/// \file TaskSpec.h +/// \author Piotr Konopka +/// + +#include +#include + +#include "QualityControl/DataSourceSpec.h" + +namespace o2::quality_control::core +{ + +enum class TaskLocationSpec { + Local, + Remote +}; + +/// \brief Specification of a Task, which should map the JSON configuration structure. +struct TaskSpec { + // default, invalid spec + TaskSpec() = default; + + // minimal valid spec + TaskSpec(std::string taskName, std::string className, std::string moduleName, std::string detectorName, + int cycleDurationSeconds, DataSourceSpec dataSource) + : taskName(std::move(taskName)), + className(std::move(className)), + moduleName(std::move(moduleName)), + detectorName(std::move(detectorName)), + cycleDurationSeconds(cycleDurationSeconds), + dataSource(std::move(dataSource)) + { + } + + // basic + std::string taskName = "Invalid"; + std::string className = "Invalid"; + std::string moduleName = "Invalid"; + std::string detectorName = "Invalid"; + int cycleDurationSeconds = -1; + DataSourceSpec dataSource; + // advanced + bool active = true; + int maxNumberCycles = -1; + size_t resetAfterCycles = 0; + std::string saveObjectsToFile; + std::unordered_map customParameters = {}; + // multinode setups + TaskLocationSpec location = TaskLocationSpec::Remote; + std::vector localMachines = {}; + std::string remoteMachine = "any"; + uint16_t remotePort = 36543; + std::string localControl = "aliecs"; + std::string mergingMode = "delta"; // todo as enum? + int mergerCycleMultiplier = 1; +}; + +} // namespace o2::quality_control::core + +#endif //QUALITYCONTROL_TASKSPEC_H diff --git a/Framework/include/QualityControl/runnerUtils.h b/Framework/include/QualityControl/runnerUtils.h index f841cc6614..43181c4868 100644 --- a/Framework/include/QualityControl/runnerUtils.h +++ b/Framework/include/QualityControl/runnerUtils.h @@ -81,6 +81,23 @@ inline int computeRunNumber(const framework::ServiceRegistry& services, const bo return run; } +inline int computeRunNumber(const framework::ServiceRegistry& services, int defaultRunNumber = 0) +{ // Determine run number + int run = 0; + try { + auto temp = services.get().device()->fConfig->GetProperty("runNumber", "unspecified"); + ILOG(Info, Devel) << "Got this property runNumber from RawDeviceService: '" << temp << "'" << ENDM; + run = stoi(temp); + ILOG(Info, Support) << " Run number found in options: " << run << ENDM; + } catch (std::invalid_argument& ia) { + ILOG(Info, Support) << " Run number not found in options or is not a number, \n" + " using the one from the config file or 0 as a last resort." + << ENDM; + } + run = run > 0 /* found it in service */ ? run : defaultRunNumber; + return run; +} + } // namespace o2::quality_control::core #endif //QUALITYCONTROL_RUNNERUTILS_H diff --git a/Framework/src/DataSourceSpec.cxx b/Framework/src/DataSourceSpec.cxx new file mode 100644 index 0000000000..6c9d9f847a --- /dev/null +++ b/Framework/src/DataSourceSpec.cxx @@ -0,0 +1,30 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file DataSourceSpec.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/DataSourceSpec.h" +#include + +namespace o2::quality_control::core +{ + +// todo make DataSourceUtils + +DataSourceSpec::DataSourceSpec(DataSourceType type, std::unordered_map params) + : type(type), typeSpecificParams(std::move(params)) +{ + // todo: validation? +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/InfrastructureGenerator.cxx b/Framework/src/InfrastructureGenerator.cxx index 8047a4368a..46cbb4473f 100644 --- a/Framework/src/InfrastructureGenerator.cxx +++ b/Framework/src/InfrastructureGenerator.cxx @@ -25,6 +25,9 @@ #include "QualityControl/PostProcessingDevice.h" #include "QualityControl/Version.h" #include "QualityControl/QcInfoLogger.h" +#include "QualityControl/TaskSpec.h" +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/InfrastructureSpec.h" #include #include @@ -50,7 +53,6 @@ using SubSpec = o2::header::DataHeader::SubSpecificationType; namespace o2::quality_control::core { -const char* defaultRemotePort = "36543"; uint16_t defaultPolicyPort = 42349; struct DataSamplingPolicySpec { @@ -65,19 +67,21 @@ struct DataSamplingPolicySpec { framework::WorkflowSpec InfrastructureGenerator::generateStandaloneInfrastructure(std::string configurationSource) { - WorkflowSpec workflow; - auto config = ConfigurationFactory::getConfiguration(configurationSource); printVersion(); - if (config->getRecursive("qc").count("tasks")) { - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (taskConfig.get("active", true)) { - // The "resetAfterCycles" parameters should be handled differently for standalone/remote and local tasks, - // thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of - // the context we run QC. - size_t resetAfterCycles = taskConfig.get("resetAfterCycles", 0); - workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles)); - } + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + // todo: report the number of tasks/checks/etc once all are read there. + + WorkflowSpec workflow; + + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (taskSpec.active) { + // The "resetAfterCycles" parameters should be handled differently for standalone/remote and local tasks, + // thus we should not let TaskRunnerFactory read it and decide by itself, since it might not be aware of + // the context we run QC. + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); } } auto checkRunnerOutputs = generateCheckRunners(workflow, configurationSource); @@ -93,73 +97,59 @@ void InfrastructureGenerator::generateStandaloneInfrastructure(framework::Workfl workflow.insert(std::end(workflow), std::begin(qcInfrastructure), std::end(qcInfrastructure)); } -WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string configurationSource, std::string host) +WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string configurationSource, std::string targetHost) { - WorkflowSpec workflow; - TaskRunnerFactory taskRunnerFactory; - std::set samplingPoliciesUsed; + printVersion(); auto config = ConfigurationFactory::getConfiguration(configurationSource); - printVersion(); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + + WorkflowSpec workflow; + std::set samplingPoliciesUsed; - if (config->getRecursive("qc").count("tasks") == 0) { + if (infrastructureSpec.tasks.empty()) { return workflow; } - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (!taskConfig.get("active")) { - ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM; + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (!taskSpec.active) { + ILOG(Info, Devel) << "Task " << taskSpec.taskName << " is disabled, ignoring." << ENDM; continue; } - if (taskConfig.get("location") == "local") { - if (taskConfig.get_child("localMachines").empty()) { - throw std::runtime_error("No local machines specified for task " + taskName + " in its configuration"); + if (taskSpec.location == TaskLocationSpec::Local) { + if (taskSpec.localMachines.empty()) { + throw std::runtime_error("No local machines specified for task " + taskSpec.taskName + " in its configuration"); } size_t id = 1; - for (const auto& machine : taskConfig.get_child("localMachines")) { + for (const auto& machine : taskSpec.localMachines) { // We spawn a task and proxy only if we are on the right machine. - if (machine.second.get("") == host) { + if (machine == targetHost) { // If we use delta mergers, then the moving window is implemented by the last Merger layer. // The QC Tasks should always send a delta covering one cycle. - int resetAfterCycles = taskConfig.get("mergingMode", "delta") == "delta" ? 1 : taskConfig.get("resetAfterCycles", 0); + int resetAfterCycles = taskSpec.mergingMode == "delta" ? 1 : (int)taskSpec.resetAfterCycles; + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, id, resetAfterCycles); // Generate QC Task Runner - workflow.emplace_back(taskRunnerFactory.create(taskName, configurationSource, id, resetAfterCycles)); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); // Generate an output proxy // These should be removed when we are able to declare dangling output in normal DPL devices - auto remoteMachine = taskConfig.get_optional("remoteMachine"); - if (!remoteMachine.has_value()) { - ILOG(Warning, Devel) - << "No remote machine was specified for a multinode QC setup." - " This is fine if running with AliECS, but it will fail in standalone mode." - << ENDM; - } - auto remotePort = taskConfig.get_optional("remotePort"); - if (!remotePort.has_value()) { - ILOG(Warning, Devel) - << "No remote port was specified for a multinode QC setup." - " This is fine if running with AliECS, but it might fail in standalone mode." - << ENDM; - } - generateLocalTaskLocalProxy(workflow, id, taskName, remoteMachine.value_or("any"), - remotePort.value_or(defaultRemotePort), - taskConfig.get("localControl", "aliecs")); + generateLocalTaskLocalProxy(workflow, id, taskSpec.taskName, taskSpec.remoteMachine, std::to_string(taskSpec.remotePort), taskSpec.localControl); break; } id++; } - } else // if (taskConfig.get("location") == "remote") + } else // TaskLocationSpec::Remote { // Collecting Data Sampling Policies - auto dataSourceTree = taskConfig.get_child("dataSource"); - std::string type = dataSourceTree.get("type"); - if (type == "dataSamplingPolicy") { - samplingPoliciesUsed.insert({ dataSourceTree.get("name"), taskConfig.get("localControl", "aliecs") }); - } else if (type == "direct") { - throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources"); - } else { - throw std::runtime_error("Configuration error: dataSource type unknown : " + type); + switch (taskSpec.dataSource.type) { + case DataSourceType::DataSamplingPolicy: + samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + break; + case DataSourceType::Direct: + throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); + default: + throw std::runtime_error("Configuration error: unsupported dataSource for remote QC Tasks"); } } } @@ -172,7 +162,7 @@ WorkflowSpec InfrastructureGenerator::generateLocalInfrastructure(std::string co std::vector machines = DataSampling::MachinesForPolicy(config.get(), policyName); for (const auto& machine : machines) { - if (machine == host) { + if (machine == targetHost) { generateDataSamplingPolicyLocalProxy(workflow, policyName, inputSpecs, machine, port, control); } } @@ -189,65 +179,53 @@ void InfrastructureGenerator::generateLocalInfrastructure(framework::WorkflowSpe o2::framework::WorkflowSpec InfrastructureGenerator::generateRemoteInfrastructure(std::string configurationSource) { + printVersion(); + + auto config = ConfigurationFactory::getConfiguration(configurationSource); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configurationSource); + WorkflowSpec workflow; std::set samplingPoliciesUsed; - auto config = ConfigurationFactory::getConfiguration(configurationSource); - printVersion(); - if (config->getRecursive("qc").count("tasks")) { - for (const auto& [taskName, taskConfig] : config->getRecursive("qc.tasks")) { - if (!taskConfig.get("active", true)) { - ILOG(Info, Devel) << "Task " << taskName << " is disabled, ignoring." << ENDM; - continue; - } + for (const auto& taskSpec : infrastructureSpec.tasks) { + if (!taskSpec.active) { + ILOG(Info, Devel) << "Task " << taskSpec.taskName << " is disabled, ignoring." << ENDM; + continue; + } - if (taskConfig.get("location") == "local") { - // if tasks are LOCAL, generate input proxies + mergers + checkers - - size_t numberOfLocalMachines = taskConfig.get_child("localMachines").size() > 1 ? taskConfig.get_child("localMachines").size() : 1; - // Generate an input proxy - // These should be removed when we are able to declare dangling inputs in normal DPL devices - auto remotePort = taskConfig.get_optional("remotePort"); - if (!remotePort.has_value()) { - ILOG(Warning, Devel) << "No remote port was specified for a multinode QC setup." - " This is fine if running with AliECS, but it might fail in standalone mode." - << ENDM; - } - generateLocalTaskRemoteProxy(workflow, taskName, numberOfLocalMachines, remotePort.value_or(defaultRemotePort), - taskConfig.get("localControl", "aliecs")); - - auto mergingMode = taskConfig.get("mergingMode", "delta"); - // In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks. - size_t resetAfterCycles = mergingMode == "delta" ? taskConfig.get("resetAfterCycles", 0) : 0; - auto cycleDurationSeconds = taskConfig.get("cycleDurationSeconds") * taskConfig.get("mergerCycleMultiplier", 1); - auto monitoringUrl = config->get("qc.config.monitoring.url"); - - generateMergers(workflow, taskName, numberOfLocalMachines, cycleDurationSeconds, mergingMode, resetAfterCycles, monitoringUrl); - - } else if (taskConfig.get("location") == "remote") { - - // -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers - // (for the time being we don't foresee parallel tasks on QC servers, so no mergers here) - - // fixme: ideally we should check if we are on the right remote machine, but now we support only n -> 1 setups, - // so there is no point. Also, I expect that we should be able to generate one big topology or its parts - // and we would place it among QC servers using AliECS, not by configuration files. - - // Collecting Data Sampling Policies - auto dataSourceTree = taskConfig.get_child("dataSource"); - std::string type = dataSourceTree.get("type"); - if (type == "dataSamplingPolicy") { - samplingPoliciesUsed.insert({ dataSourceTree.get("name"), taskConfig.get("localControl", "aliecs") }); - } else if (type == "direct") { - throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskName + " cannot use direct data sources"); - } else { - throw std::runtime_error("Configuration error: dataSource type unknown : " + type); - } + if (taskSpec.location == TaskLocationSpec::Local) { + // if tasks are LOCAL, generate input proxies + mergers + checkers + + size_t numberOfLocalMachines = taskSpec.localMachines.size() > 1 ? taskSpec.localMachines.size() : 1; + // Generate an input proxy + // These should be removed when we are able to declare dangling inputs in normal DPL devices + generateLocalTaskRemoteProxy(workflow, taskSpec.taskName, numberOfLocalMachines, std::to_string(taskSpec.remotePort), taskSpec.localControl); + + // In "delta" mode Mergers should implement moving window, in "entire" - QC Tasks. + size_t resetAfterCycles = taskSpec.mergingMode == "delta" ? taskSpec.resetAfterCycles : 0; + auto cycleDurationSeconds = taskSpec.cycleDurationSeconds * taskSpec.mergerCycleMultiplier; + + generateMergers(workflow, taskSpec.taskName, numberOfLocalMachines, cycleDurationSeconds, taskSpec.mergingMode, resetAfterCycles, infrastructureSpec.common.monitoringUrl); + + } else if (taskSpec.location == TaskLocationSpec::Remote) { - auto resetAfterCycles = taskConfig.get("resetAfterCycles", 0); - // Creating the remote task - workflow.emplace_back(TaskRunnerFactory::create(taskName, configurationSource, 0, resetAfterCycles)); + // -- if tasks are REMOTE, generate dispatcher proxies + tasks + checkers + // (for the time being we don't foresee parallel tasks on QC servers, so no mergers here) + + // Collecting Data Sampling Policies + switch (taskSpec.dataSource.type) { + case DataSourceType::DataSamplingPolicy: + samplingPoliciesUsed.insert({ taskSpec.dataSource.typeSpecificParams.at("name"), taskSpec.localControl }); + break; + case DataSourceType::Direct: + throw std::runtime_error("Configuration error: Remote QC tasks such as " + taskSpec.taskName + " cannot use direct data sources"); + default: + throw std::runtime_error("Configuration error: unsupported dataSource for remote QC Tasks"); } + + // Creating the remote task + auto taskConfig = TaskRunnerFactory::extractConfig(infrastructureSpec.common, taskSpec, 0, taskSpec.resetAfterCycles); + workflow.emplace_back(TaskRunnerFactory::create(taskConfig)); } } diff --git a/Framework/src/InfrastructureSpecReader.cxx b/Framework/src/InfrastructureSpecReader.cxx new file mode 100644 index 0000000000..3d7a838bdb --- /dev/null +++ b/Framework/src/InfrastructureSpecReader.cxx @@ -0,0 +1,194 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// +/// \file InfrastructureSpecReader.cxx +/// \author Piotr Konopka +/// + +#include "QualityControl/InfrastructureSpecReader.h" +#include "QualityControl/QcInfoLogger.h" + +#include +#include +#include + +using namespace o2::utilities; +using namespace o2::framework; + +namespace o2::quality_control::core +{ + +InfrastructureSpec InfrastructureSpecReader::readInfrastructureSpec(const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + InfrastructureSpec spec; + const auto& qcTree = config.get_child("qc"); + if (qcTree.find("config") != qcTree.not_found()) { + spec.common = readCommonSpec(qcTree.get_child("config"), configurationSource); + } else { + ILOG(Error) << "The \"config\" section in the provided QC config file is missing." << ENDM; + } + if (qcTree.find("tasks") != qcTree.not_found()) { + const auto& tasksTree = qcTree.get_child("tasks"); + spec.tasks.reserve(tasksTree.size()); + for (const auto& [taskName, taskConfig] : tasksTree) { + spec.tasks.push_back(readTaskSpec(taskName, taskConfig, configurationSource)); + } + } + return spec; +} + +CommonSpec InfrastructureSpecReader::readCommonSpec(const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + CommonSpec spec; + for (const auto& [key, value] : config.get_child("database")) { + spec.database.emplace(key, value.get_value()); + } + spec.activityNumber = config.get("Activity.number", spec.activityNumber); + spec.activityType = config.get("Activity.type", spec.activityType); + spec.monitoringUrl = config.get("monitoring.url", spec.monitoringUrl); + spec.consulUrl = config.get("consul.url", spec.consulUrl); + spec.conditionDBUrl = config.get("conditionDB.url", spec.conditionDBUrl); + spec.infologgerFilterDiscardDebug = config.get("infologger.filterDiscardDebug", spec.infologgerFilterDiscardDebug); + spec.infologgerDiscardLevel = config.get("infologger.filterDiscardLevel", spec.infologgerDiscardLevel); + + spec.configurationSource = configurationSource; + + return spec; +} + +TaskSpec InfrastructureSpecReader::readTaskSpec(std::string taskName, const boost::property_tree::ptree& config, const std::string& configurationSource) +{ + static std::unordered_map const taskLocationFromString = { + { "local", TaskLocationSpec::Local }, + { "remote", TaskLocationSpec::Remote } + }; + + TaskSpec ts; + + ts.taskName = taskName; + ts.className = config.get("className"); + ts.moduleName = config.get("moduleName"); + ts.detectorName = config.get("detectorName"); + ts.cycleDurationSeconds = config.get("cycleDurationSeconds"); + ts.dataSource = readDataSourceSpec(config.get_child("dataSource"), configurationSource); + + ts.active = config.get("active", ts.active); + ts.maxNumberCycles = config.get("maxNumberCycles", ts.maxNumberCycles); + ts.resetAfterCycles = config.get("resetAfterCycles", ts.resetAfterCycles); + ts.saveObjectsToFile = config.get("saveObjectsToFile", ts.saveObjectsToFile); + if (config.count("taskParameters") > 0) { + for (const auto& [key, value] : config.get_child("taskParameters")) { + ts.customParameters.emplace(key, value.get_value()); + } + } + + bool multinodeSetup = config.find("location") != config.not_found(); + ts.location = taskLocationFromString.at(config.get("location", "remote")); + if (config.count("localMachines") > 0) { + for (const auto& [key, value] : config.get_child("localMachines")) { + ts.localMachines.emplace_back(value.get_value()); + } + } + if (multinodeSetup && config.count("remoteMachine") > 0) { + ILOG(Warning, Trace) + << "No remote machine was specified for a multinode QC setup." + " This is fine if running with AliECS, but it will fail in standalone mode." + << ENDM; + } + ts.remoteMachine = config.get("remoteMachine", ts.remoteMachine); + if (multinodeSetup && config.count("remotePort") > 0) { + ILOG(Warning, Trace) + << "No remote port was specified for a multinode QC setup." + " This is fine if running with AliECS, but it might fail in standalone mode." + << ENDM; + } + ts.remotePort = config.get("remotePort", ts.remotePort); + ts.localControl = config.get("localControl", ts.localControl); + ts.mergingMode = config.get("mergingMode", ts.mergingMode); + ts.mergerCycleMultiplier = config.get("mergerCycleMultiplier", ts.mergerCycleMultiplier); + + return ts; +} + +DataSourceSpec InfrastructureSpecReader::readDataSourceSpec(const boost::property_tree::ptree& dataSourceSpec, + const std::string& configurationSource) +{ + static std::unordered_map const dataSourceTypeFromString = { + // fixme: the convention is inconsistent and it should be fixed in coordination with configuration files + { "dataSamplingPolicy", DataSourceType::DataSamplingPolicy }, + { "direct", DataSourceType::Direct }, + { "Task", DataSourceType::Task }, + { "Check", DataSourceType::Check }, + { "Aggregator", DataSourceType::Aggregator }, + { "PostProcessing", DataSourceType::PostProcessingTask }, + { "ExternalTask", DataSourceType::ExternalTask } + }; + + DataSourceSpec dss; + dss.type = dataSourceTypeFromString.at(dataSourceSpec.get("type")); + + switch (dss.type) { + case DataSourceType::DataSamplingPolicy: { + auto name = dataSourceSpec.get("name"); + dss.typeSpecificParams.insert({ "name", name }); + dss.inputs = DataSampling::InputSpecsForPolicy(configurationSource, name); //fixme: add a method which takes a ptree, then i can remove configurationSource + break; + } + case DataSourceType::Direct: { + dss.typeSpecificParams.insert({ "query", dataSourceSpec.get("query") }); + auto inputsQuery = dataSourceSpec.get("query"); + dss.inputs = DataDescriptorQueryBuilder::parse(inputsQuery.c_str()); + break; + } + case DataSourceType::Task: // todo all below + case DataSourceType::PostProcessingTask: + case DataSourceType::Check: + case DataSourceType::Aggregator: + dss.typeSpecificParams.insert({ "name", dataSourceSpec.get("name") }); + break; + case DataSourceType::ExternalTask: + dss.typeSpecificParams.insert({ "name", dataSourceSpec.get("name") }); + dss.typeSpecificParams.insert({ "query", dataSourceSpec.get("query") }); + break; + case DataSourceType::Invalid: + // todo: throw? + break; + } + + return dss; +} + +std::string InfrastructureSpecReader::validateDetectorName(std::string name) +{ + // name must be a detector code from DetID or one of the few allowed general names + int nDetectors = 16; + const char* detNames[16] = // once we can use DetID, remove this hard-coded list + { "ITS", "TPC", "TRD", "TOF", "PHS", "CPV", "EMC", "HMP", "MFT", "MCH", "MID", "ZDC", "FT0", "FV0", "FDD", "ACO" }; + std::vector permitted = { "MISC", "DAQ", "GENERAL", "TST", "BMK", "CTP", "TRG", "DCS", "REC" }; + for (auto i = 0; i < nDetectors; i++) { + permitted.emplace_back(detNames[i]); + // permitted.push_back(o2::detectors::DetID::getName(i)); + } + auto it = std::find(permitted.begin(), permitted.end(), name); + + if (it == permitted.end()) { + std::string permittedString; + for (const auto& i : permitted) + permittedString += i + ' '; + ILOG(Error, Support) << "Invalid detector name : " << name << "\n" + << " Placeholder 'MISC' will be used instead\n" + << " Note: list of permitted detector names :" << permittedString << ENDM; + return "MISC"; + } + return name; +} + +} // namespace o2::quality_control::core \ No newline at end of file diff --git a/Framework/src/TaskFactory.cxx b/Framework/src/TaskFactory.cxx index fea65dabb7..4ca07b9aa0 100644 --- a/Framework/src/TaskFactory.cxx +++ b/Framework/src/TaskFactory.cxx @@ -22,7 +22,7 @@ namespace o2::quality_control::core { -TaskInterface* TaskFactory::create(TaskConfig& taskConfig, std::shared_ptr objectsManager) +TaskInterface* TaskFactory::create(TaskRunnerConfig& taskConfig, std::shared_ptr objectsManager) { TaskInterface* result = root_class_factory::create(taskConfig.moduleName, taskConfig.className); result->setName(taskConfig.taskName); diff --git a/Framework/src/TaskRunner.cxx b/Framework/src/TaskRunner.cxx index 9f524ee9f4..f06b5bf10e 100644 --- a/Framework/src/TaskRunner.cxx +++ b/Framework/src/TaskRunner.cxx @@ -59,23 +59,10 @@ using namespace o2::utilities; using namespace std::chrono; using namespace AliceO2::Common; -TaskRunner::TaskRunner(const std::string& taskName, const std::string& configurationSource, size_t id) - : mDeviceName(createTaskRunnerIdString() + "-" + taskName), - mRunNumber(0), - mMonitorObjectsSpec({ "mo" }, createTaskDataOrigin(), createTaskDataDescription(taskName), id) +TaskRunner::TaskRunner(const TaskRunnerConfig& config) + : mTaskConfig(config), + mRunNumber(0) { - // setup configuration - try { - mTaskConfig.taskName = taskName; - mTaskConfig.parallelTaskID = id; - mConfigFile = ConfigurationFactory::getConfiguration(configurationSource); - loadTopologyConfig(); - } catch (...) { - // catch the configuration exception and print it to avoid losing it - ILOG(Fatal, Ops) << "Unexpected exception during configuration:\n" - << current_diagnostic(true) << ENDM; - throw; - } } void TaskRunner::init(InitContext& iCtx) @@ -86,7 +73,10 @@ void TaskRunner::init(InitContext& iCtx) } catch (const RuntimeErrorRef& err) { ILOG(Error) << "Could not find the DPL InfoLogger Context." << ENDM; } - ILOG_INST.init("task/" + mTaskConfig.taskName, mConfigFile->getRecursive(), ilContext); + ILOG_INST.init("task/" + mTaskConfig.taskName, + mTaskConfig.infologgerFilterDiscardDebug, + mTaskConfig.infologgerDiscardLevel, + ilContext); ILOG(Info, Support) << "Initializing TaskRunner" << ENDM; try { @@ -104,8 +94,7 @@ void TaskRunner::init(InitContext& iCtx) iCtx.services().get().set(CallbackService::Id::Reset, [this]() { reset(); }); // setup monitoring - auto monitoringUrl = mConfigFile->get("qc.config.monitoring.url", "infologger:///debug?qc"); - mCollector = MonitoringFactory::Get(monitoringUrl); + mCollector = MonitoringFactory::Get(mTaskConfig.monitoringUrl); mCollector->addGlobalTag(tags::Key::Subsystem, tags::Value::QC); mCollector->addGlobalTag("TaskName", mTaskConfig.taskName); @@ -146,7 +135,7 @@ void TaskRunner::run(ProcessingContext& pCtx) if (timerReady) { finishCycle(pCtx.outputs()); - if (mResetAfterCycles > 0 && (mCycleNumber % mResetAfterCycles == 0)) { + if (mTaskConfig.resetAfterCycles > 0 && (mCycleNumber % mTaskConfig.resetAfterCycles == 0)) { mTask->reset(); } if (mTaskConfig.maxNumberCycles < 0 || mCycleNumber < mTaskConfig.maxNumberCycles) { @@ -195,11 +184,6 @@ CompletionPolicy::CompletionOp TaskRunner::completionPolicyCallback(o2::framewor return action; } -void TaskRunner::setResetAfterCycles(size_t n) -{ - mResetAfterCycles = n; -} - std::string TaskRunner::createTaskRunnerIdString() { return std::string("QC-TASK-RUNNER"); @@ -229,7 +213,7 @@ void TaskRunner::endOfStream(framework::EndOfStreamContext& eosContext) void TaskRunner::start(const ServiceRegistry& services) { - o2::quality_control::core::computeRunNumber(services, mConfigFile->getRecursive()); + mRunNumber = o2::quality_control::core::computeRunNumber(services, mTaskConfig.defaultRunNumber); try { startOfActivity(); @@ -306,64 +290,11 @@ std::tuple TaskRunner::validateInputs return { dataReady, timerReady }; } -void TaskRunner::loadTopologyConfig() -{ - auto taskConfigTree = getTaskConfigTree(); - auto policiesFilePath = mConfigFile->get("dataSamplingPolicyFile", ""); - std::shared_ptr config = policiesFilePath.empty() ? mConfigFile : ConfigurationFactory::getConfiguration(policiesFilePath); - auto dataSourceTree = taskConfigTree.get_child("dataSource"); - auto type = dataSourceTree.get("type"); - - if (type == "dataSamplingPolicy") { - auto policyName = dataSourceTree.get("name"); - ILOG(Info, Support) << "policyName : " << policyName << ENDM; - mInputSpecs = DataSampling::InputSpecsForPolicy(config.get(), policyName); - } else if (type == "direct") { - auto inputsQuery = dataSourceTree.get("query"); - mInputSpecs = DataDescriptorQueryBuilder::parse(inputsQuery.c_str()); - } else { - std::string message = std::string("Configuration error : dataSource type unknown : ") + type; - BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message)); - } - - mInputSpecs.emplace_back(InputSpec{ "timer-cycle", createTaskDataOrigin(), createTaskDataDescription("TIMER-" + mTaskConfig.taskName), 0, Lifetime::Timer }); - - // needed to avoid having looping at the maximum speed - mTaskConfig.cycleDurationSeconds = taskConfigTree.get("cycleDurationSeconds", 10); - mOptions.push_back({ "period-timer-cycle", framework::VariantType::Int, static_cast(mTaskConfig.cycleDurationSeconds * 1000000), { "timer period" } }); - mOptions.push_back({ "runNumber", framework::VariantType::String, { "Run number" } }); -} - -boost::property_tree::ptree TaskRunner::getTaskConfigTree() const -{ - auto tasksConfigList = mConfigFile->getRecursive("qc.tasks"); - auto taskConfigTree = tasksConfigList.find(mTaskConfig.taskName); - if (taskConfigTree == tasksConfigList.not_found()) { - std::string message = "No configuration found for task \"" + mTaskConfig.taskName + "\""; - BOOST_THROW_EXCEPTION(AliceO2::Common::FatalException() << AliceO2::Common::errinfo_details(message)); - } - return taskConfigTree->second; -} - -void TaskRunner::loadTaskConfig() +void TaskRunner::loadTaskConfig() // todo consider renaming { ILOG(Info, Support) << "Loading configuration" << ENDM; - auto taskConfigTree = getTaskConfigTree(); - string test = taskConfigTree.get("detectorName", "MISC"); - mTaskConfig.detectorName = validateDetectorName(taskConfigTree.get("detectorName", "MISC")); ILOG_INST.setDetector(mTaskConfig.detectorName); - mTaskConfig.moduleName = taskConfigTree.get("moduleName"); - mTaskConfig.className = taskConfigTree.get("className"); - mTaskConfig.maxNumberCycles = taskConfigTree.get("maxNumberCycles", -1); - mTaskConfig.consulUrl = mConfigFile->get("qc.config.consul.url", ""); - mTaskConfig.conditionUrl = mConfigFile->get("qc.config.conditionDB.url", "http://ccdb-test.cern.ch:8080"); - mTaskConfig.saveToFile = taskConfigTree.get("saveObjectsToFile", ""); - try { - mTaskConfig.customParameters = mConfigFile->getRecursiveMap("qc.tasks." + mTaskConfig.taskName + ".taskParameters"); - } catch (...) { - ILOG(Debug, Support) << "No custom parameters for " << mTaskConfig.taskName << ENDM; - } ILOG(Info, Support) << "Configuration loaded : " << ENDM; ILOG(Info, Support) << ">> Task name : " << mTaskConfig.taskName << ENDM; @@ -374,31 +305,6 @@ void TaskRunner::loadTaskConfig() ILOG(Info, Support) << ">> Save to file : " << mTaskConfig.saveToFile << ENDM; } -std::string TaskRunner::validateDetectorName(std::string name) const -{ - // name must be a detector code from DetID or one of the few allowed general names - int nDetectors = 16; - const char* detNames[16] = // once we can use DetID, remove this hard-coded list - { "ITS", "TPC", "TRD", "TOF", "PHS", "CPV", "EMC", "HMP", "MFT", "MCH", "MID", "ZDC", "FT0", "FV0", "FDD", "ACO" }; - vector permitted = { "MISC", "DAQ", "GENERAL", "TST", "BMK", "CTP", "TRG", "DCS", "REC" }; - for (auto i = 0; i < nDetectors; i++) { - permitted.push_back(detNames[i]); - // permitted.push_back(o2::detectors::DetID::getName(i)); - } - auto it = std::find(permitted.begin(), permitted.end(), name); - - if (it == permitted.end()) { - std::string permittedString; - for (auto i : permitted) - permittedString += i + ' '; - ILOG(Error, Support) << "Invalid detector name : " << name << "\n" - << " Placeholder 'MISC' will be used instead\n" - << " Note: list of permitted detector names :" << permittedString << ENDM; - return "MISC"; - } - return name; -} - void TaskRunner::startOfActivity() { // stats @@ -406,8 +312,7 @@ void TaskRunner::startOfActivity() mTotalNumberObjectsPublished = 0; // Start activity in module's stask and update objectsManager - Activity activity(mRunNumber, - mConfigFile->get("qc.config.Activity.type")); + Activity activity(mRunNumber, mTaskConfig.activityType); ILOG(Info, Ops) << "Starting run " << mRunNumber << ENDM; mCollector->setRunNumber(mRunNumber); mTask->startOfActivity(activity); @@ -417,8 +322,7 @@ void TaskRunner::startOfActivity() void TaskRunner::endOfActivity() { - Activity activity(mRunNumber, - mConfigFile->get("qc.config.Activity.type")); + Activity activity(mRunNumber, mTaskConfig.activityType); ILOG(Info, Ops) << "Stopping run " << mRunNumber << ENDM; mTask->endOfActivity(activity); mObjectsManager->removeAllFromServiceDiscovery(); @@ -504,7 +408,7 @@ int TaskRunner::publish(DataAllocator& outputs) ILOG(Info, Support) << "Publishing " << mObjectsManager->getNumberPublishedObjects() << " MonitorObjects" << ENDM; AliceO2::Common::Timer publicationDurationTimer; - auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher(mMonitorObjectsSpec); + auto concreteOutput = framework::DataSpecUtils::asConcreteDataMatcher(mTaskConfig.moSpec); // getNonOwningArray creates a TObjArray containing the monitoring objects, but not // owning them. The array is created by new and must be cleaned up by the caller std::unique_ptr array(mObjectsManager->getNonOwningArray()); @@ -514,7 +418,7 @@ int TaskRunner::publish(DataAllocator& outputs) Output{ concreteOutput.origin, concreteOutput.description, concreteOutput.subSpec, - mMonitorObjectsSpec.lifetime }, + mTaskConfig.moSpec.lifetime }, *array); mLastPublicationDuration = publicationDurationTimer.getTime(); diff --git a/Framework/src/TaskRunnerFactory.cxx b/Framework/src/TaskRunnerFactory.cxx index 45df00f857..b1c854d3e6 100644 --- a/Framework/src/TaskRunnerFactory.cxx +++ b/Framework/src/TaskRunnerFactory.cxx @@ -16,9 +16,12 @@ #include "QualityControl/TaskRunnerFactory.h" #include "QualityControl/TaskRunner.h" +#include "QualityControl/TaskRunnerConfig.h" +#include "QualityControl/InfrastructureSpecReader.h" #include #include +#include namespace o2::quality_control::core { @@ -26,24 +29,74 @@ namespace o2::quality_control::core using namespace o2::framework; o2::framework::DataProcessorSpec - TaskRunnerFactory::create(std::string taskName, std::string configurationSource, size_t id, size_t resetAfterCycles) + TaskRunnerFactory::create(const TaskRunnerConfig& taskConfig) { - TaskRunner qcTask{ taskName, configurationSource, id }; - qcTask.setResetAfterCycles(resetAfterCycles); + TaskRunner qcTask{ taskConfig }; DataProcessorSpec newTask{ - qcTask.getDeviceName(), - qcTask.getInputsSpecs(), - Outputs{ qcTask.getOutputSpec() }, - AlgorithmSpec{}, - qcTask.getOptions() + taskConfig.deviceName, + taskConfig.inputSpecs, + { taskConfig.moSpec }, + adaptFromTask(std::move(qcTask)), + taskConfig.options }; - // this needs to be moved at the end - newTask.algorithm = adaptFromTask(std::move(qcTask)); return newTask; } +TaskRunnerConfig TaskRunnerFactory::extractConfig(const CommonSpec& globalConfig, const TaskSpec& taskSpec, std::optional id, std::optional resetAfterCycles) +{ + std::string deviceName{ TaskRunner::createTaskRunnerIdString() + "-" + taskSpec.taskName }; + + int parallelTaskID = id.value_or(0); + + // todo validate data source + if (!taskSpec.dataSource.isOneOf(DataSourceType::DataSamplingPolicy, DataSourceType::Direct)) { + throw std::runtime_error("This data source of the task '" + taskSpec.taskName + "' is not supported."); + } + auto inputs = taskSpec.dataSource.inputs; + inputs.emplace_back("timer-cycle", + TaskRunner::createTaskDataOrigin(), + TaskRunner::createTaskDataDescription("TIMER-" + taskSpec.taskName), + 0, + Lifetime::Timer); + + OutputSpec monitorObjectsSpec{ { "mo" }, + TaskRunner::createTaskDataOrigin(), + TaskRunner::createTaskDataDescription(taskSpec.taskName), + static_cast(parallelTaskID) }; + + Options options{ + { "period-timer-cycle", framework::VariantType::Int, static_cast(taskSpec.cycleDurationSeconds * 1000000), { "timer period" } }, + { "runNumber", framework::VariantType::String, { "Run number" } } + }; + + return { + deviceName, + taskSpec.taskName, + taskSpec.moduleName, + taskSpec.className, + taskSpec.cycleDurationSeconds, + taskSpec.maxNumberCycles, + globalConfig.consulUrl, + globalConfig.conditionDBUrl, + globalConfig.monitoringUrl, + inputs, + monitorObjectsSpec, + options, + taskSpec.customParameters, + InfrastructureSpecReader::validateDetectorName(taskSpec.detectorName), + parallelTaskID, + taskSpec.saveObjectsToFile, + resetAfterCycles.value_or(taskSpec.resetAfterCycles), + globalConfig.infologgerFilterDiscardDebug, + globalConfig.infologgerDiscardLevel, + globalConfig.activityType, + globalConfig.activityNumber, + globalConfig.configurationSource + }; +} + void TaskRunnerFactory::customizeInfrastructure(std::vector& policies) { auto matcher = [](framework::DeviceSpec const& device) { diff --git a/Framework/test/testCheckWorkflow.json b/Framework/test/testCheckWorkflow.json index 6f05ab8670..d187bafa92 100644 --- a/Framework/test/testCheckWorkflow.json +++ b/Framework/test/testCheckWorkflow.json @@ -18,6 +18,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": { @@ -32,6 +33,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": { @@ -46,6 +48,7 @@ "active": "true", "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", + "detectorName" : "TST", "cycleDurationSeconds": "5", "maxNumberCycles": "-1", "dataSource": { diff --git a/Framework/test/testObjectsManager.cxx b/Framework/test/testObjectsManager.cxx index 985c636ad3..56b0a718e8 100644 --- a/Framework/test/testObjectsManager.cxx +++ b/Framework/test/testObjectsManager.cxx @@ -31,9 +31,15 @@ using namespace AliceO2::Common; namespace o2::quality_control::core { +struct Config { + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; +}; + BOOST_AUTO_TEST_CASE(invalid_url_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "bad-url:1234"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -41,7 +47,7 @@ BOOST_AUTO_TEST_CASE(invalid_url_test) BOOST_AUTO_TEST_CASE(duplicate_object_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -52,7 +58,7 @@ BOOST_AUTO_TEST_CASE(duplicate_object_test) BOOST_AUTO_TEST_CASE(is_being_published_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -65,7 +71,7 @@ BOOST_AUTO_TEST_CASE(is_being_published_test) BOOST_AUTO_TEST_CASE(unpublish_test) { - TaskConfig config; + Config config; config.taskName = "test"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); TObjString s("content"); @@ -83,7 +89,7 @@ BOOST_AUTO_TEST_CASE(unpublish_test) BOOST_AUTO_TEST_CASE(getters_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -113,7 +119,7 @@ BOOST_AUTO_TEST_CASE(getters_test) BOOST_AUTO_TEST_CASE(metadata_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); @@ -129,7 +135,7 @@ BOOST_AUTO_TEST_CASE(metadata_test) BOOST_AUTO_TEST_CASE(drawOptions_test) { - TaskConfig config; + Config config; config.taskName = "test"; config.consulUrl = "http://consul-test.cern.ch:8500"; ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); diff --git a/Framework/test/testPublisher.cxx b/Framework/test/testPublisher.cxx index e34b391615..87fa2cf5d6 100644 --- a/Framework/test/testPublisher.cxx +++ b/Framework/test/testPublisher.cxx @@ -30,11 +30,13 @@ using namespace AliceO2::Common; namespace o2::quality_control::core { +// fixme: unify with testObjectManager? BOOST_AUTO_TEST_CASE(publisher_test) { - TaskConfig config; - config.taskName = "test"; - ObjectsManager objectsManager(config.taskName, config.detectorName, config.consulUrl, 0, true); + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; + ObjectsManager objectsManager(taskName, detectorName, consulUrl, 0, true); TObjString s("content"); objectsManager.startPublishing(&s); diff --git a/Framework/test/testSharedConfig.json b/Framework/test/testSharedConfig.json index 5d61a819b8..ab22f9bc52 100644 --- a/Framework/test/testSharedConfig.json +++ b/Framework/test/testSharedConfig.json @@ -23,6 +23,7 @@ "moduleName": "QcSkeleton", "cycleDurationSeconds": "10", "maxNumberCycles": "-1", + "detectorName": "TST", "dataSource": { "type": "dataSamplingPolicy", "name": "tpcclust" @@ -71,7 +72,15 @@ "location": "remote" }, "defTask": { - "active": "false" + "active": "false", + "className": "o2::quality_control_modules::skeleton::SkeletonTask", + "moduleName": "QcSkeleton", + "detectorName": "ITS", + "dataSource": { + "type": "dataSamplingPolicy", + "name": "tpcclust" + }, + "cycleDurationSeconds": "10" } }, "checks": { diff --git a/Framework/test/testTaskInterface.cxx b/Framework/test/testTaskInterface.cxx index c0d88849fa..aa803630a7 100644 --- a/Framework/test/testTaskInterface.cxx +++ b/Framework/test/testTaskInterface.cxx @@ -134,7 +134,7 @@ class TestTask : public TaskInterface BOOST_AUTO_TEST_CASE(test_invoke_all_methods) { // This is maximum that we can do until we are able to test the DPL algorithms in isolation. - TaskConfig taskConfig; + TaskRunnerConfig taskConfig; ObjectsManager* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.detectorName, taskConfig.consulUrl, 0, true); test::TestTask testTask(objectsManager); @@ -169,7 +169,8 @@ BOOST_AUTO_TEST_CASE(test_invoke_all_methods) BOOST_AUTO_TEST_CASE(test_task_factory) { - TaskConfig config{ + TaskRunnerConfig config{ + "SkeletonTaskRunner", "skeletonTask", "QcSkeleton", "o2::quality_control_modules::skeleton::SkeletonTask", @@ -200,7 +201,7 @@ BOOST_AUTO_TEST_CASE(retrieveCondition) api.storeAsTFileAny(&bad, "qc/TST/conditions", meta); // retrieve it - TaskConfig taskConfig; + TaskRunnerConfig taskConfig; ObjectsManager* objectsManager = new ObjectsManager(taskConfig.taskName, taskConfig.detectorName, taskConfig.consulUrl, 0, true); test::TestTask testTask(objectsManager); testTask.loadCcdb("ccdb-test.cern.ch:8080"); diff --git a/Framework/test/testTaskRunner.cxx b/Framework/test/testTaskRunner.cxx index 0da9f27e4f..a4cc34c14a 100644 --- a/Framework/test/testTaskRunner.cxx +++ b/Framework/test/testTaskRunner.cxx @@ -19,6 +19,9 @@ #include "QualityControl/TaskRunner.h" #include #include +#include "QualityControl/InfrastructureSpecReader.h" +#include "Configuration/ConfigurationFactory.h" +#include "Configuration/ConfigurationInterface.h" #define BOOST_TEST_MODULE TaskRunner test #define BOOST_TEST_MAIN @@ -31,13 +34,28 @@ using namespace std; using namespace o2::framework; using namespace o2::header; using namespace o2::utilities; +using namespace o2::configuration; + +TaskRunnerConfig getTaskConfig(const std::string& configFilePath, const std::string& taskName, size_t id) +{ + auto config = ConfigurationFactory::getConfiguration(configFilePath); + auto infrastructureSpec = InfrastructureSpecReader::readInfrastructureSpec(config->getRecursive(), configFilePath); + + auto taskSpec = std::find_if(infrastructureSpec.tasks.begin(), infrastructureSpec.tasks.end(), [&taskName](const auto& taskSpec) { + return taskSpec.taskName == taskName; + }); + if (taskSpec != infrastructureSpec.tasks.end()) { + return TaskRunnerFactory::extractConfig(infrastructureSpec.common, *taskSpec, id); + } else { + throw std::runtime_error("task " + taskName + " not found in the config file"); + } +} BOOST_AUTO_TEST_CASE(test_factory) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunnerFactory taskRunnerFactory; - DataProcessorSpec taskRunner = taskRunnerFactory.create("abcTask", configFilePath, 123); + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask", 123)); BOOST_CHECK_EQUAL(taskRunner.name, "QC-TASK-RUNNER-abcTask"); @@ -67,7 +85,7 @@ BOOST_AUTO_TEST_CASE(test_task_runner) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "abcTask", configFilePath, 0 }; + TaskRunner qcTask{ getTaskConfig(configFilePath, "abcTask", 0) }; BOOST_CHECK_EQUAL(qcTask.getDeviceName(), "QC-TASK-RUNNER-abcTask"); @@ -88,7 +106,7 @@ BOOST_AUTO_TEST_CASE(test_task_wrong_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "abcTask", configFilePath, 0 }; + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "abcTask", 0)); // cout << "It should print an error message" << endl; } @@ -96,6 +114,7 @@ BOOST_AUTO_TEST_CASE(test_task_good_detector_name) { std::string configFilePath = std::string("json://") + getTestDataDirectory() + "testSharedConfig.json"; - TaskRunner qcTask{ "xyzTask", configFilePath, 0 }; + DataProcessorSpec taskRunner = TaskRunnerFactory::create(getTaskConfig(configFilePath, "xyzTask", 0)); + // cout << "no error message" << endl; } \ No newline at end of file diff --git a/Framework/test/testWorkflow.json b/Framework/test/testWorkflow.json index 806c89e91e..1106f50498 100644 --- a/Framework/test/testWorkflow.json +++ b/Framework/test/testWorkflow.json @@ -19,6 +19,7 @@ "className": "o2::quality_control_modules::skeleton::SkeletonTask", "moduleName": "QcSkeleton", "cycleDurationSeconds": "5", + "detectorName": "TST", "maxNumberCycles": "-1", "dataSource": { "type": "dataSamplingPolicy", diff --git a/Modules/Daq/test/testQcDaq.cxx b/Modules/Daq/test/testQcDaq.cxx index 8c9ae51e60..1630cb6985 100644 --- a/Modules/Daq/test/testQcDaq.cxx +++ b/Modules/Daq/test/testQcDaq.cxx @@ -19,10 +19,16 @@ using namespace std; namespace o2::quality_control_modules::daq { +struct Config { + std::string taskName = "test"; + std::string detectorName = "TST"; + std::string consulUrl = "invalid"; +}; + BOOST_AUTO_TEST_CASE(instantiate_task) { DaqTask task; - TaskConfig config; + Config config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcDaqTest"; config.detectorName = "DAQ"; diff --git a/Modules/Example/test/testFactory.cxx b/Modules/Example/test/testFactory.cxx index 783316826c..b9e4464685 100644 --- a/Modules/Example/test/testFactory.cxx +++ b/Modules/Example/test/testFactory.cxx @@ -25,7 +25,7 @@ namespace o2::quality_control_modules::example BOOST_AUTO_TEST_CASE(Task_Factory) { TaskFactory factory; - TaskConfig config; + TaskRunnerConfig config; config.taskName = "task"; config.moduleName = "QcCommon"; config.className = "o2::quality_control_modules::example::ExampleTask"; @@ -44,7 +44,7 @@ bool is_critical(AliceO2::Common::FatalException const&) { return true; } BOOST_AUTO_TEST_CASE(Task_Factory_failures, *utf::depends_on("Task_Factory") /* make sure we don't run both tests at the same time */) { TaskFactory factory; - TaskConfig config; + TaskRunnerConfig config; auto manager = make_shared(config.taskName, config.detectorName, config.consulUrl, 0, true); config.taskName = "task"; diff --git a/Modules/Example/test/testQcExample.cxx b/Modules/Example/test/testQcExample.cxx index 17a40e91ec..01045ab917 100644 --- a/Modules/Example/test/testQcExample.cxx +++ b/Modules/Example/test/testQcExample.cxx @@ -23,7 +23,7 @@ namespace o2::quality_control_modules::example BOOST_AUTO_TEST_CASE(insantiate_task) { ExampleTask task; - TaskConfig config; + TaskRunnerConfig config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcExampleTest"; config.detectorName = "TST"; diff --git a/Modules/Skeleton/test/testQcSkeleton.cxx b/Modules/Skeleton/test/testQcSkeleton.cxx index 5a1698af47..65ad315a99 100644 --- a/Modules/Skeleton/test/testQcSkeleton.cxx +++ b/Modules/Skeleton/test/testQcSkeleton.cxx @@ -34,7 +34,7 @@ namespace o2::quality_control_modules::skeleton BOOST_AUTO_TEST_CASE(instantiate_task) { SkeletonTask task; - TaskConfig config; + TaskRunnerConfig config; config.consulUrl = "http://consul-test.cern.ch:8500"; config.taskName = "qcSkeletonTest"; config.detectorName = "TST";