Skip to content

Commit 9e12ed8

Browse files
Introducing scoped context object
Adding MessageContext::make_scoped to return a unique object to handle lifetime. The scope handler ownes the context object and when it goes out of scope, the objet will be scheduled in the context. If the context is configured with the `send` hook, the object is directly sent throw this hook. Otherwise it is added to the message list and sent when the computation ends. Using the scoped context object for the DataAllocator::snapshot method This allows promt sending if the context is configured with corresponding callback.
1 parent 6065822 commit 9e12ed8

2 files changed

Lines changed: 102 additions & 5 deletions

File tree

Framework/Core/include/Framework/MessageContext.h

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include <string>
2525
#include <type_traits>
2626
#include <stdexcept>
27+
#include <functional>
2728

2829
class FairMQDevice;
2930

@@ -34,11 +35,25 @@ namespace framework
3435

3536
class MessageContext {
3637
public:
38+
using DispatchCallback = std::function<void(FairMQParts&& message, std::string const&, int)>;
39+
// so far we are only using one instance per named channel
40+
static constexpr int DefaultChannelIndex = 0;
41+
3742
MessageContext(FairMQDeviceProxy proxy)
3843
: mProxy{ proxy }
3944
{
4045
}
4146

47+
MessageContext(FairMQDeviceProxy proxy, DispatchCallback&& dispatcher)
48+
: mProxy{ proxy }, mDispatchCallback{ dispatcher }
49+
{
50+
}
51+
52+
void init(DispatchCallback&& dispatcher)
53+
{
54+
mDispatchCallback = dispatcher;
55+
}
56+
4257
// this is the virtual interface for context objects
4358
class ContextObject
4459
{
@@ -292,17 +307,95 @@ class MessageContext {
292307

293308
using Messages = std::vector<std::unique_ptr<ContextObject>>;
294309

295-
/// Create the specified context object from the variadic arguments and add to message list
310+
/// @class ScopeHook A special deleter to handle object going out of scope
311+
/// This object is used together with the wrapper @a ContextObjectScope which
312+
/// is a unique object returned to the called. When this scope handler goes out of
313+
/// scope and is going to be deleted, the handled object is scheduled in the context,
314+
/// or simply deleted if no context is available.
315+
template <typename T, typename BASE = std::default_delete<T>>
316+
class ScopeHook : public BASE
317+
{
318+
public:
319+
using base = std::default_delete<T>;
320+
using self_type = ScopeHook<T>;
321+
ScopeHook() = default;
322+
ScopeHook(MessageContext* context)
323+
: mContext(context)
324+
{
325+
}
326+
~ScopeHook() = default;
327+
328+
// forbid assignment operator to prohibid changing the Deleter
329+
// resource control property once used in the unique_ptr
330+
self_type& operator=(const self_type&) = delete;
331+
332+
void operator()(T* ptr) const
333+
{
334+
if (!mContext) {
335+
// TODO: decide whether this is an error or not
336+
// can also check if the standard constructor can be dropped to make sure that
337+
// the ScopeHook is always set up with a context
338+
throw std::runtime_error("No context available to schedule the context object");
339+
return base::operator()(ptr);
340+
}
341+
// keep the object alive and add to message list of the context
342+
mContext->schedule(Messages::value_type(ptr));
343+
}
344+
345+
private:
346+
MessageContext* mContext = nullptr;
347+
};
348+
349+
template <typename T>
350+
using ContextObjectScope = std::unique_ptr<T, ScopeHook<T>>;
351+
352+
/// Create the specified context object from the variadic arguments and add to message list.
353+
/// The context object is owned be the context and returned by reference.
296354
/// The context object type is specified as template argument, each context object implementation
297-
/// must derive from the ContextObject interface
355+
/// must derive from the ContextObject interface.
356+
/// TODO: rename to make_ref
298357
template <typename T, typename... Args>
299358
auto& add(Args&&... args)
300359
{
301-
static_assert(std::is_base_of<ContextObject, T>::value == true, "type must inherit ContextObject interface");
302-
mMessages.push_back(std::move(std::make_unique<T>(this, std::forward<Args>(args)...)));
360+
mMessages.push_back(std::move(make<T>(std::forward<Args>(args)...)));
361+
// return a reference to the element in the vector of unique pointers
303362
return *dynamic_cast<T*>(mMessages.back().get());
304363
}
305364

365+
/// Create the specified context object from the variadic arguments as a unique pointer of the context
366+
/// object base class.
367+
/// The context object type is specified as template argument, each context object implementation
368+
/// must derive from the ContextObject interface.
369+
template <typename T, typename... Args>
370+
Messages::value_type make(Args&&... args)
371+
{
372+
static_assert(std::is_base_of<ContextObject, T>::value == true, "type must inherit ContextObject interface");
373+
return std::make_unique<T>(this, std::forward<Args>(args)...);
374+
}
375+
376+
/// Create scope handler managing the specified context object.
377+
/// The context object is created from the variadic arguments and is owned by the scope handler.
378+
/// If the handler goes out of scope, the object is scheduled in the context, either added to the
379+
/// list of messages or directly sent via the optional callback.
380+
template <typename T, typename... Args>
381+
ContextObjectScope<T> make_scoped(Args&&... args)
382+
{
383+
ContextObjectScope<T> scope(dynamic_cast<T*>(make<T>(std::forward<Args>(args)...).release()), ScopeHook<T>(this));
384+
return scope;
385+
}
386+
387+
/// Schedule a context object for sending.
388+
/// The object is considered complete at this point and is sent directly through the dispatcher callback
389+
/// of the context if initialized. It is added to the list of messages otherwise.
390+
void schedule(Messages::value_type&& message)
391+
{
392+
if (mDispatchCallback) {
393+
mDispatchCallback(std::move(message->finalize()), message->channel(), DefaultChannelIndex);
394+
return;
395+
}
396+
mMessages.push_back(std::move(message));
397+
}
398+
306399
Messages::iterator begin()
307400
{
308401
return mMessages.begin();
@@ -345,6 +438,7 @@ class MessageContext {
345438
private:
346439
FairMQDeviceProxy mProxy;
347440
Messages mMessages;
441+
DispatchCallback mDispatchCallback;
348442
};
349443
} // namespace framework
350444
} // namespace o2

Framework/Core/src/DataAllocator.cxx

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ void DataAllocator::addPartToContext(FairMQMessagePtr&& payloadMessage, const Ou
123123
DataHeader* dh = const_cast<DataHeader*>(cdh);
124124
dh->payloadSize = payloadMessage->GetSize();
125125
auto context = mContextRegistry->get<MessageContext>();
126-
context->add<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), channel);
126+
// make_scoped creates the context object inside of a scope handler, since it goes out of
127+
// scope immediately, the created object is scheduled and can be directly sent if the context
128+
// is configured with the dispatcher callback
129+
context->make_scoped<MessageContext::TrivialObject>(std::move(headerMessage), std::move(payloadMessage), channel);
127130
}
128131

129132
void DataAllocator::adopt(const Output& spec, TObject* ptr)

0 commit comments

Comments
 (0)