Skip to content

Commit adfb261

Browse files
Introducing configurable dispatch policy for devices
`DispatchPolicy` describes on per-device basis when completed objects are dispatched for sending, either at the end of computation - the default and current behavior - or immediately when ready. Depending on the `DispatchPolicy` operation, the `DataProcessingDevice` configures the `MessageContext` with a dispatch callback. The dispatch policy can be customized in the usual DPL manner.
1 parent 9e12ed8 commit adfb261

9 files changed

Lines changed: 153 additions & 12 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ o2_add_library(Framework
2828
src/CommonDataProcessors.cxx
2929
src/CompletionPolicy.cxx
3030
src/CompletionPolicyHelpers.cxx
31+
src/DispatchPolicy.cxx
3132
src/ConfigParamsHelper.cxx
3233
src/DDSConfigHelpers.cxx
3334
src/DataAllocator.cxx

Framework/Core/include/Framework/DeviceSpec.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "Framework/InputRoute.h"
2121
#include "Framework/OutputRoute.h"
2222
#include "Framework/CompletionPolicy.h"
23+
#include "Framework/DispatchPolicy.h"
2324

2425
#include <vector>
2526
#include <string>
@@ -51,6 +52,7 @@ struct DeviceSpec {
5152
size_t inputTimesliceId;
5253
/// The completion policy to use for this device.
5354
CompletionPolicy completionPolicy;
55+
DispatchPolicy dispatchPolicy;
5456
};
5557

5658
} // namespace framework
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
#ifndef FRAMEWORK_DISPATCHPOLICY_H
11+
#define FRAMEWORK_DISPATCHPOLICY_H
12+
13+
#include "Framework/PartRef.h"
14+
15+
#include <functional>
16+
#include <string>
17+
#include <vector>
18+
19+
namespace o2
20+
{
21+
namespace framework
22+
{
23+
24+
struct DeviceSpec;
25+
struct Output;
26+
27+
/// Policy to describe when to dispatch objects
28+
/// As for now we describe this policy per device, however it can be extended
29+
/// to match on specific outputs of the device.
30+
struct DispatchPolicy {
31+
/// Action to take whenever an object in the output gets ready:
32+
///
33+
enum struct DispatchOp {
34+
/// Dispatch objects when the calculation ends, this means the devices will
35+
/// send messages from all contextes in one bulk after computation
36+
AfterComputation,
37+
/// Dispatch the object when it becomes ready, i.e. when it goes out of the
38+
/// scope of the user code and no changes to the object are possible
39+
WhenReady,
40+
};
41+
42+
using DeviceMatcher = std::function<bool(DeviceSpec const& device)>;
43+
// OutputMatcher can be a later extension, but not expected to be of high priority
44+
using OutputMatcher = std::function<bool(Output const&)>;
45+
46+
/// Name of the policy itself.
47+
std::string name;
48+
/// Callback to be used to understand if the policy should apply
49+
/// to the given device.
50+
DeviceMatcher deviceMatcher;
51+
/// the action to be used for matched devices
52+
DispatchOp action = DispatchOp::AfterComputation;
53+
54+
/// Helper to create the default configuration.
55+
static std::vector<DispatchPolicy> createDefaultPolicies();
56+
};
57+
58+
std::ostream& operator<<(std::ostream& oss, DispatchPolicy::DispatchOp const& val);
59+
60+
} // namespace framework
61+
} // namespace o2
62+
63+
#endif // FRAMEWORK_DISPATCHPOLICY_H

