Skip to content

Commit 171c26f

Browse files
committed
DPL: fix matthias bug
1 parent c741a4a commit 171c26f

2 files changed

Lines changed: 144 additions & 4 deletions

File tree

Framework/Core/src/DataRelayer.cxx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,14 +241,34 @@ DataRelayer::relay(std::unique_ptr<FairMQMessage> &&header,
241241
auto timeslice = TimesliceId{ TimesliceId::INVALID };
242242
auto slot = TimesliceSlot{ TimesliceSlot::INVALID };
243243

244+
// First look for matching slots which already have some
245+
// partial match.
244246
for (size_t ci = 0; ci < index.size(); ++ci) {
245247
slot = TimesliceSlot{ ci };
248+
if (index.isValid(slot) == false) {
249+
continue;
250+
}
246251
std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
247252
if (input != INVALID_INPUT) {
248253
break;
249254
}
250255
}
251256

257+
// If we did not find anything, look for slots which
258+
// are invalid.
259+
if (input == INVALID_INPUT) {
260+
for (size_t ci = 0; ci < index.size(); ++ci) {
261+
slot = TimesliceSlot{ ci };
262+
if (index.isValid(slot) == true) {
263+
continue;
264+
}
265+
std::tie(input, timeslice) = getInputTimeslice(index.getVariablesForSlot(slot));
266+
if (input != INVALID_INPUT) {
267+
break;
268+
}
269+
}
270+
}
271+
252272
/// If we get a valid result, we can store the message in cache.
253273
if (input != INVALID_INPUT && TimesliceId::isValid(timeslice) && TimesliceSlot::isValid(slot)) {
254274
LOG(DEBUG) << "Received timeslice " << timeslice.value;

Framework/Core/test/test_DataRelayer.cxx

Lines changed: 124 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Framework/CompletionPolicyHelpers.h"
1919
#include "Framework/DataRelayer.h"
2020
#include "Framework/DataProcessingHeader.h"
21+
#include "Framework/WorkflowSpec.h"
2122
#include <Monitoring/Monitoring.h>
2223
#include <fairmq/FairMQTransportFactory.h>
2324
#include <cstring>
@@ -68,6 +69,45 @@ BOOST_AUTO_TEST_CASE(TestNoWait) {
6869
BOOST_REQUIRE_EQUAL(result.size(),2);
6970
}
7071

72+
//
73+
BOOST_AUTO_TEST_CASE(TestNoWaitMatcher) {
74+
Monitoring metrics;
75+
auto specs = o2::framework::select("clusters:TPC/CLUSTERS");
76+
77+
std::vector<InputRoute> inputs = {
78+
InputRoute{ specs[0], "Fake", 0 }
79+
};
80+
81+
std::vector<ForwardRoute> forwards;
82+
TimesliceIndex index;
83+
84+
auto policy = CompletionPolicyHelpers::consumeWhenAny();
85+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
86+
relayer.setPipelineLength(4);
87+
88+
// Let's create a dummy O2 Message with two headers in the stack:
89+
// - DataHeader matching the one provided in the input
90+
DataHeader dh;
91+
dh.dataDescription = "CLUSTERS";
92+
dh.dataOrigin = "TPC";
93+
dh.subSpecification = 0;
94+
95+
DataProcessingHeader dph{0,1};
96+
Stack stack{dh, dph};
97+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
98+
FairMQMessagePtr header = transport->CreateMessage(stack.size());
99+
FairMQMessagePtr payload = transport->CreateMessage(1000);
100+
memcpy(header->GetData(), stack.data(), stack.size());
101+
relayer.relay(std::move(header),std::move(payload));
102+
auto ready = relayer.getReadyToProcess();
103+
BOOST_REQUIRE_EQUAL(ready.size(), 1);
104+
BOOST_CHECK_EQUAL(ready[0].slot.index, 0);
105+
BOOST_CHECK_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
106+
auto result = relayer.getInputsForTimeslice(ready[0].slot);
107+
// One for the header, one for the payload
108+
BOOST_REQUIRE_EQUAL(result.size(),2);
109+
}
110+
71111
// This test a more complicated set of inputs, and verifies that data is
72112
// correctly relayed before being processed.
73113
BOOST_AUTO_TEST_CASE(TestRelay) {
@@ -98,8 +138,8 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
98138

99139
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
100140

101-
auto createMessage = [&transport,&relayer] (DataHeader &dh) {
102-
DataProcessingHeader dph{0,1};
141+
auto createMessage = [&transport,&relayer] (DataHeader &dh, size_t time) {
142+
DataProcessingHeader dph{time,1};
103143
Stack stack{dh, dph};
104144
FairMQMessagePtr header = transport->CreateMessage(stack.size());
105145
FairMQMessagePtr payload = transport->CreateMessage(1000);
@@ -120,11 +160,11 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
120160
dh2.dataOrigin = "ITS";
121161
dh2.subSpecification = 0;
122162

123-
createMessage(dh1);
163+
createMessage(dh1, 0);
124164
auto ready = relayer.getReadyToProcess();
125165
BOOST_REQUIRE_EQUAL(ready.size(), 0);
126166

127-
createMessage(dh2);
167+
createMessage(dh2, 0);
128168
ready = relayer.getReadyToProcess();
129169
BOOST_REQUIRE_EQUAL(ready.size(), 1);
130170
BOOST_CHECK_EQUAL(ready[0].slot.index, 0);
@@ -135,6 +175,86 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
135175
BOOST_REQUIRE_EQUAL(result.size(),4);
136176
}
137177

178+
// This test a more complicated set of inputs, and verifies that data is
179+
// correctly relayed before being processed.
180+
BOOST_AUTO_TEST_CASE(TestRelayBug) {
181+
Monitoring metrics;
182+
InputSpec spec1{
183+
"clusters",
184+
"TPC",
185+
"CLUSTERS",
186+
};
187+
InputSpec spec2{
188+
"clusters_its",
189+
"ITS",
190+
"CLUSTERS",
191+
};
192+
193+
std::vector<InputRoute> inputs = {
194+
InputRoute{ spec1, "Fake1", 0 },
195+
InputRoute{ spec2, "Fake2", 0 }
196+
};
197+
198+
std::vector<ForwardRoute> forwards;
199+
200+
TimesliceIndex index;
201+
202+
auto policy = CompletionPolicyHelpers::consumeWhenAll();
203+
DataRelayer relayer(policy, inputs, forwards, metrics, index);
204+
relayer.setPipelineLength(3);
205+
206+
auto transport = FairMQTransportFactory::CreateTransportFactory("zeromq");
207+
208+
auto createMessage = [&transport,&relayer] (DataHeader &dh, size_t time) {
209+
DataProcessingHeader dph{time,1};
210+
Stack stack{dh, dph};
211+
FairMQMessagePtr header = transport->CreateMessage(stack.size());
212+
FairMQMessagePtr payload = transport->CreateMessage(1000);
213+
memcpy(header->GetData(), stack.data(), stack.size());
214+
relayer.relay(std::move(header),std::move(payload));
215+
};
216+
217+
// Let's create a dummy O2 Message with two headers in the stack:
218+
// - DataHeader matching the one provided in the input
219+
DataHeader dh1;
220+
dh1.dataDescription = "CLUSTERS";
221+
dh1.dataOrigin = "TPC";
222+
dh1.subSpecification = 0;
223+
224+
// Let's create the second O2 Message:
225+
DataHeader dh2;
226+
dh2.dataDescription = "CLUSTERS";
227+
dh2.dataOrigin = "ITS";
228+
dh2.subSpecification = 0;
229+
230+
// Let's create the second O2 Message:
231+
DataHeader dh3;
232+
dh3.dataDescription = "CLUSTERS";
233+
dh3.dataOrigin = "FOO";
234+
dh3.subSpecification = 0;
235+
236+
237+
/// Reproduce the bug reported by Matthias in https://github.com/AliceO2Group/AliceO2/pull/1483
238+
createMessage(dh1, 0);
239+
auto ready = relayer.getReadyToProcess();
240+
BOOST_REQUIRE_EQUAL(ready.size(), 0);
241+
createMessage(dh1, 1);
242+
ready = relayer.getReadyToProcess();
243+
BOOST_REQUIRE_EQUAL(ready.size(), 0);
244+
createMessage(dh2, 0);
245+
ready = relayer.getReadyToProcess();
246+
BOOST_REQUIRE_EQUAL(ready.size(), 1);
247+
BOOST_CHECK_EQUAL(ready[0].slot.index, 0);
248+
BOOST_CHECK_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
249+
auto result = relayer.getInputsForTimeslice(ready[0].slot);
250+
createMessage(dh2, 1);
251+
ready = relayer.getReadyToProcess();
252+
BOOST_REQUIRE_EQUAL(ready.size(), 1);
253+
BOOST_CHECK_EQUAL(ready[0].slot.index, 1);
254+
BOOST_CHECK_EQUAL(ready[0].op, CompletionPolicy::CompletionOp::Consume);
255+
result = relayer.getInputsForTimeslice(ready[0].slot);
256+
}
257+
138258
// This tests a simple cache pruning, where a single input is shifted out of
139259
// the cache.
140260
BOOST_AUTO_TEST_CASE(TestCache) {

0 commit comments

Comments
 (0)