Skip to content

Commit d6853b4

Browse files
committed
DPL: make sending policy per channel
This avoids having expendable / non-critical devices blocking the data flow since any device can now drop (non forwarded) data when sending to one. Notice that this means that non-critical devices must keep into account they might not see as much data as expected.
1 parent 4bd86e4 commit d6853b4

14 files changed

Lines changed: 119 additions & 47 deletions

Framework/Core/include/Framework/ChannelInfo.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,14 @@ struct InputChannelInfo {
6666
int pollerIndex = -1;
6767
};
6868

69+
struct SendingPolicy;
70+
6971
/// Output channel information
7072
struct OutputChannelInfo {
7173
std::string name = "invalid";
7274
ChannelAccountingType channelType = ChannelAccountingType::DPL;
7375
fair::mq::Channel& channel;
76+
SendingPolicy const* policy;
7477
};
7578

7679
struct OutputChannelState {

Framework/Core/include/Framework/DataProcessorMatchers.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,15 @@ struct DataProcessorSpec;
1919
struct DeviceSpec;
2020
struct ConfigContext;
2121

22+
// A matcher for a given DataProcessorSpec @p spec.
2223
using DataProcessorMatcher = std::function<bool(DataProcessorSpec const& spec)>;
24+
// A matcher which is specific to a given DeviceSpec. @a context is the ConfigContext associated with the topology.
2325
using DeviceMatcher = std::function<bool(DeviceSpec const& spec, ConfigContext const& context)>;
26+
// A matcher which is specific to a given edge between two DataProcessors, described
27+
// by @p source and @p dest. @p context is the ConfigContext associated with the topology.
28+
// NOTE: we use DataProcessorSpecs rather than devices, because when we assign the policy
29+
// we do not have all the devices yet.
30+
using EdgeMatcher = std::function<bool(DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const& context)>;
2431

2532
/// A set of helper to build policies that need to
2633
/// be applied based on some DataProcessorSpec property
@@ -34,5 +41,11 @@ struct DeviceMatchers {
3441
static DeviceMatcher matchByName(const char* name);
3542
};
3643

44+
struct EdgeMatchers {
45+
static EdgeMatcher matchSourceByName(const char* name);
46+
static EdgeMatcher matchDestByName(const char* name);
47+
static EdgeMatcher matchEndsByName(const char* sourceName, const char* destName);
48+
};
49+
3750
} // namespace o2::framework
3851
#endif // O2_FRAMEWORK_DATAPROCESSORMATCHER_H_

Framework/Core/include/Framework/DataSender.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#define O2_FRAMEWORK_DATASENDER_H_
1313

