forked from wiechula/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSubframeBuilderDevice.cxx
More file actions
150 lines (129 loc) · 6.14 KB
/
SubframeBuilderDevice.cxx
File metadata and controls
150 lines (129 loc) · 6.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
/// @file SubframeBuilderDevice.cxx
/// @author Giulio Eulisse, Matthias Richter, Sandro Wenzel
/// @since 2017-02-07
/// @brief Demonstrator device for a subframe builder
#include <thread> // this_thread::sleep_for
#include <chrono>
#include <functional>
#include "DataFlow/SubframeBuilderDevice.h"
#include "Headers/SubframeMetadata.h"
#include "Headers/HeartbeatFrame.h"
#include "Headers/DataHeader.h"
#include <options/FairMQProgOptions.h>
// Out-of-class definitions of the constexpr static data members of
// SubframeBuilderDevice.
// From C++17 on, constexpr static data members are implicitly inline; such a
// redeclaration is still permitted, but deprecated. Before C++17 the definition
// is required as soon as a member is odr-used. Some compilers do not implement
// this part of the standard correctly. Note also that the error does not occur
// for the other public constexpr members, which is why only these three are
// redeclared here.
constexpr uint32_t AliceO2::DataFlow::SubframeBuilderDevice::mOrbitsPerTimeframe;
constexpr uint32_t AliceO2::DataFlow::SubframeBuilderDevice::mOrbitDuration;
constexpr uint32_t AliceO2::DataFlow::SubframeBuilderDevice::mDuration;
// File-local shorthands for the AliceO2::Header types used by the functions below
using HeartbeatHeader = AliceO2::Header::HeartbeatHeader;
using HeartbeatTrailer = AliceO2::Header::HeartbeatTrailer;
using DataHeader = AliceO2::Header::DataHeader;
/// Default constructor.
/// Initialises the O2Device base and logs the value of mDuration.
AliceO2::DataFlow::SubframeBuilderDevice::SubframeBuilderDevice() : O2Device()
{
  LOG(INFO) << "AliceO2::DataFlow::SubframeBuilderDevice::SubframeBuilderDevice "
            << mDuration
            << "\n";
}
/// Destructor; the compiler-generated implementation is sufficient.
AliceO2::DataFlow::SubframeBuilderDevice::~SubframeBuilderDevice() = default;
/// Read the device configuration and, unless the device runs self-triggered,
/// register HandleData as the callback for the input channel.
void AliceO2::DataFlow::SubframeBuilderDevice::InitTask()
{
  // The duration is not read from the configuration for the time being:
  // mDuration = GetConfig()->GetValue<uint32_t>(OptionKeyDuration);
  mIsSelfTriggered = GetConfig()->GetValue<bool>(OptionKeySelfTriggered);
  mInputChannelName = GetConfig()->GetValue<std::string>(OptionKeyInputChannelName);
  mOutputChannelName = GetConfig()->GetValue<std::string>(OptionKeyOutputChannelName);

  // Whether a data handler has to be registered depends on the trigger mode.
  // Once a callback is registered, the base class no longer calls
  // ConditionalRun.
  if (mIsSelfTriggered) {
    LOG(INFO) << "Self triggered mode. Doing nothing for now.\n";
  } else {
    LOG(INFO) << "Obtaining data from DataPublisher\n";
    OnData(mInputChannelName.c_str(),
           &AliceO2::DataFlow::SubframeBuilderDevice::HandleData);
  }

  LOG(INFO) << "AliceO2::DataFlow::SubframeBuilderDevice::InitTask "
            << mDuration
            << "\n";
}
// FIXME: how do we actually find out the payload size???
/// Locate the detector payload inside a raw heartbeat frame.
///
/// The payload size is derived from the buffer size alone: everything between
/// a leading HeartbeatHeader and a trailing HeartbeatTrailer is treated as
/// payload.
///
/// @param[out] payload  set to the first byte after the HeartbeatHeader, or to
///                      nullptr if @a buffer is null or cannot hold the header
/// @param buffer        start of the heartbeat frame
/// @param bufferSize    size of the heartbeat frame in bytes
/// @return payload size in bytes; zero or negative if the buffer is too small
///         to hold header and trailer
int64_t extractDetectorPayload(char **payload, char *buffer, size_t bufferSize) {
  // Only form the payload pointer when it stays inside the buffer.
  const bool headerFits = buffer != nullptr && bufferSize >= sizeof(HeartbeatHeader);
  *payload = headerFits ? buffer + sizeof(HeartbeatHeader) : nullptr;

  // Subtract in a signed 64-bit type. In size_t an undersized buffer wraps
  // around, and where size_t is narrower than int64_t the wrapped value would
  // be reported as a huge positive payload size instead of a negative one.
  constexpr auto envelopeSize =
    static_cast<int64_t>(sizeof(HeartbeatHeader) + sizeof(HeartbeatTrailer));
  return static_cast<int64_t>(bufferSize) - envelopeSize;
}
/// Build a subframe from one incoming heartbeat frame and send it on the
/// output channel.
///
/// Part 0 of @a inParts is searched for a DataHeader, part 1 is interpreted as
/// the heartbeat frame (HeartbeatHeader, detector payload, HeartbeatTrailer).
/// The outgoing message consists of two header/payload pairs:
///  - a "SUBTIMEFRAMEMD" DataHeader with the SubframeMetadata
///  - a copy of the incoming DataHeader with a copy of the detector payload
///
/// @param inParts incoming multipart message
/// @return always true; malformed input is logged and dropped without sending
bool AliceO2::DataFlow::SubframeBuilderDevice::BuildAndSendFrame(FairMQParts &inParts)
{
  LOG(INFO) << "AliceO2::DataFlow::SubframeBuilderDevice::BuildAndSendFrame " << mDuration << "\n";

  LOG(INFO) << "Got " << inParts.Size() << " parts\n";
  for (auto pi = 0; pi < inParts.Size(); ++pi)
  {
    LOG(INFO) << " Part " << pi << ": " << inParts.At(pi)->GetSize() << " bytes \n";
  }

  // Both part 0 (header) and part 1 (heartbeat frame) are accessed below
  if (inParts.Size() < 2)
  {
    LOG(ERROR) << "Expected at least 2 message parts, got " << inParts.Size() << "\n";
    return true;
  }

  // Validate the frame size before anything is read from the buffer: a
  // positive payload size implies that the buffer holds a full HeartbeatHeader
  char *incomingBuffer = static_cast<char *>(inParts.At(1)->GetData());
  char *sourcePayload = nullptr;
  const auto payloadSize = extractDetectorPayload(&sourcePayload,
                                                  incomingBuffer,
                                                  inParts.At(1)->GetSize());
  if (incomingBuffer == nullptr || payloadSize <= 0)
  {
    LOG(ERROR) << "Payload is too small: " << payloadSize << "\n";
    return true;
  }
  LOG(INFO) << "Payload of size " << payloadSize << " received\n";

  // The header lookup returns a pointer; do not dereference it unchecked
  const auto *inputHeader =
    AliceO2::Header::get<DataHeader>(static_cast<byte *>(inParts.At(0)->GetData()));
  if (inputHeader == nullptr)
  {
    LOG(ERROR) << "No DataHeader found in the first message part\n";
    return true;
  }

  const auto *hbh = reinterpret_cast<const HeartbeatHeader *>(incomingBuffer);

  // top level subframe header, the DataHeader is going to be used with
  // description "SUBTIMEFRAMEMD"
  // this should be defined in a common place, and also the origin
  // the origin can probably name a detector identifier, but not sure if
  // all CRUs of a FLP in all cases serve a single detector
  DataHeader dh;
  dh.dataDescription = AliceO2::Header::DataDescription("SUBTIMEFRAMEMD");
  dh.dataOrigin = AliceO2::Header::DataOrigin("TEST");
  dh.subSpecification = 0;
  dh.payloadSize = sizeof(SubframeMetadata);

  // subframe meta information as payload
  SubframeMetadata md;
  // alternative: md.startTime = (hbh->orbit / mOrbitsPerTimeframe) * mDuration;
  // widen before multiplying so the product is computed in 64 bit
  md.startTime = static_cast<uint64_t>(hbh->orbit) * static_cast<uint64_t>(mOrbitDuration);
  md.duration = mDuration;
  LOG(INFO) << "Start time for subframe (" << hbh->orbit << ", "
            << mDuration
            << ") " << timeframeIdFromTimestamp(md.startTime, mDuration) << " " << md.startTime << "\n";

  // build multipart message from header and payload, starting with the
  // subframe metadata
  O2Message outgoing;
  AddMessage(outgoing, dh, NewSimpleMessage(md));

  // The detector payload is forwarded under a copy of the incoming header
  DataHeader payloadheader(*inputHeader);
  payloadheader.subSpecification = 0;
  payloadheader.payloadSize = payloadSize;

  // Copy the payload into a buffer owned by the outgoing message; it is
  // released by the deleter callback once the message has been processed.
  // Allocation happens only after all validation, so no early return leaks it.
  auto *payload = new char[payloadSize];
  memcpy(payload, sourcePayload, payloadSize);

  // FIXME: take care of multiple HBF per SubtimeFrame
  AddMessage(outgoing, payloadheader,
             NewMessage(payload, payloadSize,
                        [](void * /*data*/, void *hint) { delete[] static_cast<char *>(hint); }, payload));

  // send message
  Send(outgoing, mOutputChannelName.c_str());
  outgoing.fParts.clear();

  return true;
}
/// Data callback for the input channel.
///
/// @param msgParts incoming multipart message
/// @return always true
bool AliceO2::DataFlow::SubframeBuilderDevice::HandleData(FairMQParts& msgParts, int /*index*/)
{
  // Processing scheme described for this handler:
  //  1. walk over the header/payload pairs of the incoming multipart message;
  //     for every pair
  //       - check the timestamp
  //       - open a new subtimeframe if no existing one matches the timestamp
  //       - attach the pair to the matching subtimeframe
  //  2. look for completed subtimeframes and send all of them
  //
  // Routing to the EPN is not part of the builder, it is done in the specific
  // FLP-EPN setup. The order of subtimeframes has to be preserved to fit into
  // the simple emulation of event/frame ids in the flpSender.

  // The body hands the whole message to BuildAndSendFrame; its result is
  // deliberately not propagated.
  static_cast<void>(BuildAndSendFrame(msgParts));

  return true;
}