-
Notifications
You must be signed in to change notification settings - Fork 499
Expand file tree
/
Copy patho2DiamondWorkflow.cxx
More file actions
113 lines (104 loc) · 4.77 KB
/
o2DiamondWorkflow.cxx
File metadata and controls
113 lines (104 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/ConfigParamSpec.h"
#include "Framework/DataTakingContext.h"
#include "Framework/CompletionPolicyHelpers.h"
#include "Framework/DeviceSpec.h"
#include "Framework/RawDeviceService.h"
#include "Framework/ControlService.h"
#include "Framework/Configurable.h"
#include "Framework/RunningWorkflowInfo.h"
#include "Framework/CallbackService.h"
#include "Framework/RateLimiter.h"
#include <fairmq/Device.h>
#include <iostream>
#include <chrono>
#include <thread>
#include <vector>
using namespace o2::framework;
// Example options, one for each of int, float, double, std::string and bool.
// Every Configurable is initialised with a name, a default value and a third
// argument that looks like a description/help text.
// NOTE(review): presumably picked up by the framework as workflow-level
// options through "Framework/runDataProcessing.h" — confirm.
struct WorkflowOptions {
// NOTE(review): unlike the other options, the description is an empty string here — confirm this is intended.
Configurable<int> anInt{"anInt", 1, ""};
Configurable<float> aFloat{"aFloat", 2.0f, {"a float option"}};
Configurable<double> aDouble{"aDouble", 3., {"a double option"}};
Configurable<std::string> aString{"aString", "foobar", {"a string option"}};
Configurable<bool> aBool{"aBool", true, {"a boolean option"}};
};
void customize(std::vector<CallbacksPolicy>& policies)
{
policies.push_back(CallbacksPolicy{
.matcher = DeviceMatchers::matchByName("A"),
.policy = [](CallbackService& service, InitContext&) {
service.set<CallbackService::Id::Start>([]() { LOG(info) << "invoked at start"; });
}});
}
// void customize(std::vector<SendingPolicy>& policies)
//{
// policies.push_back(SendingPolicy{
// .matcher = DeviceMatchers::matchByName("A"),
// .send = [](FairMQDeviceProxy& proxy, fair::mq::Parts& parts, ChannelIndex channelIndex) {
// LOG(info) << "A custom policy for sending invoked!";
// auto* channel = proxy.getOutputChannel(channelIndex);
// channel->Send(parts, 0);
// }});
// }
#include "Framework/runDataProcessing.h"
/// Builds the algorithm used by the pass-through devices of this workflow.
///
/// @param what     binding of the output to publish; used to build the
///                 OutputRef and echoed in the "Invoked ..." log line.
/// @param minDelay delay, in milliseconds, the device waits on every
///                 invocation before creating its output.
/// @return an AlgorithmSpec wrapping a stateful callback: the outer callback
///         seeds the C random generator and logs the number of devices in the
///         workflow, then returns the per-invocation callback.
AlgorithmSpec simplePipe(std::string const& what, int minDelay)
{
  return AlgorithmSpec{adaptStateful([what, minDelay](RunningWorkflowInfo const& runningWorkflow) {
    // NOTE(review): nothing in this function calls rand(); the seed is kept
    // only to preserve the existing side effect — confirm it is still needed.
    srand(getpid());
    LOG(info) << "There are " << runningWorkflow.devices.size() << " devices in the workflow";
    return adaptStateless([what, minDelay](DataAllocator& outputs, RawDeviceService& device) {
      LOGP(info, "Invoked {}", what);
      device.device()->WaitFor(std::chrono::milliseconds(minDelay));
      // The value returned by make() was bound to a local that was never
      // read; only the call itself is kept.
      outputs.make<int>(OutputRef{what}, 1);
    });
  })};
}
// Declarative definition of the workflow: four data processors wired as a
// diamond.
//   A: publishes TST/A1 and TST/A2.
//   B: consumes TST/A1, publishes TST/B1 (simplePipe, 1000 ms delay).
//   C: consumes TST/A2, publishes TST/C1 (simplePipe, 2000 ms delay).
//   D: consumes TST/A1, TST/B1 and TST/C1 and logs the start time found in
//      the DataProcessingHeader of input "b"; it carries the "expendable" label.
// The `specs` argument is not used by this workflow.
WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
  DataProcessorSpec a{
    .name = "A",
    .outputs = {OutputSpec{{"a1"}, "TST", "A1"},
                OutputSpec{{"a2"}, "TST", "A2"}},
    .algorithm = AlgorithmSpec{adaptStateless(
      [](DataAllocator& outputs, RawDeviceService& device, DataTakingContext& context, ProcessingContext& pcx) {
        // static RateLimiter limiter;
        // limiter.check(pcx, std::stoi(device.device()->fConfig->GetValue<std::string>("timeframes-rate-limit")), 2000);
        // The values returned by make() were bound to locals that were never
        // read; only the calls themselves are kept.
        outputs.make<int>(OutputRef{"a1"}, 1);
        outputs.make<int>(OutputRef{"a2"}, 1);
      })},
    .options = {
      ConfigParamSpec{"some-device-param", VariantType::Int, 1, {"Some device parameter"}},
    }};
  DataProcessorSpec b{
    .name = "B",
    .inputs = {InputSpec{"x", "TST", "A1", Lifetime::Timeframe, {ConfigParamSpec{"somestring", VariantType::String, "", {"Some input param"}}}}},
    .outputs = {OutputSpec{{"b1"}, "TST", "B1"}},
    .algorithm = simplePipe("b1", 1000)};
  DataProcessorSpec c{
    .name = "C",
    .inputs = {InputSpec{"x", "TST", "A2"}},
    .outputs = {OutputSpec{{"c1"}, "TST", "C1"}},
    .algorithm = simplePipe("c1", 2000)};
  DataProcessorSpec d{
    .name = "D",
    .inputs = {InputSpec{"a", "TST", "A1"},
               InputSpec{"b", "TST", "B1"},
               InputSpec{"c", "TST", "C1"}},
    .algorithm = AlgorithmSpec{adaptStateless(
      [](InputRecord& inputs) {
        auto ref = inputs.get("b");
        auto header = o2::header::get<const DataProcessingHeader*>(ref.header);
        // Guard against a missing header instead of dereferencing a null pointer.
        if (header == nullptr) {
          LOG(warning) << "No DataProcessingHeader found on input b";
          return;
        }
        LOG(info) << "Start time: " << header->startTime;
      })},
    .labels = {{"expendable"}}};
  return workflow::concat(WorkflowSpec{a},
                          WorkflowSpec{b, c},
                          WorkflowSpec{d});
}