Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Framework/Core/src/CommonDataProcessors.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ DataProcessorSpec CommonDataProcessors::getGlobalFairMQSink(std::vector<InputSpe
// vectored options
// use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
OutputChannelSpec externalChannelSpec;
externalChannelSpec.name = "output_0_0";
externalChannelSpec.name = "downstream";
externalChannelSpec.type = ChannelType::Push;
externalChannelSpec.method = ChannelMethod::Bind;
externalChannelSpec.hostname = "localhost";
Expand Down
26 changes: 19 additions & 7 deletions Framework/Core/src/ExternalFairMQDeviceProxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,15 @@ DataProcessorSpec specifyExternalFairMQDeviceProxy(char const* name,
return spec;
}

namespace
{
// Decide where to sent the output. Everything to "downstream" for now
std::string decideChannel(InputSpec const&)
{
return "downstream";
}
} // namespace

DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
Inputs const& inputSpecs,
const char* defaultChannelConfig)
Expand All @@ -411,8 +420,9 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
auto channelConfigurationChecker = [inputSpecs = std::move(inputSpecs), device]() {
LOG(INFO) << "checking channel configuration";
for (auto const& spec : inputSpecs) {
if (device->fChannels.count(spec.binding) == 0) {
throw std::runtime_error("no corresponding output channel found for input '" + spec.binding + "'");
auto channel = decideChannel(spec);
if (device->fChannels.count(channel) == 0) {
throw std::runtime_error("no corresponding output channel found for input '" + channel + "'");
}
}
};
Expand Down Expand Up @@ -443,12 +453,13 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
}
size_t payloadMsgSize = dh->payloadSize;

auto headerMessage = device.NewMessageFor(first.spec->binding, index, headerMsgSize);
auto channel = decideChannel(*first.spec);
auto headerMessage = device.NewMessageFor(channel, index, headerMsgSize);
memcpy(headerMessage->GetData(), part.header, headerMsgSize);
auto payloadMessage = device.NewMessageFor(first.spec->binding, index, payloadMsgSize);
auto payloadMessage = device.NewMessageFor(channel, index, payloadMsgSize);
memcpy(payloadMessage->GetData(), part.payload, payloadMsgSize);
outputs[first.spec->binding].AddPart(std::move(headerMessage));
outputs[first.spec->binding].AddPart(std::move(payloadMessage));
outputs[channel].AddPart(std::move(headerMessage));
outputs[channel].AddPart(std::move(payloadMessage));
}
}
for (auto& [channelName, channelParts] : outputs) {
Expand All @@ -461,7 +472,8 @@ DataProcessorSpec specifyFairMQDeviceOutputProxy(char const* name,
});
const char* d = strdup(((std::string(defaultChannelConfig).find("name=") == std::string::npos ? (std::string("name=") + name + ",") : "") + std::string(defaultChannelConfig)).c_str());
spec.options = {
ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}}};
ConfigParamSpec{"channel-config", VariantType::String, d, {"Out-of-band channel config"}},
};

return spec;
}
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/test/test_ExternalFairMQDeviceWorkflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)

// use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
OutputChannelSpec externalChannelSpec;
// Note: the name has to match the binding of the input spec
externalChannelSpec.name = "external";
// Note: the name is hardcoded for now
externalChannelSpec.name = "downstream";
externalChannelSpec.type = ChannelType::Push;
externalChannelSpec.method = ChannelMethod::Bind;
externalChannelSpec.hostname = "localhost";
Expand Down
2 changes: 1 addition & 1 deletion Framework/Utils/src/dpl-output-proxy.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& config)
// vectored options
// use the OutputChannelSpec as a tool to create the default configuration for the out-of-band channel
OutputChannelSpec externalChannelSpec;
externalChannelSpec.name = inputs[0].binding;
externalChannelSpec.name = "downstream";
externalChannelSpec.type = ChannelType::Push;
externalChannelSpec.method = ChannelMethod::Bind;
externalChannelSpec.hostname = "localhost";
Expand Down