Skip to content

Commit d1b2b90

Browse files
committed
DPL: introduce Lifetime::Optional
If data is present, use simply give the data. If data is not present on input, create a dummy entry.
1 parent 4f85078 commit d1b2b90

5 files changed

Lines changed: 83 additions & 15 deletions

File tree

Framework/Core/include/Framework/Lifetime.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace o2::framework
1616
/// Possible Lifetime of objects being exchanged by the DPL.
1717
enum struct Lifetime {
1818
/// A message which is associated to a timeframe. DPL will wait indefinitely for it by default.
19-
Timeframe,
19+
Timeframe,
2020
/// Eventually a message whose content is retrieved from CCDB
2121
Condition,
2222
/// Do not use for now
@@ -28,7 +28,14 @@ enum struct Lifetime {
2828
/// A message which is created immediately, with payload / containing a
2929
/// single value which gets incremented for every / invokation.
3030
Enumeration,
31-
Signal
31+
/// A message which is created every time a SIGUSR1 is received.
32+
Signal,
33+
/// An optional message. When data arrives, if not already part of the data,
34+
/// a dummy entry will be generated.
35+
/// This comes handy e.g. to handle Raw Data, since DataDistribution will provide
36+
/// everything in one go so whatever is expected but not there, for whatever reason
37+
/// will be substituted with a dummy entry.
38+
Optional
3239
};
3340

3441
} // namespace o2::framework

Framework/Core/include/Framework/LifetimeHelpers.h

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
// In applying this license CERN does not waive the privileges and immunities
88
// granted to it by virtue of its status as an Intergovernmental Organization
99
// or submit itself to any jurisdiction.
10-
#ifndef FRAMEWORK_LIFETIMEHELPERS_H
11-
#define FRAMEWORK_LIFETIMEHELPERS_H
10+
#ifndef O2_FRAMEWORK_LIFETIMEHELPERS_H_
11+
#define O2_FRAMEWORK_LIFETIMEHELPERS_H_
1212

1313
#include "Framework/ExpirationHandler.h"
1414
#include "Framework/PartRef.h"
@@ -17,9 +17,7 @@
1717
#include <functional>
1818
#include <string>
1919

20-
namespace o2
21-
{
22-
namespace framework
20+
namespace o2::framework
2321
{
2422

2523
struct ConcreteDataMatcher;
@@ -33,6 +31,7 @@ struct LifetimeHelpers {
3331
/// Callback which creates a new timeslice as soon as one is available and
3432
/// uses an incremental number as timestamp.
3533
static ExpirationHandler::Creator enumDrivenCreation(size_t first, size_t last, size_t step, size_t inputTimeslice, size_t maxTimeSliceId);
34+
3635
/// Callback which creates a new timeslice when timer
3736
/// expires and there is not a compatible datadriven callback
3837
/// available.
@@ -78,9 +77,12 @@ struct LifetimeHelpers {
7877
/// The payload of each message will contain an incremental number for each
7978
/// message being created.
8079
static ExpirationHandler::Handler enumerate(ConcreteDataMatcher const& spec, std::string const& sourceChannel);
80+
81+
/// Create a dummy (empty) message every time a record expires, suing @a spec
82+
/// as content of the payload.
83+
static ExpirationHandler::Handler dummy(ConcreteDataMatcher const& spec, std::string const& sourceChannel);
8184
};
8285

83-
} // namespace framework
84-
} // namespace o2
86+
} // namespace o2::framework
8587

86-
#endif // FRAMEWORK_LIFETIMEHELPERS_H
88+
#endif // O2_FRAMEWORK_LIFETIMEHELPERS_H_

