Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Framework/Core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ foreach(w
Task
ExternalFairMQDeviceWorkflow
VariablePayloadSequenceWorkflow
DataDescriptorMatcherWorkflow
)
o2_add_test(${w} NAME test_Framework_test_${w}
SOURCES test/test_${w}.cxx
Expand Down
13 changes: 7 additions & 6 deletions Framework/Core/src/DataDescriptorMatcher.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Framework/DataProcessingHeader.h"
#include "Framework/VariantHelpers.h"
#include "Framework/RuntimeError.h"
#include "Headers/DataHeader.h"
#include "Headers/Stack.h"
#include <iostream>

Expand Down Expand Up @@ -54,13 +55,13 @@ bool OriginValueMatcher::match(header::DataHeader const& header, VariableContext
if (auto ref = std::get_if<ContextRef>(&mValue)) {
auto& variable = context.get(ref->index);
if (auto value = std::get_if<std::string>(&variable)) {
return strncmp(header.dataOrigin.str, value->c_str(), 4) == 0;
return strncmp(header.dataOrigin.str, value->c_str(), header::DataOrigin::size) == 0;
}
auto maxSize = strnlen(header.dataOrigin.str, 4);
auto maxSize = strnlen(header.dataOrigin.str, header::DataOrigin::size);
context.put({ref->index, std::string(header.dataOrigin.str, maxSize)});
return true;
} else if (auto s = std::get_if<std::string>(&mValue)) {
return strncmp(header.dataOrigin.str, s->c_str(), 4) == 0;
return strncmp(header.dataOrigin.str, s->c_str(), header::DataOrigin::size) == 0;
}
throw runtime_error("Mismatching type for variable");
}
Expand All @@ -70,13 +71,13 @@ bool DescriptionValueMatcher::match(header::DataHeader const& header, VariableCo
if (auto ref = std::get_if<ContextRef>(&mValue)) {
auto& variable = context.get(ref->index);
if (auto value = std::get_if<std::string>(&variable)) {
return strncmp(header.dataDescription.str, value->c_str(), 16) == 0;
return strncmp(header.dataDescription.str, value->c_str(), header::DataDescription::size) == 0;
}
auto maxSize = strnlen(header.dataDescription.str, 16);
auto maxSize = strnlen(header.dataDescription.str, header::DataDescription::size);
context.put({ref->index, std::string(header.dataDescription.str, maxSize)});
return true;
} else if (auto s = std::get_if<std::string>(&this->mValue)) {
return strncmp(header.dataDescription.str, s->c_str(), 16) == 0;
return strncmp(header.dataDescription.str, s->c_str(), header::DataDescription::size) == 0;
}
throw runtime_error("Mismatching type for variable");
}
Expand Down
148 changes: 148 additions & 0 deletions Framework/Core/test/test_DataDescriptorMatcherWorkflow.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

#include "Framework/DataDescriptorMatcher.h"
#include "Framework/DataDescriptorQueryBuilder.h"
#include "Framework/InputSpec.h"
#include "Framework/DataDescriptorMatcher.h"
#include "Framework/InputRecordWalker.h"
#include "Framework/DataProcessorSpec.h"
#include "Framework/DataRefUtils.h"
#include "Framework/ControlService.h"
#include "Framework/InputRecord.h"
#include "Framework/DataAllocator.h"

// we need to add workflow options before including Framework/runDataProcessing
//void customize(std::vector<ConfigParamSpec>& workflowOptions)
//{
//}

#include "Framework/runDataProcessing.h"

// A test workflow for DataDescriptorNegator
// Create a processor which subscribes to input spec TST/SAMPLE/!0
// meaning TST/SAMPLE with all but subspec 0
// Subscribing processor to TST/SAMPLE/0

// Two tasks:
// - InputSpec matching to OutputSpec: OutputSpec has to options,
// ConcreteDataMatcher and ConcreteDataTypeMatcher, DataSpecUtils has to
// methods matching InputSpec matcher to either if this
// -> this is sufficient for the use case if the negator is implemented
// in DataDescriptorMatcher
// - matching of data packets to InputRoutes of the DataRelayer, also this
// is based on DataDescriptorMatcher
//
// DataDescriptorMatcher extension
// - define Negator
//
// InputSpec definition:
// - refactor to have one constructor with templated parameters

#define ASSERT_ERROR(condition) \
if ((condition) == false) { \
LOG(fatal) << R"(Test condition ")" #condition R"(" failed)"; \
}

using namespace o2::framework;
using DataHeader = o2::header::DataHeader;

std::vector<DataProcessorSpec> defineDataProcessing(ConfigContext const& config)
{
std::vector<DataProcessorSpec> workflow;

auto producerCallback = [counter = std::make_shared<size_t>()](DataAllocator& outputs, ControlService& control) {
if (*counter > 0) {
// don't know if we enter the processing callback after the EOS was sent
return;
}
outputs.make<unsigned int>(Output{"TST", "SAMPLE", 1}) = 1;
outputs.make<unsigned int>(Output{"TST", "SAMPLE", 2}) = 2;
++(*counter);
control.endOfStream();
};

workflow.emplace_back(DataProcessorSpec{"producer",
{InputSpec{"timer", "TST", "TIMER", 0, Lifetime::Timer}},
{OutputSpec{"TST", "SAMPLE", 1, Lifetime::Timeframe},
OutputSpec{"TST", "SAMPLE", 2, Lifetime::Timeframe}},
AlgorithmSpec{adaptStateless(producerCallback)},
{ConfigParamSpec{"period-timer", VariantType::Int, 100000, {"timer"}}}});

auto processorCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs, DataAllocator& outputs) {
// should not be called more than one time
ASSERT_ERROR(*counter == 0);
++(*counter);
int nBlocks = 0;
for (auto ref : InputRecordWalker(inputs)) {
auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
ASSERT_ERROR(dh != nullptr);
auto const& data = inputs.get<unsigned int>(ref);
ASSERT_ERROR(data == dh->subSpecification);
outputs.make<unsigned int>(OutputRef{"out", 0}) = data;
LOG(info) << fmt::format("forwarded {}/{}/{} with data {}",
dh->dataOrigin.as<std::string>(),
dh->dataDescription.as<std::string>(),
dh->subSpecification,
data);
++nBlocks;
}
ASSERT_ERROR(nBlocks == 2);
};

using DataDescriptorMatcher = o2::framework::data_matcher::DataDescriptorMatcher;
DataDescriptorMatcher processorInputMatcher = {
DataDescriptorMatcher::Op::And,
data_matcher::OriginValueMatcher{"TST"},
std::make_unique<DataDescriptorMatcher>(
DataDescriptorMatcher::Op::And,
data_matcher::DescriptionValueMatcher{"SAMPLE"},
std::make_unique<DataDescriptorMatcher>(
DataDescriptorMatcher::Op::And,
std::make_unique<DataDescriptorMatcher>(
DataDescriptorMatcher::Op::Not,
data_matcher::SubSpecificationTypeValueMatcher{0}),
std::make_unique<DataDescriptorMatcher>(
DataDescriptorMatcher::Op::Just,
data_matcher::StartTimeValueMatcher(data_matcher::ContextRef{0}))))};

workflow.emplace_back(DataProcessorSpec{"processor",
{InputSpec{"in", std::move(processorInputMatcher), Lifetime::Timeframe}},
{OutputSpec{{"out"}, "TST", "SAMPLE", 0, Lifetime::Timeframe}},
AlgorithmSpec{adaptStateless(processorCallback)}});

auto sinkCallback = [counter = std::make_shared<size_t>()](InputRecord& inputs) {
// should not be called more than one time
ASSERT_ERROR(*counter == 0);
++(*counter);
int nBlocks = 0;
for (auto ref : InputRecordWalker(inputs)) {
auto const* dh = DataRefUtils::getHeader<DataHeader*>(ref);
ASSERT_ERROR(dh != nullptr);
auto const& data = inputs.get<unsigned int>(ref);
ASSERT_ERROR(data > 0 && data < 3);
LOG(info) << fmt::format("received {}/{}/{} with data {}",
dh->dataOrigin.as<std::string>(),
dh->dataDescription.as<std::string>(),
dh->subSpecification,
data);
++nBlocks;
}
ASSERT_ERROR(nBlocks == 2);
};

workflow.emplace_back(DataProcessorSpec{"sink",
{InputSpec{"in", "TST", "SAMPLE", 0, Lifetime::Timeframe}},
{},
AlgorithmSpec{adaptStateless(sinkCallback)}});

return workflow;
}