Framework/Core/include/Framework/runDataProcessing.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "Framework/ChannelConfigurationPolicy.h"
1414
#include "Framework/CompletionPolicy.h"
15+
#include "Framework/DispatchPolicy.h"
1516
#include "Framework/ConfigParamsHelper.h"
1617
#include "Framework/DataProcessorSpec.h"
1718
#include "Framework/WorkflowSpec.h"
@@ -68,6 +69,7 @@ o2::framework::WorkflowSpec defineDataProcessing(o2::framework::ConfigContext co
6869
void defaultConfiguration(std::vector<o2::framework::ChannelConfigurationPolicy>& channelPolicies) {}
6970
void defaultConfiguration(std::vector<o2::framework::ConfigParamSpec> &globalWorkflowOptions) {}
7071
void defaultConfiguration(std::vector<o2::framework::CompletionPolicy> &completionPolicies) {}
72+
void defaultConfiguration(std::vector<o2::framework::DispatchPolicy>& dispatchPolicies) {}
7173
void defaultConfiguration(o2::framework::OnWorkflowTerminationHook& hook)
7274
{
7375
hook = [](const char*) {};
@@ -91,9 +93,10 @@ struct UserCustomizationsHelper {
9193
// This comes from the framework itself. This way we avoid code duplication.
9294
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& specs,
9395
std::vector<o2::framework::ChannelConfigurationPolicy> const& channelPolicies,
94-
std::vector<o2::framework::CompletionPolicy> const &completionPolicies,
95-
std::vector<o2::framework::ConfigParamSpec> const &workflowOptions,
96-
o2::framework::ConfigContext &configContext);
96+
std::vector<o2::framework::CompletionPolicy> const& completionPolicies,
97+
std::vector<o2::framework::DispatchPolicy> const& dispatchPolicies,
98+
std::vector<o2::framework::ConfigParamSpec> const& workflowOptions,
99+
o2::framework::ConfigContext& configContext);
97100

98101
int main(int argc, char** argv)
99102
{
@@ -118,11 +121,16 @@ int main(int argc, char** argv)
118121
auto defaultCompletionPolicies = CompletionPolicy::createDefaultPolicies();
119122
completionPolicies.insert(std::end(completionPolicies), std::begin(defaultCompletionPolicies), std::end(defaultCompletionPolicies));
120123

124+
std::vector<DispatchPolicy> dispatchPolicies;
125+
UserCustomizationsHelper::userDefinedCustomization(dispatchPolicies, 0);
126+
auto defaultDispatchPolicies = DispatchPolicy::createDefaultPolicies();
127+
dispatchPolicies.insert(std::end(dispatchPolicies), std::begin(defaultDispatchPolicies), std::end(defaultDispatchPolicies));
128+
121129
std::unique_ptr<ParamRetriever> retriever{ new BoostOptionsRetriever(workflowOptions, true, argc, argv) };
122130
ConfigParamRegistry workflowOptionsRegistry(std::move(retriever));
123131
ConfigContext configContext{ workflowOptionsRegistry };
124132
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
125-
result = doMain(argc, argv, specs, channelPolicies, completionPolicies, workflowOptions, configContext);
133+
result = doMain(argc, argv, specs, channelPolicies, completionPolicies, dispatchPolicies, workflowOptions, configContext);
126134
} catch (std::exception const& error) {
127135
LOG(ERROR) << "error while setting up workflow: " << error.what();
128136
} catch (...) {

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,8 +565,9 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
565565
void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workflow,
566566
std::vector<ChannelConfigurationPolicy> const& channelPolicies,
567567
std::vector<CompletionPolicy> const& completionPolicies,
568+
std::vector<DispatchPolicy> const& dispatchPolicies,
568569
std::vector<DeviceSpec>& devices,
569-
std::vector<ComputingResource> &resources)
570+
std::vector<ComputingResource>& resources)
570571
{
571572

572573
std::vector<LogicalForwardInfo> availableForwardsInfo;
@@ -619,6 +620,12 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(WorkflowSpec const& workf
619620
break;
620621
}
621622
}
623+
for (auto& policy : dispatchPolicies) {
624+
if (policy.deviceMatcher(device) == true) {
625+
device.dispatchPolicy = policy;
626+
break;
627+
}
628+
}
622629
}
623630
}
624631