1414
#include "Framework/RoutingIndices.h"
15-
#include "Framework/SendingPolicy.h"
15+
#include "Framework/FairMQDeviceProxy.h"
1616
#include "Framework/ServiceRegistryRef.h"
1717
#include "Framework/Tracing.h"
1818
#include "Framework/OutputSpec.h"
@@ -33,8 +33,7 @@ struct DeviceSpec;
3333
class DataSender
3434
{
3535
public:
36-
DataSender(ServiceRegistryRef registry,
37-
SendingPolicy const& policy);
36+
DataSender(ServiceRegistryRef registry);
3837
void send(fair::mq::Parts&, ChannelIndex index);
3938
std::unique_ptr<fair::mq::Message> create(RouteIndex index);
4039
/// Reset the datasender to a clean state
@@ -55,7 +54,6 @@ class DataSender
5554
ServiceRegistryRef mRegistry;
5655
DeviceSpec const& mSpec;
5756
std::vector<OutputSpec> mOutputs;
58-
SendingPolicy mPolicy;
5957
std::vector<size_t> mDistinctRoutesIndex;
6058

6159
std::vector<std::string> mMetricsNames;

Framework/Core/include/Framework/OutputRoute.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,17 @@
1818
namespace o2::framework
1919
{
2020

21+
struct SendingPolicy;
22+
2123
// This uniquely identifies a route out of the device if
2224
// the OutputSpec @a matcher and @a timeslice match.
2325
struct OutputRoute {
2426
size_t timeslice;
2527
size_t maxTimeslices;
2628
OutputSpec matcher;
2729
std::string channel;
30+
// The policy to use to send to on this route.
31+
SendingPolicy const* policy;
2832
};
2933

3034
} // namespace o2::framework

Framework/Core/include/Framework/SendingPolicy.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222
namespace o2::framework
2323
{
2424

25-
class ServiceRegistry;
26-
2725
class FairMQDeviceProxy;
2826

2927
struct SendingPolicy {
30-
using SendingCallback = std::function<void(FairMQDeviceProxy&, fair::mq::Parts&, ChannelIndex channelIndex, ServiceRegistryRef registry)>;
28+
using SendingCallback = std::function<void(fair::mq::Parts&, ChannelIndex channelIndex, ServiceRegistryRef registry)>;
3129
std::string name = "invalid";
32-
DeviceMatcher matcher = nullptr;
30+
EdgeMatcher matcher = nullptr;
3331
SendingCallback send = nullptr;
3432
static std::vector<SendingPolicy> createDefaultPolicies();
3533
};

Framework/Core/src/CommonServices.cxx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,9 +341,8 @@ o2::framework::ServiceSpec CommonServices::dataSender()
341341
return ServiceSpec{
342342
.name = "datasender",
343343
.init = [](ServiceRegistryRef services, DeviceState&, fair::mq::ProgOptions& options) -> ServiceHandle {
344-
auto& spec = services.get<DeviceSpec const>();
345344
return ServiceHandle{TypeIdHelpers::uniqueId<DataSender>(),
346-
new DataSender(services, spec.sendingPolicy)};
345+
new DataSender(services)};
347346
},
348347
.configure = noConfiguration(),
349348
.preProcessing = [](ProcessingContext&, void* service) {

Framework/Core/src/DataProcessorMatchers.cxx

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,25 @@ DeviceMatcher DeviceMatchers::matchByName(char const* name_)
2828
return spec.name == name;
2929
};
3030
}
31+
32+
EdgeMatcher EdgeMatchers::matchSourceByName(char const* name_)
33+
{
34+
return [name = std::string(name_)](DataProcessorSpec const& source, DataProcessorSpec const&, ConfigContext const&) {
35+
return source.name == name;
36+
};
37+
}
38+
39+
EdgeMatcher EdgeMatchers::matchDestByName(char const* name_)
40+
{
41+
return [name = std::string(name_)](DataProcessorSpec const&, DataProcessorSpec const& dest, ConfigContext const&) {
42+
return dest.name == name;
43+
};
44+
}
45+
46+
EdgeMatcher EdgeMatchers::matchEndsByName(char const* source_, char const* dest_)
47+
{
48+
return [sourceName = std::string(source_), destName = std::string(dest_)](DataProcessorSpec const& source, DataProcessorSpec const& dest, ConfigContext const&) {
49+
return source.name == sourceName && dest.name == destName;
50+
};
51+
}
3152
} // namespace o2::framework

Framework/Core/src/DataSender.cxx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@ std::vector<size_t>
4545
}
4646
} // namespace
4747

48-
DataSender::DataSender(ServiceRegistryRef registry,
49-
SendingPolicy const& policy)
48+
DataSender::DataSender(ServiceRegistryRef registry)
5049
: mProxy{registry.get<FairMQDeviceProxy>()},
5150
mRegistry{registry},
5251
mSpec{registry.get<DeviceSpec const>()},
53-
mPolicy{policy},
5452
mDistinctRoutesIndex{createDistinctOutputRouteIndex(mSpec.outputs)}
5553
{
5654
std::scoped_lock<LockableBase(std::recursive_mutex)> lock(mMutex);
@@ -96,7 +94,8 @@ DataSender::DataSender(ServiceRegistryRef registry,
9694

9795
std::unique_ptr<fair::mq::Message> DataSender::create(RouteIndex routeIndex)
9896
{
99-
return mProxy.getOutputTransport(routeIndex)->CreateMessage();
97+
auto& proxy = mRegistry.get<FairMQDeviceProxy>();
98+
return proxy.getOutputTransport(routeIndex)->CreateMessage();
10099
}
101100

102101
void DataSender::send(fair::mq::Parts& parts, ChannelIndex channelIndex)
@@ -107,7 +106,8 @@ void DataSender::send(fair::mq::Parts& parts, ChannelIndex channelIndex)
107106
}
108107
auto& dataProcessorContext = mRegistry.get<DataProcessorContext>();
109108
dataProcessorContext.preSendingMessagesCallbacks(mRegistry, parts, channelIndex);
110-
mPolicy.send(mProxy, parts, channelIndex, mRegistry);
109+
auto& info = mProxy.getOutputChannelInfo(channelIndex);
110+
info.policy->send(parts, channelIndex, mRegistry);
111111
}
112112

