Skip to content

Commit e2d5ed2

Browse files
authored
[QC-630] Data Sampling: Match the host name to the machine list (AliceO2Group#6812)
1 parent ef63be2 commit e2d5ed2

3 files changed

Lines changed: 96 additions & 11 deletions

File tree

Utilities/DataSampling/include/DataSampling/DataSampling.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,16 @@ class DataSampling
8181
/// QC tasks.
8282
/// \param policiesSource Path to configuration file.
8383
/// \param threads Number of dispatcher threads that will handle the data
84-
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1);
84+
/// \param host Host name. If the host name or a policy's machine list is empty, the policy is always created.
85+
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, const std::string& policiesSource, size_t threads = 1, const std::string& host = "");
8586

8687
/// \brief Generates data sampling infrastructure.
8788
/// \param workflow DPL workflow with already declared data processors which provide data desired by
8889
/// QC tasks.
8990
/// \param policiesSource boost::property_tree::ptree with the configuration
9091
/// \param threads Number of dispatcher threads that will handle the data
91-
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1);
92+
/// \param host Host name. If the host name or a policy's machine list is empty, the policy is always created.
93+
static void GenerateInfrastructure(framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1, const std::string& host = "");
9294
/// \brief Configures dispatcher to consume any data immediately.
9395
static void CustomizeInfrastructure(std::vector<framework::CompletionPolicy>&);
9496
/// \brief Applies blocking/nonblocking data sampling configuration to the workflow.
@@ -122,7 +124,7 @@ class DataSampling
122124
static std::vector<std::string> MachinesForPolicy(const boost::property_tree::ptree& policiesTree, const std::string& policyName);
123125

124126
private:
125-
static void DoGenerateInfrastructure(Dispatcher&, framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1);
127+
static void DoGenerateInfrastructure(Dispatcher&, framework::WorkflowSpec& workflow, boost::property_tree::ptree const& policies, size_t threads = 1, const std::string& host = "");
126128
// Internal functions, used by GenerateInfrastructure()
127129
static std::string createDispatcherName();
128130
};

Utilities/DataSampling/src/DataSampling.cxx

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ std::string DataSampling::createDispatcherName()
3636
return std::string("Dispatcher"); //_") + getenv("HOSTNAME");
3737
}
3838

39-
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads)
39+
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::string& policiesSource, size_t threads, const std::string& host)
4040
{
4141
std::unique_ptr<ConfigurationInterface> cfg = ConfigurationFactory::getConfiguration(policiesSource);
4242
if (cfg->getRecursive("").count("dataSamplingPolicies") == 0) {
@@ -45,26 +45,33 @@ void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const std::str
4545
}
4646
auto policiesTree = cfg->getRecursive("dataSamplingPolicies");
4747
Dispatcher dispatcher(createDispatcherName(), policiesSource);
48-
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads);
48+
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
4949
}
5050

51-
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads)
51+
void DataSampling::GenerateInfrastructure(WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
5252
{
5353
Dispatcher dispatcher(createDispatcherName(), "");
54-
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads);
54+
DataSampling::DoGenerateInfrastructure(dispatcher, workflow, policiesTree, threads, host);
5555
}
5656

