Skip to content

Commit 9b7f1f7

Browse files
authored
DPL: add Lifetime::Signal to trigger processing on a signal (#4633)
1 parent ade60fa commit 9b7f1f7

6 files changed

Lines changed: 52 additions & 12 deletions

File tree

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ class DataProcessingDevice : public FairMQDevice
133133
std::mutex mRegionInfoMutex;
134134
enum TerminationPolicy mErrorPolicy = TerminationPolicy::WAIT; /// What to do when an error arises
135135
bool mWasActive = false; /// Whether or not the device was active at last iteration.
136+
bool mFirst = true; /// Whether or not this was the first iteration
136137
};
137138

138139
} // namespace o2::framework

Framework/Core/include/Framework/DeviceState.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
typedef struct uv_loop_s uv_loop_t;
2020
typedef struct uv_timer_s uv_timer_t;
2121
typedef struct uv_poll_s uv_poll_t;
22+
typedef struct uv_signal_s uv_signal_t;
2223

2324
namespace o2::framework
2425
{
@@ -47,6 +48,8 @@ struct DeviceState {
4748
std::vector<uv_poll_t*> activeInputPollers;
4849
// The list of pollers for active output channels
4950
std::vector<uv_poll_t*> activeOutputPollers;
51+
/// The list of active signal handlers
52+
std::vector<uv_signal_t*> activeSignals;
5053
};
5154

5255
} // namespace o2::framework

Framework/Core/include/Framework/Lifetime.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#ifndef FRAMEWORK_LIFETIME_H
11-
#define FRAMEWORK_LIFETIME_H
10+
#ifndef O2_FRAMEWORK_LIFETIME_H_
11+
#define O2_FRAMEWORK_LIFETIME_H_
1212

13-
namespace o2
14-
{
15-
namespace framework
13+
namespace o2::framework
1614
{
1715

1816
/// Possible Lifetime of objects being exchanged by the DPL.
@@ -23,9 +21,9 @@ enum struct Lifetime {
2321
QA,
2422
Transient,
2523
Timer,
26-
Enumeration
24+
Enumeration,
25+
Signal
2726
};
2827

29-
} // namespace framework
30-
} // namespace o2
31-
#endif
28+
} // namespace o2::framework
29+
#endif // O2_FRAMEWORK_LIFETIME_H_

Framework/Core/src/DataProcessingDevice.cxx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,9 +338,9 @@ void DataProcessingDevice::InitTask()
338338
mState.activeInputPollers.push_back(poller);
339339
}
340340
// In case we do not have any input channel and we do not have
341-
// any timers, we still wake up whenever we can send data to downstream
341+
// any timers or signal watchers we still wake up whenever we can send data to downstream
342342
// devices to allow for enumerations.
343-
if (mState.activeInputPollers.empty() && mState.activeTimers.empty()) {
343+
if (mState.activeInputPollers.empty() && mState.activeTimers.empty() && mState.activeSignals.empty()) {
344344
for (auto& x : fChannels) {
345345
if (x.first.rfind("from_internal-dpl", 0) == 0) {
346346
LOG(debug) << x.first << " is an internal channel. Not polling." << std::endl;
@@ -432,7 +432,8 @@ bool DataProcessingDevice::ConditionalRun()
432432
if (mState.loop) {
433433
ZoneScopedN("uv idle");
434434
TracyPlot("past activity", (int64_t)mWasActive);
435-
uv_run(mState.loop, mWasActive && (mDataProcessorContexes.at(0).state->streaming != StreamingState::Idle) ? UV_RUN_NOWAIT : UV_RUN_ONCE);
435+
uv_run(mState.loop, (mFirst == true) || (mWasActive && (mDataProcessorContexes.at(0).state->streaming != StreamingState::Idle) && (mState.activeSignals.empty())) ? UV_RUN_NOWAIT : UV_RUN_ONCE);
436+
mFirst = false;
436437
}
437438

438439
// Notify on the main thread the new region callbacks, making sure

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939

4040
#include <sys/time.h>
4141
#include <sys/resource.h>
42+
#include <csignal>
4243

4344
namespace bpo = boost::program_options;
4445

@@ -53,6 +54,11 @@ void timer_callback(uv_timer_t*)
5354
{
5455
// We simply wake up the event loop. Nothing to be done here.
5556
}
57+
58+
void signal_callback(uv_signal_t*, int)
59+
{
60+
// We simply wake up the event loop. Nothing to be done here.
61+
}
5662
} // namespace detail
5763

5864
struct ExpirationHandlerHelpers {
@@ -78,6 +84,27 @@ struct ExpirationHandlerHelpers {
7884
};
7985
}
8086

87+
static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
88+
{
89+
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState& state, ConfigParamRegistry const& options) {
90+
std::string startName = std::string{"start-value-"} + matcher.binding;
91+
std::string endName = std::string{"end-value-"} + matcher.binding;
92+
std::string stepName = std::string{"step-value-"} + matcher.binding;
93+
auto start = options.get<int64_t>(startName.c_str());
94+
auto stop = options.get<int64_t>(endName.c_str());
95+
auto step = options.get<int64_t>(stepName.c_str());
96+
// We create a timer to wake us up. Notice the actual
97+
// timeslot creation and record expiration still happens
98+
// in a synchronous way.
99+
uv_signal_t* sh = (uv_signal_t*)(malloc(sizeof(uv_signal_t)));
100+
uv_signal_init(state.loop, sh);
101+
uv_signal_start(sh, detail::signal_callback, SIGUSR1);
102+
state.activeSignals.push_back(sh);
103+
104+
return LifetimeHelpers::enumDrivenCreation(start, stop, step, inputTimeslice, maxInputTimeslices);
105+
};
106+
}
107+
81108
static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
82109
{
83110
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState&, ConfigParamRegistry const& options) {
@@ -591,6 +618,12 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
591618
ExpirationHandlerHelpers::danglingEnumerationConfigurator(inputSpec),
592619
ExpirationHandlerHelpers::expiringEnumerationConfigurator(inputSpec, sourceChannel)};
593620
break;
621+
case Lifetime::Signal:
622+
route.configurator = {
623+
ExpirationHandlerHelpers::signalDrivenConfigurator(inputSpec, consumerDevice.inputTimesliceId, consumerDevice.maxInputTimeslices),
624+
ExpirationHandlerHelpers::danglingEnumerationConfigurator(inputSpec),
625+
ExpirationHandlerHelpers::expiringEnumerationConfigurator(inputSpec, sourceChannel)};
626+
break;
594627
case Lifetime::Transient:
595628
route.configurator = {
596629
ExpirationHandlerHelpers::dataDrivenConfigurator(),

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
253253
}
254254
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
255255
} break;
256+
case Lifetime::Signal: {
257+
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
258+
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
259+
} break;
256260
case Lifetime::Enumeration: {
257261
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
258262
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});

0 commit comments

Comments
 (0)