113113
void DataSender::reset()

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,8 @@ void DeviceSpecHelpers::validate(std::vector<DataProcessorSpec> const& workflow)
477477
}
478478
}
479479

480-
void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
480+
void DeviceSpecHelpers::processOutEdgeActions(ConfigContext const& configContext,
481+
std::vector<DeviceSpec>& devices,
481482
std::vector<DeviceId>& deviceIndex,
482483
std::vector<DeviceConnectionId>& connections,
483484
ResourceManager& resourceManager,
@@ -486,6 +487,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
486487
const std::vector<EdgeAction>& actions, const WorkflowSpec& workflow,
487488
const std::vector<OutputSpec>& outputsMatchers,
488489
const std::vector<ChannelConfigurationPolicy>& channelPolicies,
490+
const std::vector<SendingPolicy>& sendingPolicies,
489491
std::string const& channelPrefix,
490492
ComputingOffer const& defaultOffer,
491493
OverrideServiceSpecs const& overrideServices)
@@ -647,7 +649,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
647649
// whether this is a real OutputRoute or if it's a forward from
648650
// a previous consumer device.
649651
// FIXME: where do I find the InputSpec for the forward?
650-
auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges](
652+
auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges, &sendingPolicies, &configContext](
651653
size_t ei, size_t di, size_t ci) {
652654
assert(ei < logicalEdges.size());
653655
assert(di < devices.size());
@@ -656,15 +658,27 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
656658
auto& device = devices[di];
657659
assert(edge.consumer < workflow.size());
658660
auto& consumer = workflow[edge.consumer];
661+
auto& producer = workflow[edge.producer];
659662
auto& channel = devices[di].outputChannels[ci];
660663
assert(edge.outputGlobalIndex < outputsMatchers.size());
664+
// Iterate over all the policies and apply the first one that matches.
665+
SendingPolicy const* policyPtr = nullptr;
666+
for (auto& policy : sendingPolicies) {
667+
if (policy.matcher(producer, consumer, configContext)) {
668+
policyPtr = &policy;
669+
break;
670+
}
671+
}
672+
assert(policyPtr != nullptr);
661673

662674
if (edge.isForward == false) {
663675
OutputRoute route{
664676
edge.timeIndex,
665677
consumer.maxInputTimeslices,
666678
outputsMatchers[edge.outputGlobalIndex],
667-
channel.name};
679+
channel.name,
680+
policyPtr,
681+
};
668682
device.outputs.emplace_back(route);
669683
} else {
670684
ForwardRoute route{
@@ -1088,8 +1102,8 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
10881102
defaultOffer.cpu /= deviceCount + 1;
10891103
defaultOffer.memory /= deviceCount + 1;
10901104

1091-
processOutEdgeActions(devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1092-
outActions, workflow, outputs, channelPolicies, channelPrefix, defaultOffer, overrideServices);
1105+
processOutEdgeActions(configContext, devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1106+
outActions, workflow, outputs, channelPolicies, sendingPolicies, channelPrefix, defaultOffer, overrideServices);
10931107

10941108
// FIXME: is this not the case???
10951109
std::sort(connections.begin(), connections.end());
@@ -1117,12 +1131,6 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
11171131
break;
11181132
}
11191133
}
1120-
for (auto& policy : sendingPolicies) {
1121-
if (policy.matcher(device, configContext) == true) {
1122-
device.sendingPolicy = policy;
1123-
break;
1124-
}
1125-
}
11261134
bool hasPolicy = false;
11271135
for (auto& policy : resourcePolicies) {
11281136
if (policy.matcher(device) == true) {

Framework/Core/src/DeviceSpecHelpers.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ struct DeviceSpecHelpers {
138138
/// to the outgoing edges i.e. those which refer
139139
/// to the act of producing data.
140140
static void processOutEdgeActions(
141+
ConfigContext const& configContext,
141142
std::vector<DeviceSpec>& devices,
142143
std::vector<DeviceId>& deviceIndex,
143144
std::vector<DeviceConnectionId>& connections,
@@ -148,6 +149,7 @@ struct DeviceSpecHelpers {
148149
const WorkflowSpec& workflow,
149150
const std::vector<OutputSpec>& outputs,
150151
std::vector<ChannelConfigurationPolicy> const& channelPolicies,
152+
std::vector<SendingPolicy> const& sendingPolicies,
151153
std::string const& channelPrefix,
152154
ComputingOffer const& defaultOffer,
153155
OverrideServiceSpecs const& overrideServices = {});

0 commit comments

Comments
 (0)