forked from taskflow/taskflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline.cpp
More file actions
84 lines (71 loc) · 2.28 KB
/
Copy pathpipeline.cpp
File metadata and controls
84 lines (71 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// This program demonstrates how to create a pipeline scheduling framework
// that propagates a series of integers and adds one to the result at each
// stage.
//
// The pipeline has the following structure:
//
// o -> o -> o
// | |
// v v
// o -> o -> o
// | |
// v v
// o -> o -> o
// | |
// v v
// o -> o -> o
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/pipeline.hpp>
int main() {
tf::Taskflow taskflow("pipeline");
tf::Executor executor;
const size_t num_lines = 4;
// custom data storage
std::array<size_t, num_lines> buffer;
// the pipeline consists of three pipes (serial-parallel-serial)
// and up to four concurrent scheduling tokens
tf::Pipeline pl(num_lines,
tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
// generate only 5 scheduling tokens
if(pf.token() == 5) {
pf.stop();
}
// save the result of this pipe into the buffer
else {
printf("stage 1: input token = %zu\n", pf.token());
buffer[pf.line()] = pf.token();
}
}},
tf::Pipe{tf::PipeType::PARALLEL, [&buffer](tf::Pipeflow& pf) {
printf(
"stage 2: input buffer[%zu] = %zu\n", pf.line(), buffer[pf.line()]
);
// propagate the previous result to this pipe and increment
// it by one
buffer[pf.line()] = buffer[pf.line()] + 1;
}},
tf::Pipe{tf::PipeType::SERIAL, [&buffer](tf::Pipeflow& pf) {
printf(
"stage 3: input buffer[%zu] = %zu\n", pf.line(), buffer[pf.line()]
);
// propagate the previous result to this pipe and increment
// it by one
buffer[pf.line()] = buffer[pf.line()] + 1;
}}
);
// build the pipeline graph using composition
tf::Task init = taskflow.emplace([](){ std::cout << "ready\n"; })
.name("starting pipeline");
tf::Task task = taskflow.composed_of(pl)
.name("pipeline");
tf::Task stop = taskflow.emplace([](){ std::cout << "stopped\n"; })
.name("pipeline stopped");
// create task dependency
init.precede(task);
task.precede(stop);
// dump the pipeline graph structure (with composition)
taskflow.dump(std::cout);
// run the pipeline
executor.run(taskflow).wait();
return 0;
}