1818#include " Framework/CompletionPolicyHelpers.h"
1919#include " Framework/DataRelayer.h"
2020#include " Framework/DataProcessingHeader.h"
21+ #include " Framework/WorkflowSpec.h"
2122#include < Monitoring/Monitoring.h>
2223#include < fairmq/FairMQTransportFactory.h>
2324#include < cstring>
@@ -68,6 +69,45 @@ BOOST_AUTO_TEST_CASE(TestNoWait) {
6869 BOOST_REQUIRE_EQUAL (result.size (),2 );
6970}
7071
72+ //
73+ BOOST_AUTO_TEST_CASE (TestNoWaitMatcher) {
74+ Monitoring metrics;
75+ auto specs = o2::framework::select (" clusters:TPC/CLUSTERS" );
76+
77+ std::vector<InputRoute> inputs = {
78+ InputRoute{ specs[0 ], " Fake" , 0 }
79+ };
80+
81+ std::vector<ForwardRoute> forwards;
82+ TimesliceIndex index;
83+
84+ auto policy = CompletionPolicyHelpers::consumeWhenAny ();
85+ DataRelayer relayer (policy, inputs, forwards, metrics, index);
86+ relayer.setPipelineLength (4 );
87+
88+ // Let's create a dummy O2 Message with two headers in the stack:
89+ // - DataHeader matching the one provided in the input
90+ DataHeader dh;
91+ dh.dataDescription = " CLUSTERS" ;
92+ dh.dataOrigin = " TPC" ;
93+ dh.subSpecification = 0 ;
94+
95+ DataProcessingHeader dph{0 ,1 };
96+ Stack stack{dh, dph};
97+ auto transport = FairMQTransportFactory::CreateTransportFactory (" zeromq" );
98+ FairMQMessagePtr header = transport->CreateMessage (stack.size ());
99+ FairMQMessagePtr payload = transport->CreateMessage (1000 );
100+ memcpy (header->GetData (), stack.data (), stack.size ());
101+ relayer.relay (std::move (header),std::move (payload));
102+ auto ready = relayer.getReadyToProcess ();
103+ BOOST_REQUIRE_EQUAL (ready.size (), 1 );
104+ BOOST_CHECK_EQUAL (ready[0 ].slot .index , 0 );
105+ BOOST_CHECK_EQUAL (ready[0 ].op , CompletionPolicy::CompletionOp::Consume);
106+ auto result = relayer.getInputsForTimeslice (ready[0 ].slot );
107+ // One for the header, one for the payload
108+ BOOST_REQUIRE_EQUAL (result.size (),2 );
109+ }
110+
71111// This test a more complicated set of inputs, and verifies that data is
72112// correctly relayed before being processed.
73113BOOST_AUTO_TEST_CASE (TestRelay) {
@@ -98,8 +138,8 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
98138
99139 auto transport = FairMQTransportFactory::CreateTransportFactory (" zeromq" );
100140
101- auto createMessage = [&transport,&relayer] (DataHeader &dh) {
102- DataProcessingHeader dph{0 ,1 };
141+ auto createMessage = [&transport,&relayer] (DataHeader &dh, size_t time ) {
142+ DataProcessingHeader dph{time ,1 };
103143 Stack stack{dh, dph};
104144 FairMQMessagePtr header = transport->CreateMessage (stack.size ());
105145 FairMQMessagePtr payload = transport->CreateMessage (1000 );
@@ -120,11 +160,11 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
120160 dh2.dataOrigin = " ITS" ;
121161 dh2.subSpecification = 0 ;
122162
123- createMessage (dh1);
163+ createMessage (dh1, 0 );
124164 auto ready = relayer.getReadyToProcess ();
125165 BOOST_REQUIRE_EQUAL (ready.size (), 0 );
126166
127- createMessage (dh2);
167+ createMessage (dh2, 0 );
128168 ready = relayer.getReadyToProcess ();
129169 BOOST_REQUIRE_EQUAL (ready.size (), 1 );
130170 BOOST_CHECK_EQUAL (ready[0 ].slot .index , 0 );
@@ -135,6 +175,86 @@ BOOST_AUTO_TEST_CASE(TestRelay) {
135175 BOOST_REQUIRE_EQUAL (result.size (),4 );
136176}
137177
178+ // This test a more complicated set of inputs, and verifies that data is
179+ // correctly relayed before being processed.
180+ BOOST_AUTO_TEST_CASE (TestRelayBug) {
181+ Monitoring metrics;
182+ InputSpec spec1{
183+ " clusters" ,
184+ " TPC" ,
185+ " CLUSTERS" ,
186+ };
187+ InputSpec spec2{
188+ " clusters_its" ,
189+ " ITS" ,
190+ " CLUSTERS" ,
191+ };
192+
193+ std::vector<InputRoute> inputs = {
194+ InputRoute{ spec1, " Fake1" , 0 },
195+ InputRoute{ spec2, " Fake2" , 0 }
196+ };
197+
198+ std::vector<ForwardRoute> forwards;
199+
200+ TimesliceIndex index;
201+
202+ auto policy = CompletionPolicyHelpers::consumeWhenAll ();
203+ DataRelayer relayer (policy, inputs, forwards, metrics, index);
204+ relayer.setPipelineLength (3 );
205+
206+ auto transport = FairMQTransportFactory::CreateTransportFactory (" zeromq" );
207+
208+ auto createMessage = [&transport,&relayer] (DataHeader &dh, size_t time) {
209+ DataProcessingHeader dph{time,1 };
210+ Stack stack{dh, dph};
211+ FairMQMessagePtr header = transport->CreateMessage (stack.size ());
212+ FairMQMessagePtr payload = transport->CreateMessage (1000 );
213+ memcpy (header->GetData (), stack.data (), stack.size ());
214+ relayer.relay (std::move (header),std::move (payload));
215+ };
216+
217+ // Let's create a dummy O2 Message with two headers in the stack:
218+ // - DataHeader matching the one provided in the input
219+ DataHeader dh1;
220+ dh1.dataDescription = " CLUSTERS" ;
221+ dh1.dataOrigin = " TPC" ;
222+ dh1.subSpecification = 0 ;
223+
224+ // Let's create the second O2 Message:
225+ DataHeader dh2;
226+ dh2.dataDescription = " CLUSTERS" ;
227+ dh2.dataOrigin = " ITS" ;
228+ dh2.subSpecification = 0 ;
229+
230+ // Let's create the second O2 Message:
231+ DataHeader dh3;
232+ dh3.dataDescription = " CLUSTERS" ;
233+ dh3.dataOrigin = " FOO" ;
234+ dh3.subSpecification = 0 ;
235+
236+
237+ // / Reproduce the bug reported by Matthias in https://github.com/AliceO2Group/AliceO2/pull/1483
238+ createMessage (dh1, 0 );
239+ auto ready = relayer.getReadyToProcess ();
240+ BOOST_REQUIRE_EQUAL (ready.size (), 0 );
241+ createMessage (dh1, 1 );
242+ ready = relayer.getReadyToProcess ();
243+ BOOST_REQUIRE_EQUAL (ready.size (), 0 );
244+ createMessage (dh2, 0 );
245+ ready = relayer.getReadyToProcess ();
246+ BOOST_REQUIRE_EQUAL (ready.size (), 1 );
247+ BOOST_CHECK_EQUAL (ready[0 ].slot .index , 0 );
248+ BOOST_CHECK_EQUAL (ready[0 ].op , CompletionPolicy::CompletionOp::Consume);
249+ auto result = relayer.getInputsForTimeslice (ready[0 ].slot );
250+ createMessage (dh2, 1 );
251+ ready = relayer.getReadyToProcess ();
252+ BOOST_REQUIRE_EQUAL (ready.size (), 1 );
253+ BOOST_CHECK_EQUAL (ready[0 ].slot .index , 1 );
254+ BOOST_CHECK_EQUAL (ready[0 ].op , CompletionPolicy::CompletionOp::Consume);
255+ result = relayer.getInputsForTimeslice (ready[0 ].slot );
256+ }
257+
138258// This tests a simple cache pruning, where a single input is shifted out of
139259// the cache.
140260BOOST_AUTO_TEST_CASE (TestCache) {
0 commit comments