Framework/Core/src/DeviceSpecHelpers.h

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "Framework/DataProcessorSpec.h"
1616
#include "Framework/ChannelSpec.h"
1717
#include "Framework/CompletionPolicy.h"
18+
#include "Framework/DispatchPolicy.h"
1819
#include "Framework/DeviceControl.h"
1920
#include "Framework/DeviceExecution.h"
2021
#include "Framework/DeviceSpec.h"
@@ -39,12 +40,23 @@ struct DeviceSpecHelpers {
3940
/// Helper to convert from an abstract dataflow specification, @a workflow,
4041
/// to an actual set of devices which will have to run.
4142
static void dataProcessorSpecs2DeviceSpecs(
42-
const WorkflowSpec &workflow,
43-
std::vector<ChannelConfigurationPolicy> const &channelPolicies,
44-
std::vector<CompletionPolicy> const &completionPolicies,
45-
std::vector<DeviceSpec> &devices,
46-
std::vector<ComputingResource> &resources
47-
);
43+
const WorkflowSpec& workflow,
44+
std::vector<ChannelConfigurationPolicy> const& channelPolicies,
45+
std::vector<CompletionPolicy> const& completionPolicies,
46+
std::vector<DispatchPolicy> const& dispatchPolicies,
47+
std::vector<DeviceSpec>& devices,
48+
std::vector<ComputingResource>& resources);
49+
50+
static void dataProcessorSpecs2DeviceSpecs(
51+
const WorkflowSpec& workflow,
52+
std::vector<ChannelConfigurationPolicy> const& channelPolicies,
53+
std::vector<CompletionPolicy> const& completionPolicies,
54+
std::vector<DeviceSpec>& devices,
55+
std::vector<ComputingResource>& resources)
56+
{
57+
std::vector<DispatchPolicy> dispatchPolicies = DispatchPolicy::createDefaultPolicies();
58+
dataProcessorSpecs2DeviceSpecs(workflow, channelPolicies, completionPolicies, dispatchPolicies, devices, resources);
59+
}
4860

4961
/// Helper to prepare the arguments which will be used to
5062
/// start the various devices.
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
#include "Framework/DispatchPolicy.h"
12+
#include "Framework/DeviceSpec.h"
13+
#include <functional>
14+
#include <iostream>
15+
16+
namespace o2
17+
{
18+
namespace framework
19+
{
20+
21+
/// By default the DispatchPolicy matches any Device and messages are sent
22+
/// after computation
23+
std::vector<DispatchPolicy> DispatchPolicy::createDefaultPolicies()
24+
{
25+
return { DispatchPolicy{ "dispatch-all-after-computation", [](DeviceSpec const&) { return true; }, DispatchPolicy::DispatchOp::AfterComputation } };
26+
}
27+
28+
std::ostream& operator<<(std::ostream& oss, DispatchPolicy::DispatchOp const& val)
29+
{
30+
switch (val) {
31+
case DispatchPolicy::DispatchOp::AfterComputation:
32+
oss << "after computation";
33+
break;
34+
case DispatchPolicy::DispatchOp::WhenReady:
35+
oss << "when ready";
36+
break;
37+
};
38+
return oss;
39+
}
40+
41+
} // namespace framework
42+
} // namespace o2

Framework/Core/src/DriverInfo.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,12 @@ struct DriverInfo {
9898
/// Since they are decided by the toplevel configuration, they belong
9999
/// to the driver process.
100100
std::vector<ChannelConfigurationPolicy> channelPolicies;
101-
/// These are the policies which can be applied to decide wether or not
101+
/// These are the policies which can be applied to decide whether or not
102102
/// a given record is complete.
103103
std::vector<CompletionPolicy> completionPolicies;
104+
/// These are the policies which can be applied to decide when complete
105+
/// objects/messages are sent out
106+
std::vector<DispatchPolicy> dispatchPolicies;
104107
/// The argc with which the driver was started.
105108
int argc;
106109
/// The argv with which the driver was started.

Framework/Core/src/runDataProcessing.cxx

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
823823
DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(workflow,
824824
driverInfo.channelPolicies,
825825
driverInfo.completionPolicies,
826+
driverInfo.dispatchPolicies,
826827
deviceSpecs,
827828
resources);
828829
// This should expand nodes so that we can build a consistent DAG.
@@ -1158,6 +1159,7 @@ void initialiseDriverControl(bpo::variables_map const& varmap,
11581159
int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
11591160
std::vector<ChannelConfigurationPolicy> const& channelPolicies,
11601161
std::vector<CompletionPolicy> const& completionPolicies,
1162+
std::vector<DispatchPolicy> const& dispatchPolicies,
11611163
std::vector<ConfigParamSpec> const& currentWorkflowOptions,
11621164
o2::framework::ConfigContext& configContext)
11631165
{
@@ -1285,6 +1287,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
12851287
driverInfo.sigchldRequested = false;
12861288
driverInfo.channelPolicies = channelPolicies;
12871289
driverInfo.completionPolicies = completionPolicies;
1290+
driverInfo.dispatchPolicies = dispatchPolicies;
12881291
driverInfo.argc = argc;
12891292
driverInfo.argv = argv;
12901293
driverInfo.batch = varmap["batch"].as<bool>();

0 commit comments

Comments
 (0)