forked from mcoquet642/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathArrowContext.h
More file actions
102 lines (87 loc) · 2.67 KB
/
ArrowContext.h
File metadata and controls
102 lines (87 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_ARROWCONTEXT_H
#define FRAMEWORK_ARROWCONTEXT_H
#include "Framework/FairMQDeviceProxy.h"
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>
class FairMQMessage;
namespace o2
{
namespace framework
{
class FairMQResizableBuffer;
/// A context which collects the Arrow buffers queued for sending,
/// together with the header, the finalisation callback and the name
/// of the channel associated with each of them.
class ArrowContext
{
 public:
  /// @param proxy the device proxy stored in this context and handed
  ///        back by proxy().
  ArrowContext(FairMQDeviceProxy proxy)
    : mProxy{proxy}
  {
  }

  /// One queued message: a header, the buffer with the data and what
  /// is needed to finalise and route it.
  struct MessageRef {
    /// The header to be associated with the message
    std::unique_ptr<FairMQMessage> header;
    /// The actual buffer holding the ArrowData
    std::shared_ptr<FairMQResizableBuffer> buffer;
    /// The function to call to finalise the builder into the message
    std::function<void(std::shared_ptr<FairMQResizableBuffer>)> finalize;
    /// The name of the channel this message is associated with
    std::string channel;
  };

  using Messages = std::vector<MessageRef>;

  /// Append a new message to the context.
  ///
  /// @param header   header of the message; ownership is taken over.
  /// @param buffer   buffer holding the data; shared with the caller.
  /// @param finalize callback stored alongside the buffer, see
  ///                 MessageRef::finalize.
  /// @param channel  name of the channel; copied into the entry.
  void addBuffer(std::unique_ptr<FairMQMessage> header,
                 std::shared_ptr<FairMQResizableBuffer> buffer,
                 std::function<void(std::shared_ptr<FairMQResizableBuffer>)> finalize,
                 const std::string& channel)
  {
    // The braced MessageRef is already a temporary, so it binds to the
    // rvalue overload without an extra std::move around it.
    mMessages.push_back(MessageRef{std::move(header),
                                   std::move(buffer),
                                   std::move(finalize),
                                   channel});
  }

  /// @return iterator to the first queued message.
  Messages::iterator begin()
  {
    return mMessages.begin();
  }

  /// @return iterator past the last queued message.
  Messages::iterator end()
  {
    return mMessages.end();
  }

  /// @return the number of messages currently queued.
  size_t size() const
  {
    return mMessages.size();
  }

  /// Drop all the queued messages.
  ///
  /// Destroying each MessageRef releases whatever it still holds: the
  /// header (if it was not moved out in the meanwhile) and this
  /// context's reference to the shared buffer.
  void clear()
  {
    mMessages.clear();
  }

  /// @return the device proxy this context was constructed with.
  FairMQDeviceProxy& proxy()
  {
    return mProxy;
  }

 private:
  FairMQDeviceProxy mProxy;
  Messages mMessages;
};
} // namespace framework
} // namespace o2
#endif // FRAMEWORK_ARROWCONTEXT_H