@@ -477,7 +477,8 @@ void DeviceSpecHelpers::validate(std::vector<DataProcessorSpec> const& workflow)
477477 }
478478}
479479
480- void DeviceSpecHelpers::processOutEdgeActions (std::vector<DeviceSpec>& devices,
480+ void DeviceSpecHelpers::processOutEdgeActions (ConfigContext const & configContext,
481+ std::vector<DeviceSpec>& devices,
481482 std::vector<DeviceId>& deviceIndex,
482483 std::vector<DeviceConnectionId>& connections,
483484 ResourceManager& resourceManager,
@@ -486,6 +487,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
486487 const std::vector<EdgeAction>& actions, const WorkflowSpec& workflow,
487488 const std::vector<OutputSpec>& outputsMatchers,
488489 const std::vector<ChannelConfigurationPolicy>& channelPolicies,
490+ const std::vector<SendingPolicy>& sendingPolicies,
489491 std::string const & channelPrefix,
490492 ComputingOffer const & defaultOffer,
491493 OverrideServiceSpecs const & overrideServices)
@@ -647,7 +649,7 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
647649 // whether this is a real OutputRoute or if it's a forward from
648650 // a previous consumer device.
649651 // FIXME: where do I find the InputSpec for the forward?
650- auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges](
652+ auto appendOutputRouteToSourceDeviceChannel = [&outputsMatchers, &workflow, &devices, &logicalEdges, &sendingPolicies, &configContext ](
651653 size_t ei, size_t di, size_t ci) {
652654 assert (ei < logicalEdges.size ());
653655 assert (di < devices.size ());
@@ -656,15 +658,27 @@ void DeviceSpecHelpers::processOutEdgeActions(std::vector<DeviceSpec>& devices,
656658 auto & device = devices[di];
657659 assert (edge.consumer < workflow.size ());
658660 auto & consumer = workflow[edge.consumer ];
661+ auto & producer = workflow[edge.producer ];
659662 auto & channel = devices[di].outputChannels [ci];
660663 assert (edge.outputGlobalIndex < outputsMatchers.size ());
664+ // Iterate over all the policies and apply the first one that matches.
665+ SendingPolicy const * policyPtr = nullptr ;
666+ for (auto & policy : sendingPolicies) {
667+ if (policy.matcher (producer, consumer, configContext)) {
668+ policyPtr = &policy;
669+ break ;
670+ }
671+ }
672+ assert (policyPtr != nullptr );
661673
662674 if (edge.isForward == false ) {
663675 OutputRoute route{
664676 edge.timeIndex ,
665677 consumer.maxInputTimeslices ,
666678 outputsMatchers[edge.outputGlobalIndex ],
667- channel.name };
679+ channel.name ,
680+ policyPtr,
681+ };
668682 device.outputs .emplace_back (route);
669683 } else {
670684 ForwardRoute route{
@@ -1088,8 +1102,8 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
10881102 defaultOffer.cpu /= deviceCount + 1 ;
10891103 defaultOffer.memory /= deviceCount + 1 ;
10901104
1091- processOutEdgeActions (devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1092- outActions, workflow, outputs, channelPolicies, channelPrefix, defaultOffer, overrideServices);
1105+ processOutEdgeActions (configContext, devices, deviceIndex, connections, resourceManager, outEdgeIndex, logicalEdges,
1106+ outActions, workflow, outputs, channelPolicies, sendingPolicies, channelPrefix, defaultOffer, overrideServices);
10931107
10941108 // FIXME: is this not the case???
10951109 std::sort (connections.begin (), connections.end ());
@@ -1117,12 +1131,6 @@ void DeviceSpecHelpers::dataProcessorSpecs2DeviceSpecs(const WorkflowSpec& workf
11171131 break ;
11181132 }
11191133 }
1120- for (auto & policy : sendingPolicies) {
1121- if (policy.matcher (device, configContext) == true ) {
1122- device.sendingPolicy = policy;
1123- break ;
1124- }
1125- }
11261134 bool hasPolicy = false ;
11271135 for (auto & policy : resourcePolicies) {
11281136 if (policy.matcher (device) == true ) {
0 commit comments