Skip to content

Commit 809b0c7

Browse files
committed
DPL: event driven FairMQ data loop with libuv
1 parent e4815db commit 809b0c7

10 files changed

Lines changed: 206 additions & 41 deletions

Framework/Core/include/Framework/AlgorithmSpec.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ struct AlgorithmSpec {
4545
using ProcessCallback = std::function<void(ProcessingContext&)>;
4646
using InitCallback = std::function<ProcessCallback(InitContext&)>;
4747
using ErrorCallback = std::function<void(ErrorContext&)>;
48+
49+
static AlgorithmSpec dummyAlgorithm()
50+
{
51+
return AlgorithmSpec{ProcessCallback{nullptr}};
52+
}
53+
4854
static ErrorCallback& emptyErrorCallback()
4955
{
5056
static ErrorCallback callback = nullptr;

Framework/Core/include/Framework/ChannelConfigurationPolicyHelpers.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ struct ChannelConfigurationPolicyHelpers {
5252
static InputChannelModifier reqInput;
5353
/// Makes the passed output channel bind and reply
5454
static OutputChannelModifier replyOutput;
55+
/// Makes the passed input channel connect and pair
56+
static InputChannelModifier pairInput;
57+
/// Makes the passed output channel bind and pair
58+
static OutputChannelModifier pairOutput;
5559
};
5660

5761
} // namespace framework

Framework/Core/include/Framework/ChannelSpec.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ enum struct ChannelType {
3030
Sub,
3131
Push,
3232
Pull,
33+
Pair
3334
};
3435

3536
/// The kind of backend to use for the channels

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class DataProcessingDevice : public FairMQDevice
5555
void SetErrorPolicy(enum TerminationPolicy policy) { mErrorPolicy = policy; }
5656

5757
protected:
58+
bool doRun();
5859
bool handleData(FairMQParts&, InputChannelInfo&);
5960
bool tryDispatchComputation(std::vector<DataRelayer::RecordAction>& completed);
6061
void error(const char* msg);
@@ -91,6 +92,7 @@ class DataProcessingDevice : public FairMQDevice
9192
int mCurrentBackoff = 0; /// The current exponential backoff value.
9293
std::vector<FairMQRegionInfo> mPendingRegionInfos; /// A list of the region infos not yet notified.
9394
enum TerminationPolicy mErrorPolicy = TerminationPolicy::WAIT; /// What to do when an error arises
95+
bool mWasActive = false; /// Whether or not the device was active at last iteration.
9496
};
9597

9698
} // namespace o2::framework

Framework/Core/include/Framework/DeviceState.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
#include <map>
1717
#include <utility>
1818

19+
typedef struct uv_loop_s uv_loop_t;
20+
1921
namespace o2::framework
2022
{
2123

@@ -35,6 +37,7 @@ struct DeviceState {
3537
std::vector<InputChannelInfo> inputChannelInfos;
3638
StreamingState streaming = StreamingState::Streaming;
3739
bool quitRequested = false;
40+
uv_loop_t* loop;
3841
};
3942

4043
} // namespace o2::framework

Framework/Core/src/ChannelConfigurationPolicyHelpers.cxx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,17 @@ ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPol
6161
channel.type = ChannelType::Push;
6262
};
6363

64+
ChannelConfigurationPolicyHelpers::InputChannelModifier ChannelConfigurationPolicyHelpers::pairInput =
65+
[](InputChannelSpec& channel) {
66+
channel.method = ChannelMethod::Connect;
67+
channel.type = ChannelType::Pair;
68+
};
69+
70+
ChannelConfigurationPolicyHelpers::OutputChannelModifier ChannelConfigurationPolicyHelpers::pairOutput =
71+
[](OutputChannelSpec& channel) {
72+
channel.method = ChannelMethod::Bind;
73+
channel.type = ChannelType::Pair;
74+
};
75+
6476
} // namespace framework
6577
} // namespace o2

Framework/Core/src/ChannelSpecHelpers.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ char const* ChannelSpecHelpers::typeAsString(enum ChannelType type)
2727
return "push";
2828
case ChannelType::Pull:
2929
return "pull";
30+
case ChannelType::Pair:
31+
return "pair";
3032
}
3133
throw std::runtime_error("Unknown ChannelType");
3234
}

0 commit comments

Comments
 (0)