@@ -397,10 +397,10 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
397397
398398namespace
399399{
400- // Decide where to sent the output. Everything to "downstream" for now
401- std::string decideChannel (InputSpec const &)
400+ // Decide where to sent the output. Everything to "downstream" if there is such a channel.
401+ std::string decideChannel (InputSpec const & input, const std::unordered_map<std::string, std::vector<FairMQChannel>>& channels )
402402{
403- return " downstream" ;
403+ return channels. count ( " downstream" ) ? " downstream " : input. binding ;
404404}
405405} // namespace
406406
@@ -420,7 +420,7 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
420420 auto channelConfigurationChecker = [inputSpecs = std::move (inputSpecs), device]() {
421421 LOG (INFO) << " checking channel configuration" ;
422422 for (auto const & spec : inputSpecs) {
423- auto channel = decideChannel (spec);
423+ auto channel = decideChannel (spec, device-> fChannels );
424424 if (device->fChannels .count (channel) == 0 ) {
425425 throw std::runtime_error (" no corresponding output channel found for input '" + channel + " '" );
426426 }
@@ -453,7 +453,7 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
453453 }
454454 size_t payloadMsgSize = dh->payloadSize ;
455455
456- auto channel = decideChannel (*first.spec );
456+ auto channel = decideChannel (*first.spec , device. fChannels );
457457 auto headerMessage = device.NewMessageFor (channel, index, headerMsgSize);
458458 memcpy (headerMessage->GetData (), part.header , headerMsgSize);
459459 auto payloadMessage = device.NewMessageFor (channel, index, payloadMsgSize);
0 commit comments