Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions Utilities/DataSampling/include/DataSampling/DataSampling.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,18 +100,26 @@ class DataSampling
static std::vector<framework::InputSpec> InputSpecsForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName);
/// \brief Provides InputSpecs to receive data for given DataSamplingPolicy
static std::vector<framework::InputSpec> InputSpecsForPolicy(std::shared_ptr<configuration::ConfigurationInterface> config, const std::string& policyName);
/// \brief Provides OutputSpecs of given DataSamplingPolicy
/// \brief Provides InputSpecs to receive data for given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
static std::vector<framework::InputSpec> InputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName);
/// \brief Provides OutputSpecs of given DataSamplingPolicy.
static std::vector<framework::OutputSpec> OutputSpecsForPolicy(const std::string& policiesSource, const std::string& policyName);
/// \brief Provides OutputSpecs of given DataSamplingPolicy
/// \brief Provides OutputSpecs of given DataSamplingPolicy.
static std::vector<framework::OutputSpec> OutputSpecsForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName);
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy
/// \brief Provides OutputSpecs of given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
static std::vector<framework::OutputSpec> OutputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName);
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy.
static std::optional<uint16_t> PortForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName);
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy.
static std::optional<uint16_t> PortForPolicy(const std::string& policiesSource, const std::string& policyName);
/// \brief Provides the machines where given DataSamplingPolicy is enabled
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy. Expects the "dataSamplingPolicies" tree.
static std::optional<uint16_t> PortForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName);
/// \brief Provides the machines where given DataSamplingPolicy is enabled.
static std::vector<std::string> MachinesForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName);
/// \brief Provides the port to be used for a proxy of given DataSamplingPolicy
/// \brief Provides the machines where given DataSamplingPolicy is enabled.
static std::vector<std::string> MachinesForPolicy(const std::string& policiesSource, const std::string& policyName);
/// \brief Provides the machines where given DataSamplingPolicy is enabled. Expects the "dataSamplingPolicies" tree.
static std::vector<std::string> MachinesForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName);

private:
static void DoGenerateInfrastructure(Dispatcher&, framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1);
Expand Down
56 changes: 31 additions & 25 deletions Utilities/DataSampling/src/DataSampling.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -112,27 +112,19 @@ std::vector<InputSpec> DataSampling::InputSpecsForPolicy(const std::string& poli

std::vector<InputSpec> DataSampling::InputSpecsForPolicy(ConfigurationInterface* const config, const std::string& policyName)
{
std::vector<InputSpec> inputs;
auto policiesTree = config->getRecursive("dataSamplingPolicies");

for (auto&& policyConfig : policiesTree) {
if (policyConfig.second.get<std::string>("id") == policyName) {
auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
for (const auto& path : policy.getPathMap()) {
InputSpec input = DataSpecUtils::matchingInput(path.second);
inputs.push_back(input);
}
break;
}
}
return inputs;
return InputSpecsForPolicy(policiesTree, policyName);
}

std::vector<InputSpec> DataSampling::InputSpecsForPolicy(std::shared_ptr<configuration::ConfigurationInterface> config, const std::string& policyName)
{
std::vector<InputSpec> inputs;
auto policiesTree = config->getRecursive("dataSamplingPolicies");
return InputSpecsForPolicy(policiesTree, policyName);
}

std::vector<framework::InputSpec> DataSampling::InputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
{
std::vector<InputSpec> inputs;
for (auto&& policyConfig : policiesTree) {
if (policyConfig.second.get<std::string>("id") == policyName) {
auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
Expand All @@ -154,9 +146,13 @@ std::vector<OutputSpec> DataSampling::OutputSpecsForPolicy(const std::string& po

std::vector<OutputSpec> DataSampling::OutputSpecsForPolicy(ConfigurationInterface* const config, const std::string& policyName)
{
std::vector<OutputSpec> outputs;
auto policiesTree = config->getRecursive("dataSamplingPolicies");
return OutputSpecsForPolicy(policiesTree, policyName);
}

std::vector<framework::OutputSpec> DataSampling::OutputSpecsForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
{
std::vector<OutputSpec> outputs;
for (auto&& policyConfig : policiesTree) {
if (policyConfig.second.get<std::string>("id") == policyName) {
auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
Expand All @@ -172,6 +168,17 @@ std::vector<OutputSpec> DataSampling::OutputSpecsForPolicy(ConfigurationInterfac
std::optional<uint16_t> DataSampling::PortForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName)
{
auto policiesTree = config->getRecursive("dataSamplingPolicies");
return PortForPolicy(policiesTree, policyName);
}

std::optional<uint16_t> DataSampling::PortForPolicy(const std::string& policiesSource, const std::string& policyName)
{
std::unique_ptr<ConfigurationInterface> config = ConfigurationFactory::getConfiguration(policiesSource);
return PortForPolicy(config.get(), policyName);
}

std::optional<uint16_t> DataSampling::PortForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
{
for (auto&& policyConfig : policiesTree) {
if (policyConfig.second.get<std::string>("id") == policyName) {
auto boostOptionalPort = policyConfig.second.get_optional<uint16_t>("port");
Expand All @@ -181,16 +188,21 @@ std::optional<uint16_t> DataSampling::PortForPolicy(configuration::Configuration
throw std::runtime_error("Could not find the policy '" + policyName + "'");
}

std::optional<uint16_t> DataSampling::PortForPolicy(const std::string& policiesSource, const std::string& policyName)
std::vector<std::string> DataSampling::MachinesForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName)
{
auto policiesTree = config->getRecursive("dataSamplingPolicies");
return MachinesForPolicy(policiesTree, policyName);
}

std::vector<std::string> DataSampling::MachinesForPolicy(const std::string& policiesSource, const std::string& policyName)
{
std::unique_ptr<ConfigurationInterface> config = ConfigurationFactory::getConfiguration(policiesSource);
return PortForPolicy(config.get(), policyName);
return MachinesForPolicy(config.get(), policyName);
}

std::vector<std::string> DataSampling::MachinesForPolicy(configuration::ConfigurationInterface* const config, const std::string& policyName)
std::vector<std::string> DataSampling::MachinesForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName)
{
std::vector<std::string> machines;
auto policiesTree = config->getRecursive("dataSamplingPolicies");
for (auto&& policyConfig : policiesTree) {
if (policyConfig.second.get<std::string>("id") == policyName) {
if (policyConfig.second.count("machines") > 0) {
Expand All @@ -204,10 +216,4 @@ std::vector<std::string> DataSampling::MachinesForPolicy(configuration::Configur
throw std::runtime_error("Could not find the policy '" + policyName + "'");
}

std::vector<std::string> DataSampling::MachinesForPolicy(const std::string& policiesSource, const std::string& policyName)
{
std::unique_ptr<ConfigurationInterface> config = ConfigurationFactory::getConfiguration(policiesSource);
return MachinesForPolicy(config.get(), policyName);
}

} // namespace o2::utilities