Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ class DataProcessingDevice : public FairMQDevice
std::mutex mRegionInfoMutex;
enum TerminationPolicy mErrorPolicy = TerminationPolicy::WAIT; /// What to do when an error arises
bool mWasActive = false; /// Whether or not the device was active at last iteration.
bool mFirst = true; /// Whether or not this was the first iteration
};

} // namespace o2::framework
Expand Down
3 changes: 3 additions & 0 deletions Framework/Core/include/Framework/DeviceState.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
typedef struct uv_loop_s uv_loop_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_poll_s uv_poll_t;
typedef struct uv_signal_s uv_signal_t;

namespace o2::framework
{
Expand Down Expand Up @@ -47,6 +48,8 @@ struct DeviceState {
std::vector<uv_poll_t*> activeInputPollers;
// The list of pollers for active output channels
std::vector<uv_poll_t*> activeOutputPollers;
/// The list of active signal handlers
std::vector<uv_signal_t*> activeSignals;
};

} // namespace o2::framework
Expand Down
16 changes: 7 additions & 9 deletions Framework/Core/include/Framework/Lifetime.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_LIFETIME_H
#define FRAMEWORK_LIFETIME_H
#ifndef O2_FRAMEWORK_LIFETIME_H_
#define O2_FRAMEWORK_LIFETIME_H_

namespace o2
{
namespace framework
namespace o2::framework
{

/// Possible Lifetime of objects being exchanged by the DPL.
Expand All @@ -23,9 +21,9 @@ enum struct Lifetime {
QA,
Transient,
Timer,
Enumeration
Enumeration,
Signal
};

} // namespace framework
} // namespace o2
#endif
} // namespace o2::framework
#endif // O2_FRAMEWORK_LIFETIME_H_
7 changes: 4 additions & 3 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,9 @@ void DataProcessingDevice::InitTask()
mState.activeInputPollers.push_back(poller);
}
// In case we do not have any input channel and we do not have
// any timers, we still wake up whenever we can send data to downstream
// any timers or signal watchers we still wake up whenever we can send data to downstream
// devices to allow for enumerations.
if (mState.activeInputPollers.empty() && mState.activeTimers.empty()) {
if (mState.activeInputPollers.empty() && mState.activeTimers.empty() && mState.activeSignals.empty()) {
for (auto& x : fChannels) {
if (x.first.rfind("from_internal-dpl", 0) == 0) {
LOG(debug) << x.first << " is an internal channel. Not polling." << std::endl;
Expand Down Expand Up @@ -432,7 +432,8 @@ bool DataProcessingDevice::ConditionalRun()
if (mState.loop) {
ZoneScopedN("uv idle");
TracyPlot("past activity", (int64_t)mWasActive);
uv_run(mState.loop, mWasActive && (mDataProcessorContexes.at(0).state->streaming != StreamingState::Idle) ? UV_RUN_NOWAIT : UV_RUN_ONCE);
uv_run(mState.loop, (mFirst == true) || (mWasActive && (mDataProcessorContexes.at(0).state->streaming != StreamingState::Idle) && (mState.activeSignals.empty())) ? UV_RUN_NOWAIT : UV_RUN_ONCE);
mFirst = false;
}

// Notify on the main thread the new region callbacks, making sure
Expand Down
33 changes: 33 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include <sys/time.h>
#include <sys/resource.h>
#include <csignal>

namespace bpo = boost::program_options;

Expand All @@ -53,6 +54,11 @@ void timer_callback(uv_timer_t*)
{
// We simply wake up the event loop. Nothing to be done here.
}

void signal_callback(uv_signal_t*, int)
{
// We simply wake up the event loop. Nothing to be done here.
}
} // namespace detail

struct ExpirationHandlerHelpers {
Expand All @@ -78,6 +84,27 @@ struct ExpirationHandlerHelpers {
};
}

static RouteConfigurator::CreationConfigurator signalDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
{
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState& state, ConfigParamRegistry const& options) {
std::string startName = std::string{"start-value-"} + matcher.binding;
std::string endName = std::string{"end-value-"} + matcher.binding;
std::string stepName = std::string{"step-value-"} + matcher.binding;
auto start = options.get<int64_t>(startName.c_str());
auto stop = options.get<int64_t>(endName.c_str());
auto step = options.get<int64_t>(stepName.c_str());
// We create a timer to wake us up. Notice the actual
// timeslot creation and record expiration still happens
// in a synchronous way.
uv_signal_t* sh = (uv_signal_t*)(malloc(sizeof(uv_signal_t)));
uv_signal_init(state.loop, sh);
uv_signal_start(sh, detail::signal_callback, SIGUSR1);
state.activeSignals.push_back(sh);

return LifetimeHelpers::enumDrivenCreation(start, stop, step, inputTimeslice, maxInputTimeslices);
};
}

static RouteConfigurator::CreationConfigurator enumDrivenConfigurator(InputSpec const& matcher, size_t inputTimeslice, size_t maxInputTimeslices)
{
return [matcher, inputTimeslice, maxInputTimeslices](DeviceState&, ConfigParamRegistry const& options) {
Expand Down Expand Up @@ -591,6 +618,12 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
ExpirationHandlerHelpers::danglingEnumerationConfigurator(inputSpec),
ExpirationHandlerHelpers::expiringEnumerationConfigurator(inputSpec, sourceChannel)};
break;
case Lifetime::Signal:
route.configurator = {
ExpirationHandlerHelpers::signalDrivenConfigurator(inputSpec, consumerDevice.inputTimesliceId, consumerDevice.maxInputTimeslices),
ExpirationHandlerHelpers::danglingEnumerationConfigurator(inputSpec),
ExpirationHandlerHelpers::expiringEnumerationConfigurator(inputSpec, sourceChannel)};
break;
case Lifetime::Transient:
route.configurator = {
ExpirationHandlerHelpers::dataDrivenConfigurator(),
Expand Down
4 changes: 4 additions & 0 deletions Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
}
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Timer});
} break;
case Lifetime::Signal: {
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Signal});
} break;
case Lifetime::Enumeration: {
auto concrete = DataSpecUtils::asConcreteDataMatcher(input);
timer.outputs.emplace_back(OutputSpec{concrete.origin, concrete.description, concrete.subSpec, Lifetime::Enumeration});
Expand Down