Skip to content

Commit f216f34

Browse files
committed
DPL: Proper ServiceRegistryRef implementation
For now it simply forwards to the underlying service registry.
1 parent e81d503 commit f216f34

18 files changed

Lines changed: 86 additions & 26 deletions

Framework/Core/include/Framework/CommonServices.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#define O2_FRAMEWORK_COMMONSERVICES_H_
1313

1414
#include "Framework/ServiceSpec.h"
15+
#include "Framework/ServiceRegistryRef.h"
1516
#include "Framework/TypeIdHelpers.h"
1617

1718
class TDatabasePDG;

Framework/Core/include/Framework/DataProcessingDevice.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ struct DeviceConfigurationHelpers {
122122
class DataProcessingDevice : public fair::mq::Device
123123
{
124124
public:
125-
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistryRef, ProcessingPolicies& policies);
125+
DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry&, ProcessingPolicies& policies);
126126
void Init() final;
127127
void InitTask() final;
128128
void PreRun() final;
@@ -159,7 +159,7 @@ class DataProcessingDevice : public fair::mq::Device
159159
AlgorithmSpec::ErrorCallback mError;
160160
std::function<void(RuntimeErrorRef e, InputRecord& record)> mErrorHandling;
161161
std::unique_ptr<ConfigParamRegistry> mConfigRegistry;
162-
ServiceRegistryRef mServiceRegistry;
162+
ServiceRegistry& mServiceRegistry;
163163
DataAllocator mAllocator;
164164
DataRelayer* mRelayer = nullptr;
165165
/// Expiration handler

Framework/Core/include/Framework/DataSender.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class DataSender
4040

4141
private:
4242
FairMQDeviceProxy& mProxy;
43+
// We need the ServiceRegistry and not a ref, to be able
44+
// to call the callbacks after sending.
4345
ServiceRegistryRef mRegistry;
4446
DeviceSpec const& mSpec;
4547
std::vector<OutputSpec> mOutputs;

