|
| 1 | +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. |
| 2 | +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. |
| 3 | +// All rights not expressly granted are reserved. |
| 4 | +// |
| 5 | +// This software is distributed under the terms of the GNU General Public |
| 6 | +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". |
| 7 | +// |
| 8 | +// In applying this license CERN does not waive the privileges and immunities |
| 9 | +// granted to it by virtue of its status as an Intergovernmental Organization |
| 10 | +// or submit itself to any jurisdiction. |
| 11 | + |
| 12 | +#include "Framework/DataDescriptorMatcher.h" |
| 13 | +#include "Framework/DataDescriptorQueryBuilder.h" |
| 14 | +#include "Framework/InputSpec.h" |
| 15 | +#include "Framework/DataDescriptorMatcher.h" |
| 16 | +#include "Framework/InputRecordWalker.h" |
| 17 | +#include "Framework/DataProcessorSpec.h" |
| 18 | +#include "Framework/DataRefUtils.h" |
| 19 | +#include "Framework/ControlService.h" |
| 20 | +#include "Framework/InputRecord.h" |
| 21 | +#include "Framework/DataAllocator.h" |
| 22 | + |
| 23 | +// we need to add workflow options before including Framework/runDataProcessing |
| 24 | +//void customize(std::vector<ConfigParamSpec>& workflowOptions) |
| 25 | +//{ |
| 26 | +//} |
| 27 | + |
| 28 | +#include "Framework/runDataProcessing.h" |
| 29 | + |
| 30 | +// A test workflow for DataDescriptorNegator |
| 31 | +// Create a processor which subscribes to input spec TST/SAMPLE/!0 |
| 32 | +// meaning TST/SAMPLE with all but subspec 0 |
| 33 | +// Subscribing processor to TST/SAMPLE/0 |
| 34 | + |
| 35 | +// Two tasks: |
| 36 | +// - InputSpec matching to OutputSpec: OutputSpec has to options, |
| 37 | +// ConcreteDataMatcher and ConcreteDataTypeMatcher, DataSpecUtils has to |
| 38 | +// methods matching InputSpec matcher to either if this |
| 39 | +// -> this is sufficient for the use case if the negator is implemented |
| 40 | +// in DataDescriptorMatcher |
| 41 | +// - matching of data packets to InputRoutes of the DataRelayer, also this |
| 42 | +// is based on DataDescriptorMatcher |
| 43 | +// |
| 44 | +// DataDescriptorMatcher extension |
| 45 | +// - define Negator |
| 46 | +// |
| 47 | +// InputSpec definition: |
| 48 | +// - refactor to have one constructor with templated parameters |
| 49 | + |
| 50 | +#define ASSERT_ERROR(condition) \ |
| 51 | + if ((condition) == false) { \ |
| 52 | + LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \ |
| 53 | + } |
| 54 | + |
| 55 | +using namespace o2::framework; |
| 56 | +using DataHeader = o2::header::DataHeader; |
| 57 | + |
| 58 | +std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config) |
| 59 | +{ |
| 60 | + std::vector<DataProcessorSpec> workflow; |
| 61 | + |
| 62 | + auto producerCallback = [counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control) { |
| 63 | + if (*counter > 0) { |
| 64 | + // don't know if we enter the processing callback after the EOS was sent |
| 65 | + return; |
| 66 | + } |
| 67 | + outputs.make<unsigned int>(Output{"TST", "SAMPLE", 1}) = 1; |
| 68 | + outputs.make<unsigned int>(Output{"TST", "SAMPLE", 2}) = 2; |
| 69 | + ++(*counter); |
| 70 | + control.endOfStream(); |
| 71 | + }; |
| 72 | + |
| 73 | + workflow.emplace_back(DataProcessorSpec{"producer", |
| 74 | + {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, |
| 75 | + {OutputSpec{"TST", "SAMPLE", 1, Lifetime::Timeframe}, |
| 76 | + OutputSpec{"TST", "SAMPLE", 2, Lifetime::Timeframe}}, |
| 77 | + AlgorithmSpec{adaptStateless(producerCallback)}, |
| 78 | + {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"timer"}}}}); |
| 79 | + |
| 80 | + auto processorCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs, DataAllocator& outputs) { |
| 81 | + // should not be called more than one time |
| 82 | + ASSERT_ERROR(*counter == 0); |
| 83 | + ++(*counter); |
| 84 | + int nBlocks = 0; |
| 85 | + for (auto ref : InputRecordWalker(inputs)) { |
| 86 | + auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref); |
| 87 | + ASSERT_ERROR(dh != nullptr); |
| 88 | + auto const& data = inputs.get<unsigned int>(ref); |
| 89 | + ASSERT_ERROR(data == dh->subSpecification); |
| 90 | + outputs.make<unsigned int>(OutputRef{"out", 0}) = data; |
| 91 | + LOG(info) << fmt::format("forwarded {}/{}/{} with data {}", |
| 92 | + dh->dataOrigin.as<std::string>(), |
| 93 | + dh->dataDescription.as<std::string>(), |
| 94 | + dh->subSpecification, |
| 95 | + data); |
| 96 | + ++nBlocks; |
| 97 | + } |
| 98 | + ASSERT_ERROR(nBlocks == 2); |
| 99 | + }; |
| 100 | + |
| 101 | + using DataDescriptorMatcher = o2::framework::data_matcher::DataDescriptorMatcher; |
| 102 | + DataDescriptorMatcher processorInputMatcher = { |
| 103 | + DataDescriptorMatcher::Op::And, |
| 104 | + data_matcher::OriginValueMatcher{"TST"}, |
| 105 | + std::make_unique<DataDescriptorMatcher>( |
| 106 | + DataDescriptorMatcher::Op::And, |
| 107 | + data_matcher::DescriptionValueMatcher{"SAMPLE"}, |
| 108 | + std::make_unique<DataDescriptorMatcher>( |
| 109 | + DataDescriptorMatcher::Op::And, |
| 110 | + std::make_unique<DataDescriptorMatcher>( |
| 111 | + DataDescriptorMatcher::Op::Not, |
| 112 | + data_matcher::SubSpecificationTypeValueMatcher{0}), |
| 113 | + std::make_unique<DataDescriptorMatcher>( |
| 114 | + DataDescriptorMatcher::Op::Just, |
| 115 | + data_matcher::StartTimeValueMatcher(data_matcher::ContextRef{0}))))}; |
| 116 | + |
| 117 | + workflow.emplace_back(DataProcessorSpec{"processor", |
| 118 | + {InputSpec{"in", std::move(processorInputMatcher), Lifetime::Timeframe}}, |
| 119 | + {OutputSpec{{"out"}, "TST", "SAMPLE", 0, Lifetime::Timeframe}}, |
| 120 | + AlgorithmSpec{adaptStateless(processorCallback)}}); |
| 121 | + |
| 122 | + auto sinkCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs) { |
| 123 | + // should not be called more than one time |
| 124 | + ASSERT_ERROR(*counter == 0); |
| 125 | + ++(*counter); |
| 126 | + int nBlocks = 0; |
| 127 | + for (auto ref : InputRecordWalker(inputs)) { |
| 128 | + auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref); |
| 129 | + ASSERT_ERROR(dh != nullptr); |
| 130 | + auto const& data = inputs.get<unsigned int>(ref); |
| 131 | + ASSERT_ERROR(data > 0 && data < 3); |
| 132 | + LOG(info) << fmt::format("received {}/{}/{} with data {}", |
| 133 | + dh->dataOrigin.as<std::string>(), |
| 134 | + dh->dataDescription.as<std::string>(), |
| 135 | + dh->subSpecification, |
| 136 | + data); |
| 137 | + ++nBlocks; |
| 138 | + } |
| 139 | + ASSERT_ERROR(nBlocks == 2); |
| 140 | + }; |
| 141 | + |
| 142 | + workflow.emplace_back(DataProcessorSpec{"sink", |
| 143 | + {InputSpec{"in", "TST", "SAMPLE", 0, Lifetime::Timeframe}}, |
| 144 | + {}, |
| 145 | + AlgorithmSpec{adaptStateless(sinkCallback)}}); |
| 146 | + |
| 147 | + return workflow; |
| 148 | +} |
0 commit comments