Skip to content

Commit b50062c

Browse files
Prototyping a demonstrator workflow for extended DataDescriptor matching
There is a request to support input specs matching all blocks of a particular origin/description with all but a specific sub spec. E.g. a processor with input spec {TST/SAMPLE/!0} matching all blocks of {TST/SAMPLE} with a subspec other than '0', and the output {TST/SAMPLE/0}. A custom matcher can be created for this purpose using operation 'not' to invert the result of the embedded SubSpecificationTypeValueMatcher.
1 parent ae7aed7 commit b50062c

2 files changed

Lines changed: 149 additions & 0 deletions

File tree

Framework/Core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ foreach(w
320320
Task
321321
ExternalFairMQDeviceWorkflow
322322
VariablePayloadSequenceWorkflow
323+
DataDescriptorMatcherWorkflow
323324
)
324325
o2_add_test(${w} NAME test_Framework_test_${w}
325326
SOURCES test/test_${w}.cxx
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
#include "Framework/DataDescriptorMatcher.h"
13+
#include "Framework/DataDescriptorQueryBuilder.h"
14+
#include "Framework/InputSpec.h"
15+
#include "Framework/DataDescriptorMatcher.h"
16+
#include "Framework/InputRecordWalker.h"
17+
#include "Framework/DataProcessorSpec.h"
18+
#include "Framework/DataRefUtils.h"
19+
#include "Framework/ControlService.h"
20+
#include "Framework/InputRecord.h"
21+
#include "Framework/DataAllocator.h"
22+
23+
// we need to add workflow options before including Framework/runDataProcessing
24+
//void customize(std::vector<ConfigParamSpec>& workflowOptions)
25+
//{
26+
//}
27+
28+
#include "Framework/runDataProcessing.h"
29+
30+
// A test workflow for DataDescriptorNegator
31+
// Create a processor which subscribes to input spec TST/SAMPLE/!0
32+
// meaning TST/SAMPLE with all but subspec 0
33+
// Subscribing processor to TST/SAMPLE/0
34+
35+
// Two tasks:
36+
// - InputSpec matching to OutputSpec: OutputSpec has to options,
37+
// ConcreteDataMatcher and ConcreteDataTypeMatcher, DataSpecUtils has to
38+
// methods matching InputSpec matcher to either if this
39+
// -> this is sufficient for the use case if the negator is implemented
40+
// in DataDescriptorMatcher
41+
// - matching of data packets to InputRoutes of the DataRelayer, also this
42+
// is based on DataDescriptorMatcher
43+
//
44+
// DataDescriptorMatcher extension
45+
// - define Negator
46+
//
47+
// InputSpec definition:
48+
// - refactor to have one constructor with templated parameters
49+
50+
#define ASSERT_ERROR(condition) \
51+
if ((condition) == false) { \
52+
LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
53+
}
54+
55+
using namespace o2::framework;
56+
using DataHeader = o2::header::DataHeader;
57+
58+
std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
59+
{
60+
std::vector<DataProcessorSpec> workflow;
61+
62+
auto producerCallback = [counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control) {
63+
if (*counter > 0) {
64+
// don't know if we enter the processing callback after the EOS was sent
65+
return;
66+
}
67+
outputs.make<unsigned int>(Output{"TST", "SAMPLE", 1}) = 1;
68+
outputs.make<unsigned int>(Output{"TST", "SAMPLE", 2}) = 2;
69+
++(*counter);
70+
control.endOfStream();
71+
};
72+
73+
workflow.emplace_back(DataProcessorSpec{"producer",
74+
{InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
75+
{OutputSpec{"TST", "SAMPLE", 1, Lifetime::Timeframe},
76+
OutputSpec{"TST", "SAMPLE", 2, Lifetime::Timeframe}},
77+
AlgorithmSpec{adaptStateless(producerCallback)},
78+
{ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"timer"}}}});
79+
80+
auto processorCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs, DataAllocator& outputs) {
81+
// should not be called more than one time
82+
ASSERT_ERROR(*counter == 0);
83+
++(*counter);
84+
int nBlocks = 0;
85+
for (auto ref : InputRecordWalker(inputs)) {
86+
auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
87+
ASSERT_ERROR(dh != nullptr);
88+
auto const& data = inputs.get<unsigned int>(ref);
89+
ASSERT_ERROR(data == dh->subSpecification);
90+
outputs.make<unsigned int>(OutputRef{"out", 0}) = data;
91+
LOG(info) << fmt::format("forwarded {}/{}/{} with data {}",
92+
dh->dataOrigin.as<std::string>(),
93+
dh->dataDescription.as<std::string>(),
94+
dh->subSpecification,
95+
data);
96+
++nBlocks;
97+
}
98+
ASSERT_ERROR(nBlocks == 2);
99+
};
100+
101+
using DataDescriptorMatcher = o2::framework::data_matcher::DataDescriptorMatcher;
102+
DataDescriptorMatcher processorInputMatcher = {
103+
DataDescriptorMatcher::Op::And,
104+
data_matcher::OriginValueMatcher{"TST"},
105+
std::make_unique<DataDescriptorMatcher>(
106+
DataDescriptorMatcher::Op::And,
107+
data_matcher::DescriptionValueMatcher{"SAMPLE"},
108+
std::make_unique<DataDescriptorMatcher>(
109+
DataDescriptorMatcher::Op::And,
110+
std::make_unique<DataDescriptorMatcher>(
111+
DataDescriptorMatcher::Op::Not,
112+
data_matcher::SubSpecificationTypeValueMatcher{0}),
113+
std::make_unique<DataDescriptorMatcher>(
114+
DataDescriptorMatcher::Op::Just,
115+
data_matcher::StartTimeValueMatcher(data_matcher::ContextRef{0}))))};
116+
117+
workflow.emplace_back(DataProcessorSpec{"processor",
118+
{InputSpec{"in", std::move(processorInputMatcher), Lifetime::Timeframe}},
119+
{OutputSpec{{"out"}, "TST", "SAMPLE", 0, Lifetime::Timeframe}},
120+
AlgorithmSpec{adaptStateless(processorCallback)}});
121+
122+
auto sinkCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs) {
123+
// should not be called more than one time
124+
ASSERT_ERROR(*counter == 0);
125+
++(*counter);
126+
int nBlocks = 0;
127+
for (auto ref : InputRecordWalker(inputs)) {
128+
auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
129+
ASSERT_ERROR(dh != nullptr);
130+
auto const& data = inputs.get<unsigned int>(ref);
131+
ASSERT_ERROR(data > 0 && data < 3);
132+
LOG(info) << fmt::format("received {}/{}/{} with data {}",
133+
dh->dataOrigin.as<std::string>(),
134+
dh->dataDescription.as<std::string>(),
135+
dh->subSpecification,
136+
data);
137+
++nBlocks;
138+
}
139+
ASSERT_ERROR(nBlocks == 2);
140+
};
141+
142+
workflow.emplace_back(DataProcessorSpec{"sink",
143+
{InputSpec{"in", "TST", "SAMPLE", 0, Lifetime::Timeframe}},
144+
{},
145+
AlgorithmSpec{adaptStateless(sinkCallback)}});
146+
147+
return workflow;
148+
}

0 commit comments

Comments
 (0)