From ae7aed7a7c55a616de315c2c3bb7c951423dce27 Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Tue, 15 Mar 2022 14:54:36 +0100 Subject: [PATCH 1/2] Minor code cleanup, using defined constants for length of DataHeader properties --- Framework/Core/src/DataDescriptorMatcher.cxx | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/Framework/Core/src/DataDescriptorMatcher.cxx b/Framework/Core/src/DataDescriptorMatcher.cxx index 25803946abff3..da5f6b075e4f1 100644 --- a/Framework/Core/src/DataDescriptorMatcher.cxx +++ b/Framework/Core/src/DataDescriptorMatcher.cxx @@ -15,6 +15,7 @@ #include "Framework/DataProcessingHeader.h" #include "Framework/VariantHelpers.h" #include "Framework/RuntimeError.h" +#include "Headers/DataHeader.h" #include "Headers/Stack.h" #include @@ -54,13 +55,13 @@ bool OriginValueMatcher::match(header::DataHeader const& header, VariableContext if (auto ref = std::get_if(&mValue)) { auto& variable = context.get(ref->index); if (auto value = std::get_if(&variable)) { - return strncmp(header.dataOrigin.str, value->c_str(), 4) == 0; + return strncmp(header.dataOrigin.str, value->c_str(), header::DataOrigin::size) == 0; } - auto maxSize = strnlen(header.dataOrigin.str, 4); + auto maxSize = strnlen(header.dataOrigin.str, header::DataOrigin::size); context.put({ref->index, std::string(header.dataOrigin.str, maxSize)}); return true; } else if (auto s = std::get_if(&mValue)) { - return strncmp(header.dataOrigin.str, s->c_str(), 4) == 0; + return strncmp(header.dataOrigin.str, s->c_str(), header::DataOrigin::size) == 0; } throw runtime_error("Mismatching type for variable"); } @@ -70,13 +71,13 @@ bool DescriptionValueMatcher::match(header::DataHeader const& header, VariableCo if (auto ref = std::get_if(&mValue)) { auto& variable = context.get(ref->index); if (auto value = std::get_if(&variable)) { - return strncmp(header.dataDescription.str, value->c_str(), 16) == 0; + return strncmp(header.dataDescription.str, value->c_str(), header::DataDescription::size) == 0; } - auto maxSize = strnlen(header.dataDescription.str, 16); + auto maxSize = strnlen(header.dataDescription.str, header::DataDescription::size); context.put({ref->index, std::string(header.dataDescription.str, maxSize)}); return true; } else if (auto s = std::get_if(&this->mValue)) { - return strncmp(header.dataDescription.str, s->c_str(), 16) == 0; + return strncmp(header.dataDescription.str, s->c_str(), header::DataDescription::size) == 0; } throw runtime_error("Mismatching type for variable"); } From b50062cf618df2a6965860103364bf8eee546d5e Mon Sep 17 00:00:00 2001 From: Matthias Richter Date: Mon, 11 Apr 2022 16:45:13 +0200 Subject: [PATCH 2/2] Prototyping a demonstrator workflow for extended DataDescriptor matching There is a request to support input specs matching all blocks of a particular origin/description with all but a specific sub spec. E.g. a processor with input spec {TST/SAMPLE/!0} matching all blocks of {TST/SAMPLE} with a subspec other than '0', and the output {TST/SAMPLE/0}. A custom matcher can be created for this purpose using operation 'not' to invert the result of the embedded SubSpecificationTypeValueMatcher. --- Framework/Core/CMakeLists.txt | 1 + .../test_DataDescriptorMatcherWorkflow.cxx | 148 ++++++++++++++++++ 2 files changed, 149 insertions(+) create mode 100644 Framework/Core/test/test_DataDescriptorMatcherWorkflow.cxx diff --git a/Framework/Core/CMakeLists.txt b/Framework/Core/CMakeLists.txt index c57fae6422b09..a83dcebdd4b7e 100644 --- a/Framework/Core/CMakeLists.txt +++ b/Framework/Core/CMakeLists.txt @@ -320,6 +320,7 @@ foreach(w Task ExternalFairMQDeviceWorkflow VariablePayloadSequenceWorkflow + DataDescriptorMatcherWorkflow ) o2_add_test(${w} NAME test_Framework_test_${w} SOURCES test/test_${w}.cxx diff --git a/Framework/Core/test/test_DataDescriptorMatcherWorkflow.cxx b/Framework/Core/test/test_DataDescriptorMatcherWorkflow.cxx new file mode 100644 index 0000000000000..ef792afa4cc79 --- /dev/null +++ b/Framework/Core/test/test_DataDescriptorMatcherWorkflow.cxx @@ -0,0 +1,148 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +#include "Framework/DataDescriptorMatcher.h" +#include "Framework/DataDescriptorQueryBuilder.h" +#include "Framework/InputSpec.h" +#include "Framework/DataDescriptorMatcher.h" +#include "Framework/InputRecordWalker.h" +#include "Framework/DataProcessorSpec.h" +#include "Framework/DataRefUtils.h" +#include "Framework/ControlService.h" +#include "Framework/InputRecord.h" +#include "Framework/DataAllocator.h" + +// we need to add workflow options before including Framework/runDataProcessing +//void customize(std::vector& workflowOptions) +//{ +//} + +#include "Framework/runDataProcessing.h" + +// A test workflow for DataDescriptorNegator +// Create a processor which subscribes to input spec TST/SAMPLE/!0 +// meaning TST/SAMPLE with all but subspec 0 +// Subscribing processor to TST/SAMPLE/0 + +// Two tasks: +// - InputSpec matching to OutputSpec: OutputSpec has to options, +// ConcreteDataMatcher and ConcreteDataTypeMatcher, DataSpecUtils has to +// methods matching InputSpec matcher to either if this +// -> this is sufficient for the use case if the negator is implemented +// in DataDescriptorMatcher +// - matching of data packets to InputRoutes of the DataRelayer, also this +// is based on DataDescriptorMatcher +// +// DataDescriptorMatcher extension +// - define Negator +// +// InputSpec definition: +// - refactor to have one constructor with templated parameters + +#define ASSERT_ERROR(condition) \ + if ((condition) == false) { \ + LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \ + } + +using namespace o2::framework; +using DataHeader = o2::header::DataHeader; + +std::vector defineDataProcessing(ConfigContext const& config) +{ + std::vector workflow; + + auto producerCallback = [counter = std::make_shared()](DataAllocator& outputs, ControlService& control) { + if (*counter > 0) { + // don't know if we enter the processing callback after the EOS was sent + return; + } + outputs.make(Output{"TST", "SAMPLE", 1}) = 1; + outputs.make(Output{"TST", "SAMPLE", 2}) = 2; + ++(*counter); + control.endOfStream(); + }; + + workflow.emplace_back(DataProcessorSpec{"producer", + {InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}}, + {OutputSpec{"TST", "SAMPLE", 1, Lifetime::Timeframe}, + OutputSpec{"TST", "SAMPLE", 2, Lifetime::Timeframe}}, + AlgorithmSpec{adaptStateless(producerCallback)}, + {ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"timer"}}}}); + + auto processorCallback = [counter = std::make_shared()](InputRecord& inputs, DataAllocator& outputs) { + // should not be called more than one time + ASSERT_ERROR(*counter == 0); + ++(*counter); + int nBlocks = 0; + for (auto ref : InputRecordWalker(inputs)) { + auto const* dh = DataRefUtils::getHeader(ref); + ASSERT_ERROR(dh != nullptr); + auto const& data = inputs.get(ref); + ASSERT_ERROR(data == dh->subSpecification); + outputs.make(OutputRef{"out", 0}) = data; + LOG(info) << fmt::format("forwarded {}/{}/{} with data {}", + dh->dataOrigin.as(), + dh->dataDescription.as(), + dh->subSpecification, + data); + ++nBlocks; + } + ASSERT_ERROR(nBlocks == 2); + }; + + using DataDescriptorMatcher = o2::framework::data_matcher::DataDescriptorMatcher; + DataDescriptorMatcher processorInputMatcher = { + DataDescriptorMatcher::Op::And, + data_matcher::OriginValueMatcher{"TST"}, + std::make_unique( + DataDescriptorMatcher::Op::And, + data_matcher::DescriptionValueMatcher{"SAMPLE"}, + std::make_unique( + DataDescriptorMatcher::Op::And, + std::make_unique( + DataDescriptorMatcher::Op::Not, + data_matcher::SubSpecificationTypeValueMatcher{0}), + std::make_unique( + DataDescriptorMatcher::Op::Just, + data_matcher::StartTimeValueMatcher(data_matcher::ContextRef{0}))))}; + + workflow.emplace_back(DataProcessorSpec{"processor", + {InputSpec{"in", std::move(processorInputMatcher), Lifetime::Timeframe}}, + {OutputSpec{{"out"}, "TST", "SAMPLE", 0, Lifetime::Timeframe}}, + AlgorithmSpec{adaptStateless(processorCallback)}}); + + auto sinkCallback = [counter = std::make_shared()](InputRecord& inputs) { + // should not be called more than one time + ASSERT_ERROR(*counter == 0); + ++(*counter); + int nBlocks = 0; + for (auto ref : InputRecordWalker(inputs)) { + auto const* dh = DataRefUtils::getHeader(ref); + ASSERT_ERROR(dh != nullptr); + auto const& data = inputs.get(ref); + ASSERT_ERROR(data > 0 && data < 3); + LOG(info) << fmt::format("received {}/{}/{} with data {}", + dh->dataOrigin.as(), + dh->dataDescription.as(), + dh->subSpecification, + data); + ++nBlocks; + } + ASSERT_ERROR(nBlocks == 2); + }; + + workflow.emplace_back(DataProcessorSpec{"sink", + {InputSpec{"in", "TST", "SAMPLE", 0, Lifetime::Timeframe}}, + {}, + AlgorithmSpec{adaptStateless(sinkCallback)}}); + + return workflow; +}