Framework/Core/src/DeviceSpecHelpers.cxx

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,31 @@ struct ExpirationHandlerHelpers {
209209
{
210210
return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::fetchFromObjectRegistry(); };
211211
}
212+
213+
/// This behaves as data. I.e. we never create it unless data arrives.
214+
static RouteConfigurator::CreationConfigurator createOptionalConfigurator()
215+
{
216+
return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::dataDrivenCreation(); };
217+
}
218+
219+
/// This will always exipire an optional record when no data is received.
220+
static RouteConfigurator::DanglingConfigurator danglingOptionalConfigurator()
221+
{
222+
return [](DeviceState&, ConfigParamRegistry const&) { return LifetimeHelpers::expireAlways(); };
223+
}
224+
225+
/// When the record expires, simply create a dummy entry.
226+
static RouteConfigurator::ExpirationConfigurator expiringOptionalConfigurator(InputSpec const& spec, std::string const& sourceChannel)
227+
{
228+
auto m = std::get_if<ConcreteDataMatcher>(&spec.matcher);
229+
if (m == nullptr) {
230+
throw runtime_error("InputSpec for Enumeration must be fully qualified");
231+
}
232+
// We copy the matcher to avoid lifetime issues.
233+
return [matcher = *m, sourceChannel](DeviceState&, ConfigParamRegistry const&) {
234+
return LifetimeHelpers::dummy(matcher, sourceChannel);
235+
};
236+
}
212237
};
213238

214239
/// This creates a string to configure channels of a FairMQDevice
@@ -638,6 +663,12 @@ void DeviceSpecHelpers::processInEdgeActions(std::vector<DeviceSpec>& devices,
638663
ExpirationHandlerHelpers::danglingTransientConfigurator(),
639664
ExpirationHandlerHelpers::expiringTransientConfigurator(inputSpec)};
640665
break;
666+
case Lifetime::Optional:
667+
route.configurator = {
668+
ExpirationHandlerHelpers::createOptionalConfigurator(),
669+
ExpirationHandlerHelpers::danglingOptionalConfigurator(),
670+
ExpirationHandlerHelpers::expiringOptionalConfigurator(inputSpec, sourceChannel)};
671+
break;
641672
default:
642673
break;
643674
}

Framework/Core/src/LifetimeHelpers.cxx

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
using namespace o2::header;
2929
using namespace fair;
3030

31-
namespace o2
32-
{
33-
namespace framework
31+
namespace o2::framework
3432
{
3533

3634
namespace
@@ -303,5 +301,34 @@ ExpirationHandler::Handler LifetimeHelpers::enumerate(ConcreteDataMatcher const&
303301
return f;
304302
}
305303

306-
} // namespace framework
307-
} // namespace o2
304+
/// Create a dummy message with the provided ConcreteDataMatcher
305+
ExpirationHandler::Handler LifetimeHelpers::dummy(ConcreteDataMatcher const& matcher, std::string const& sourceChannel)
306+
{
307+
using counter_t = int64_t;
308+
auto counter = std::make_shared<counter_t>(0);
309+
auto f = [matcher, counter, sourceChannel](ServiceRegistry& services, PartRef& ref, uint64_t timestamp) -> void {
310+
// We should invoke the handler only once.
311+
assert(!ref.header);
312+
assert(!ref.payload);
313+
auto& rawDeviceService = services.get<RawDeviceService>();
314+
315+
DataHeader dh;
316+
dh.dataOrigin = matcher.origin;
317+
dh.dataDescription = matcher.description;
318+
dh.subSpecification = matcher.subSpec;
319+
dh.payloadSize = 0;
320+
dh.payloadSerializationMethod = gSerializationMethodNone;
321+
322+
DataProcessingHeader dph{timestamp, 1};
323+
324+
auto&& transport = rawDeviceService.device()->GetChannel(sourceChannel, 0).Transport();
325+
auto channelAlloc = o2::pmr::getTransportAllocator(transport);
326+
auto header = o2::pmr::getMessage(o2::header::Stack{channelAlloc, dh, dph});
327+
ref.header = std::move(header);
328+
auto payload = rawDeviceService.device()->NewMessage(0);
329+
ref.payload = std::move(payload);
330+
};
331+
return f;
332+
}
333+
334+
} // namespace o2::framework

Framework/Core/src/WorkflowHelpers.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
323323
case Lifetime::QA:
324324
case Lifetime::Transient:
325325
case Lifetime::Timeframe:
326+
case Lifetime::Optional:
326327
break;
327328
}
328329
if (DataSpecUtils::partialMatch(input, header::DataOrigin{"AOD"})) {

0 commit comments

Comments
 (0)