Framework/Core/include/Framework/DebugGUI.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
namespace o2::framework
2626
{
27+
struct ServiceRegistry;
2728
/// Plugin interface for DPL GUIs.
2829
struct DebugGUI {
2930
virtual std::function<void(void)> getGUIDebugger(std::vector<o2::framework::DeviceInfo> const& infos,
@@ -41,7 +42,7 @@ struct DebugGUI {
4142
virtual void keyUp(char key) = 0;
4243
virtual void charIn(char key) = 0;
4344

44-
virtual void* initGUI(char const* windowTitle, ServiceRegistryRef registry) = 0;
45+
virtual void* initGUI(char const* windowTitle, ServiceRegistry& registry) = 0;
4546
virtual void getFrameJSON(void* data, std::ostream& json_data) = 0;
4647
virtual void getFrameRaw(void* data, void** raw_data, int* size) = 0;
4748
virtual bool pollGUIPreRender(void* context, float delta) = 0;

Framework/Core/include/Framework/ErrorContext.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#define O2_FRAMEWORK_ERROR_CONTEXT_H_
1313

1414
#include "Framework/InputRecord.h"
15-
#include "Framework/ServiceRegistry.h"
15+
#include "Framework/ServiceRegistryRef.h"
1616
#include "Framework/RuntimeError.h"
1717

1818
namespace o2::framework
@@ -31,7 +31,7 @@ class ErrorContext
3131
}
3232

3333
InputRecord const& inputs() { return mInputs; }
34-
ServiceRegistry const& services() { return mServices; }
34+
ServiceRegistryRef services() { return mServices; }
3535
RuntimeErrorRef exception() { return mExceptionRef; }
3636

3737
private:

Framework/Core/include/Framework/ServiceRegistry.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,10 +370,6 @@ struct ServiceRegistry {
370370
}
371371
};
372372

373-
// This is to migrate the code in QC.
374-
// FIXME: move this to the proper smart reference class once done
375-
using ServiceRegistryRef = o2::framework::ServiceRegistry &;
376-
377373
} // namespace o2::framework
378374

379375
#endif // O2_FRAMEWORK_SERVICEREGISTRY_H_

Framework/Core/include/Framework/ServiceRegistryRef.h

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,64 @@
1111
#ifndef O2_FRAMEWORK_SERVICEREGISTRYREF_H_
1212
#define O2_FRAMEWORK_SERVICEREGISTRYREF_H_
1313

14-
// Until we merge the proper ServiceRegistryRef...
1514
#include "Framework/ServiceRegistry.h"
16-
#endif // O2_FRAMEWORK_SERVICEREGISTRYREF_H_
15+
#include "Framework/Logger.h"
1716

17+
namespace o2::framework
18+
{
19+
20+
class ServiceRegistryRef
21+
{
22+
public:
23+
// The streamId is used to identify the stream in case we have multiple
24+
// threads. We cannot merely used the thread id because that does not
25+
// work in case the thread is created ad-hoc, like it appears to happen
26+
// for both libuv and FairMQ. This behaviour, BTW, makes usage
27+
// of thread local storage basically impossible (i.e. you lose state).
28+
// We use the following convention:
29+
// - streamId == 0 means the main thread
30+
// - streamId > 0 means one of the libuv worker threads
31+
// - streamId == -1 means the region callback thread
32+
// - streamId < -1 means some other worker thread of FairMQ which
33+
// we do not know about.
34+
//
35+
// The getter will also make sure that a service of kind Stream
36+
// cannot be accessed if the streamId is <= 0 and complain accordingly.
37+
// The dataProcessorId will be used to distinguish between different
38+
// data processors when
39+
ServiceRegistryRef(ServiceRegistry& registry, short streamId = 0, short dataProcessorId = 0)
40+
: mRegistry(registry),
41+
mContext{streamId, dataProcessorId}
42+
{
43+
}
44+
45+
/// Check if service of type T is currently active.
46+
template <typename T>
47+
std::enable_if_t<std::is_const_v<T> == false, bool> active() const
48+
{
49+
return mRegistry.active<T>();
50+
}
51+
52+
/// Get a service for the given interface T. The returned reference exposed to
53+
/// the user is actually of the last concrete type C registered, however this
54+
/// should not be a problem.
55+
template <typename T>
56+
T& get() const
57+
{
58+
return mRegistry.get<T>();
59+
}
60+
61+
/// Invoke before sending messages @a parts on a channel @a channelindex
62+
void preSendingMessagesCallbacks(fair::mq::Parts& parts, ChannelIndex channelindex)
63+
{
64+
mRegistry.preSendingMessagesCallbacks(mRegistry, parts, channelindex);
65+
}
66+
67+
private:
68+
ServiceRegistry& mRegistry;
69+
ServiceRegistry::Context mContext;
70+
};
71+
72+
} // namespace o2::framework
73+
74+
#endif // O2_FRAMEWORK_SERVICEREGISTRY_H_

Framework/Core/include/Framework/ServiceSpec.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace o2::framework
3030
struct InitContext;
3131
struct DeviceSpec;
3232
struct ServiceRegistry;
33-
using ServiceRegistryRef = ServiceRegistry&;
33+
struct ServiceRegistryRef;
3434
struct DeviceState;
3535
struct ProcessingContext;
3636
class EndOfStreamContext;
@@ -96,10 +96,10 @@ using ServicePostDispatching = void (*)(ProcessingContext&, void*);
9696
using ServicePostForwarding = void (*)(ProcessingContext&, void*);
9797

9898
/// Callback invoked when the driver enters the init phase.
99-
using ServiceDriverInit = void (*)(ServiceRegistryRef, boost::program_options::variables_map const&);
99+
using ServiceDriverInit = void (*)(ServiceRegistry&, boost::program_options::variables_map const&);
100100

101101
/// Callback invoked when the driver enters the init phase.
102-
using ServiceDriverStartup = void (*)(ServiceRegistryRef, boost::program_options::variables_map const&);
102+
using ServiceDriverStartup = void (*)(ServiceRegistry&, boost::program_options::variables_map const&);
103103

104104
/// Callback invoked when we inject internal devices in the topology
105105
using ServiceTopologyInject = void (*)(WorkflowSpecNode&, ConfigContext&);

Framework/Core/src/ArrowSupport.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
364364
monitoring.send(Metric{(uint64_t)arrow->bytesDestroyed(), "arrow-bytes-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
365365
monitoring.send(Metric{(uint64_t)arrow->messagesDestroyed(), "arrow-messages-destroyed"}.addTag(Key::Subsystem, monitoring::tags::Value::DPL));
366366
monitoring.flushBuffer(); },
367-
.driverInit = [](ServiceRegistryRef registry, boost::program_options::variables_map const& vm) {
367+
.driverInit = [](ServiceRegistry& registry, boost::program_options::variables_map const& vm) {
368368
auto config = new RateLimitConfig{};
369369
int readers = std::stoll(vm["readers"].as<std::string>());
370370
if (vm.count("aod-memory-rate-limit") && vm["aod-memory-rate-limit"].defaulted() == false) {

Framework/Core/src/CallbacksPolicy.cxx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "Framework/CallbacksPolicy.h"
1212
#include "Framework/CallbackService.h"
1313
#include "Framework/CompletionPolicy.h"
14+
#include "Framework/ServiceRegistryRef.h"
1415
#include "Framework/TimingInfo.h"
1516
#include "Framework/Logger.h"
1617
#include <cstdlib>

0 commit comments

Comments
 (0)