-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathparallel_graph_pipeline.cpp
More file actions
79 lines (62 loc) · 1.92 KB
/
Copy pathparallel_graph_pipeline.cpp
File metadata and controls
79 lines (62 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// This program demonstrates how to pipeline a sequence of linearly dependent
// tasks (stage function) over a directed acyclic graph.
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
// 1st-stage function
void f1(const std::string& node) {
printf("f1(%s)\n", node.c_str());
}
// 2nd-stage function
void f2(const std::string& node) {
printf("f2(%s)\n", node.c_str());
}
// 3rd-stage function
void f3(const std::string& node) {
printf("f3(%s)\n", node.c_str());
}
int main() {
tf::Taskflow taskflow("graph processing pipeline");
tf::Executor executor;
const size_t num_lines = 2;
// a topological order of the graph
// |-> B
// A--|
// |-> C
const std::vector<std::string> nodes = {"A", "B", "C"};
// the pipeline consists of three serial pipes
// and up to two concurrent scheduling tokens
tf::Pipeline pl(num_lines,
// first pipe calls f1
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
if(pf.token() == nodes.size()) {
pf.stop();
}
else {
f1(nodes[pf.token()]);
}
}},
// second pipe calls f2
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
f2(nodes[pf.token()]);
}},
// third pipe calls f3
tf::Pipe{tf::PipeType::SERIAL, [&](tf::Pipeflow& pf) {
f3(nodes[pf.token()]);
}}
);
// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
.name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
.name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
.name("pipeline stopped");
// create task dependency
init.precede(task);
task.precede(stop);
// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);
// run the pipeline
executor.run(taskflow).wait();
return 0;
}