57-
void DataSampling::DoGenerateInfrastructure(Dispatcher& dispatcher, WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads)
57+
void DataSampling::DoGenerateInfrastructure(Dispatcher& dispatcher, WorkflowSpec& workflow, const boost::property_tree::ptree& policiesTree, size_t threads, const std::string& host)
5858
{
5959
LOG(DEBUG) << "Generating Data Sampling infrastructure...";
6060

6161
for (auto&& policyConfig : policiesTree) {
6262

63-
std::unique_ptr<DataSamplingPolicy> policy;
64-
6563
// We don't want the Dispatcher to exit due to one faulty Policy
6664
try {
67-
dispatcher.registerPolicy(std::make_unique<DataSamplingPolicy>(DataSamplingPolicy::fromConfiguration(policyConfig.second)));
65+
auto policy = DataSamplingPolicy::fromConfiguration(policyConfig.second);
66+
std::vector<std::string> machines;
67+
if (policyConfig.second.count("machines") > 0) {
68+
for (const auto& machine : policyConfig.second.get_child("machines")) {
69+
machines.emplace_back(machine.second.get<std::string>(""));
70+
}
71+
}
72+
if (host.empty() || machines.empty() || std::find(machines.begin(), machines.end(), host) != machines.end()) {
73+
dispatcher.registerPolicy(std::make_unique<DataSamplingPolicy>(std::move(policy)));
74+
}
6875
} catch (const std::exception& ex) {
6976
LOG(WARN) << "Could not load the Data Sampling Policy '"
7077
<< policyConfig.second.get_optional<std::string>("id").value_or("") << "', because: " << ex.what();

Utilities/DataSampling/test/test_DataSampling.cxx

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,82 @@ BOOST_AUTO_TEST_CASE(MultinodeUtilities)
207207
auto machines = DataSampling::MachinesForPolicy(configFilePath, "tpcraw");
208208
BOOST_CHECK_EQUAL(machines.size(), 2);
209209
}
210+
{
211+
// empty host -> match any policy
212+
WorkflowSpec workflow;
213+
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "");
214+
215+
auto disp = std::find_if(workflow.begin(), workflow.end(),
216+
[](const DataProcessorSpec& d) {
217+
return d.name.find("Dispatcher") != std::string::npos;
218+
});
219+
BOOST_REQUIRE(disp != workflow.end());
220+
221+
auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
222+
[](const InputSpec& in) {
223+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
224+
});
225+
BOOST_CHECK(input1 != disp->inputs.end());
226+
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
227+
[](const InputSpec& in) {
228+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
229+
});
230+
BOOST_CHECK(input2 != disp->inputs.end());
231+
auto input3 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
232+
[](const InputSpec& in) {
233+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("RAWDATA"), 0) && in.lifetime == Lifetime::Timeframe;
234+
});
235+
BOOST_CHECK(input3 != disp->inputs.end());
236+
}
237+
{
238+
// mismatching host -> create only policies with empty machines list
239+
WorkflowSpec workflow;
240+
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "mismatching host");
241+
242+
auto disp = std::find_if(workflow.begin(), workflow.end(),
243+
[](const DataProcessorSpec& d) {
244+
return d.name.find("Dispatcher") != std::string::npos;
245+
});
246+
BOOST_REQUIRE(disp != workflow.end());
247+
248+
auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
249+
[](const InputSpec& in) {
250+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
251+
});
252+
BOOST_CHECK(input1 != disp->inputs.end());
253+
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
254+
[](const InputSpec& in) {
255+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
256+
});
257+
BOOST_CHECK(input2 != disp->inputs.end());
258+
}
259+
{
260+
// matching host -> create policies with empty machines list and the ones which match
261+
WorkflowSpec workflow;
262+
DataSampling::GenerateInfrastructure(workflow, configFilePath, 1, "machineA");
263+
264+
auto disp = std::find_if(workflow.begin(), workflow.end(),
265+
[](const DataProcessorSpec& d) {
266+
return d.name.find("Dispatcher") != std::string::npos;
267+
});
268+
BOOST_REQUIRE(disp != workflow.end());
269+
270+
auto input1 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
271+
[](const InputSpec& in) {
272+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS"), 0) && in.lifetime == Lifetime::Timeframe;
273+
});
274+
BOOST_CHECK(input1 != disp->inputs.end());
275+
auto input2 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
276+
[](const InputSpec& in) {
277+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("CLUSTERS_P"), 0) && in.lifetime == Lifetime::Timeframe;
278+
});
279+
BOOST_CHECK(input2 != disp->inputs.end());
280+
auto input3 = std::find_if(disp->inputs.begin(), disp->inputs.end(),
281+
[](const InputSpec& in) {
282+
return DataSpecUtils::match(in, DataOrigin("TPC"), DataDescription("RAWDATA"), 0) && in.lifetime == Lifetime::Timeframe;
283+
});
284+
BOOST_CHECK(input3 != disp->inputs.end());
285+
}
210286
}
211287

212288
BOOST_AUTO_TEST_CASE(DataSamplingEmptyConfig)

0 commit comments

Comments
 (0)