-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Expand file tree
/
Copy pathExamplesAsyncProducerConsumer.html
More file actions
238 lines (236 loc) · 18.4 KB
/
Copy pathExamplesAsyncProducerConsumer.html
File metadata and controls
238 lines (236 loc) · 18.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
<!-- HTML header for doxygen 1.13.1-->
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "https://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="en-US">
<head>
<meta http-equiv="Content-Type" content="text/xhtml;charset=UTF-8"/>
<meta http-equiv="X-UA-Compatible" content="IE=11"/>
<meta name="generator" content="Doxygen 1.13.1"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<title>Taskflow: A General-purpose Task-parallel Programming System: Async Producer-Consumer Pipeline</title>
<link href="tabs.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="jquery.js"></script>
<script type="text/javascript" src="dynsections.js"></script>
<script type="text/javascript" src="clipboard.js"></script>
<link href="navtree.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="navtreedata.js"></script>
<script type="text/javascript" src="navtree.js"></script>
<script type="text/javascript" src="resize.js"></script>
<script type="text/javascript" src="cookie.js"></script>
<link href="search/search.css" rel="stylesheet" type="text/css"/>
<script type="text/javascript" src="search/searchdata.js"></script>
<script type="text/javascript" src="search/search.js"></script>
<link href="doxygen.css" rel="stylesheet" type="text/css" />
<link href="custom.css" rel="stylesheet" type="text/css"/>
</head>
<body>
<div id="top"><!-- do not remove this div, it is closed by doxygen! -->
<div id="titlearea">
<table cellspacing="0" cellpadding="0">
<tbody>
<tr id="projectrow">
<td id="projectlogo"><img alt="Logo" src="taskflow_logo.png"/></td>
<td id="projectalign">
<div id="projectname"><a href="https://github.com/taskflow/taskflow" style="color:inherit; text-decoration:none;">Taskflow: A General-purpose Task-parallel Programming System</a>
</div>
</td>
</tr>
</tbody>
</table>
</div>
<!-- end header part -->
<!-- Generated by Doxygen 1.13.1 -->
<script type="text/javascript">
/* @license magnet:?xt=urn:btih:d3d9a9a6595521f9666a5e94cc830dab83b65699&dn=expat.txt MIT */
var searchBox = new SearchBox("searchBox", "search/",'.html');
/* @license-end */
</script>
<script type="text/javascript">
/* @license magnet:?xt=urn:btih:d3d9a9a6595521f9666a5e94cc830dab83b65699&dn=expat.txt MIT */
$(function() { codefold.init(0); });
/* @license-end */
</script>
<script type="text/javascript" src="menudata.js"></script>
<script type="text/javascript" src="menu.js"></script>
<script type="text/javascript">
/* @license magnet:?xt=urn:btih:d3d9a9a6595521f9666a5e94cc830dab83b65699&dn=expat.txt MIT */
$(function() {
initMenu('',true,false,'search.php','Search',true);
$(function() { init_search(); });
});
/* @license-end */
</script>
<div id="main-nav"></div>
</div><!-- top -->
<div id="side-nav" class="ui-resizable side-nav-resizable">
<div id="nav-tree">
<div id="nav-tree-contents">
<div id="nav-sync" class="sync"></div>
</div>
</div>
<div id="splitbar" style="-moz-user-select:none;"
class="ui-resizable-handle">
</div>
</div>
<script type="text/javascript">
/* @license magnet:?xt=urn:btih:d3d9a9a6595521f9666a5e94cc830dab83b65699&dn=expat.txt MIT */
$(function(){initNavTree('ExamplesAsyncProducerConsumer.html',''); initResizable(true); });
/* @license-end */
</script>
<div id="doc-content">
<!-- window showing the filter options -->
<div id="MSearchSelectWindow"
onmouseover="return searchBox.OnSearchSelectShow()"
onmouseout="return searchBox.OnSearchSelectHide()"
onkeydown="return searchBox.OnSearchSelectKey(event)">
</div>
<!-- iframe showing the search results (closed by default) -->
<div id="MSearchResultsWindow">
<div id="MSearchResults">
<div class="SRPage">
<div id="SRIndex">
<div id="SRResults"></div>
<div class="SRStatus" id="Loading">Loading...</div>
<div class="SRStatus" id="Searching">Searching...</div>
<div class="SRStatus" id="NoMatches">No Matches</div>
</div>
</div>
</div>
</div>
<div><div class="header">
<div class="headertitle"><div class="title">Async Producer-Consumer Pipeline</div></div>
</div><!--header-->
<div class="contents">
<div class="toc"><h3>Table of Contents</h3>
<ul>
<li class="level1">
<a href="#AsyncProducerConsumerProblem">Problem Formulation</a>
</li>
<li class="level1">
<a href="#AsyncProducerConsumerImplementation">Implementation</a>
</li>
<li class="level1">
<a href="#AsyncProducerConsumerIsDone">Conditional Consumption with Cooperative Execution</a>
</li>
</ul>
</div>
<div class="textblock"><p>We implement a producer-consumer pipeline using dependent-async tasks, demonstrating how <a class="el" href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3" title="runs the given function asynchronously when the given predecessors finish">tf::Executor::dependent_async</a> naturally expresses stage-level dependencies between data items and how production and consumption overlap in time without any manual synchronization.</p>
<h1><a class="anchor" id="AsyncProducerConsumerProblem"></a>
Problem Formulation</h1>
<p>A producer generates <code>N</code> data items one by one. Each item must pass through a validator before it can be consumed. Consumption can begin as soon as an item is validated. There is no need to wait for the entire input to be produced first. The three-stage pipeline per item is:</p>
<ol type="1">
<li><b>Produce:</b> generate the data item</li>
<li><b>Validate:</b> verify the item is correct</li>
<li><b>Consume:</b> process the validated item</li>
</ol>
<p>Items are independent of each other across all three stages, so stages of different items can overlap in time. This is exactly the kind of dynamic, data-driven structure that <a class="el" href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3" title="runs the given function asynchronously when the given predecessors finish">tf::Executor::dependent_async</a> is designed for.</p>
<p>The following diagram illustrates the overlapping execution of four items through the three stages. Each item's stages are wired by dependency edges so that Produce must finish before Validate, and Validate before Consume. Because items are independent of one another, the executor schedules stages of different items in parallel whenever workers are available:</p>
<div class="image">
<object type="image/svg+xml" data="async_pipeline.svg" width="100%" style="pointer-events: none;"></object>
</div>
<h1><a class="anchor" id="AsyncProducerConsumerImplementation"></a>
Implementation</h1>
<p>We create three dependent-async tasks per item. Each task depends on the previous stage of the <em>same</em> item. Because <a class="el" href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3" title="runs the given function asynchronously when the given predecessors finish">tf::Executor::dependent_async</a> begins executing a task as soon as all its predecessors complete, item <code>i+1</code> can be produced while item <code>i</code> is being validated and item <code>i-1</code> is being consumed:</p>
<div class="fragment"><div class="line"><span class="preprocessor">#include <taskflow/taskflow.hpp></span></div>
<div class="line"> </div>
<div class="line"><span class="comment">// simulate data production, validation, and consumption</span></div>
<div class="line"><span class="keywordtype">int</span> produce (<span class="keywordtype">size_t</span> i) { <span class="keywordflow">return</span> <span class="keyword">static_cast<</span><span class="keywordtype">int</span><span class="keyword">></span>(i * i); }</div>
<div class="line"><span class="keywordtype">bool</span> validate(<span class="keywordtype">int</span> value) { <span class="keywordflow">return</span> value >= 0; }</div>
<div class="line"><span class="keywordtype">void</span> consume (<span class="keywordtype">size_t</span> i, <span class="keywordtype">int</span> val) { printf(<span class="stringliteral">"consumed item %zu: %d\n"</span>, i, val); }</div>
<div class="line"> </div>
<div class="line"><span class="keywordtype">int</span> main() {</div>
<div class="line"> </div>
<div class="line"> tf::Executor executor;</div>
<div class="line"> </div>
<div class="line"> <span class="keyword">const</span> <span class="keywordtype">size_t</span> N = 8;</div>
<div class="line"> std::vector<int> data(N);</div>
<div class="line"> std::vector<bool> valid(N, <span class="keyword">false</span>);</div>
<div class="line"> </div>
<div class="line"> <span class="keywordflow">for</span>(<span class="keywordtype">size_t</span> i = 0; i < N; i++) {</div>
<div class="line"> </div>
<div class="line"> <span class="comment">// stage 1: produce item i (no predecessor, runs immediately)</span></div>
<div class="line"> tf::AsyncTask prod = executor.<a class="code hl_function" href="classtf_1_1Executor.html#a0af0918b7179f9e42945fb407e0bad65">silent_dependent_async</a>([i, &data]() {</div>
<div class="line"> data[i] = produce(i);</div>
<div class="line"> printf(<span class="stringliteral">"produced item %zu: %d\n"</span>, i, data[i]);</div>
<div class="line"> });</div>
<div class="line"> </div>
<div class="line"> <span class="comment">// stage 2: validate item i, depends on production completing</span></div>
<div class="line"> tf::AsyncTask val = executor.<a class="code hl_function" href="classtf_1_1Executor.html#a0af0918b7179f9e42945fb407e0bad65">silent_dependent_async</a>(</div>
<div class="line"> [i, &data, &valid]() {</div>
<div class="line"> valid[i] = validate(data[i]);</div>
<div class="line"> printf(<span class="stringliteral">"validated item %zu: %s\n"</span>, i, valid[i] ? <span class="stringliteral">"ok"</span> : <span class="stringliteral">"fail"</span>);</div>
<div class="line"> },</div>
<div class="line"> prod <span class="comment">// predecessor: stage 1 of the same item</span></div>
<div class="line"> );</div>
<div class="line"> </div>
<div class="line"> <span class="comment">// stage 3: consume item i, depends on validation completing</span></div>
<div class="line"> executor.<a class="code hl_function" href="classtf_1_1Executor.html#a0af0918b7179f9e42945fb407e0bad65">silent_dependent_async</a>(</div>
<div class="line"> [i, &data, &valid]() {</div>
<div class="line"> <span class="keywordflow">if</span>(valid[i]) {</div>
<div class="line"> consume(i, data[i]);</div>
<div class="line"> }</div>
<div class="line"> },</div>
<div class="line"> val <span class="comment">// predecessor: stage 2 of the same item</span></div>
<div class="line"> );</div>
<div class="line"> }</div>
<div class="line"> </div>
<div class="line"> <span class="comment">// wait for all submitted tasks to finish</span></div>
<div class="line"> executor.<a class="code hl_function" href="classtf_1_1Executor.html#ab9aa252f70e9a40020a1e5a89d485b85">wait_for_all</a>();</div>
<div class="line"> </div>
<div class="line"> <span class="keywordflow">return</span> 0;</div>
<div class="line">}</div>
<div class="ttc" id="aclasstf_1_1Executor_html_a0af0918b7179f9e42945fb407e0bad65"><div class="ttname"><a href="classtf_1_1Executor.html#a0af0918b7179f9e42945fb407e0bad65">tf::Executor::silent_dependent_async</a></div><div class="ttdeci">tf::AsyncTask silent_dependent_async(F &&func, Tasks &&... tasks)</div><div class="ttdoc">runs the given function asynchronously when the given predecessors finish</div></div>
<div class="ttc" id="aclasstf_1_1Executor_html_ab9aa252f70e9a40020a1e5a89d485b85"><div class="ttname"><a href="classtf_1_1Executor.html#ab9aa252f70e9a40020a1e5a89d485b85">tf::Executor::wait_for_all</a></div><div class="ttdeci">void wait_for_all()</div><div class="ttdoc">waits for all tasks to complete</div></div>
</div><!-- fragment --><p>Because each stage only depends on the previous stage of the <em>same</em> item rather than on the previous item's stages, the executor's work-stealing scheduler can run all three stages of different items concurrently across available workers. No mutex, condition variable, or queue is needed; the dependency edges express all synchronization requirements.</p>
<h1><a class="anchor" id="AsyncProducerConsumerIsDone"></a>
Conditional Consumption with Cooperative Execution</h1>
<p>In some pipelines, the main thread needs to inspect intermediate results before deciding what to submit next. <a class="el" href="classtf_1_1AsyncTask.html#aefeefa30d7cafdfbb7dc8def542e8e51" title="checks if this dependent-async task finishes">tf::AsyncTask::is_done</a> provides a non-blocking way to check whether a specific task has completed. Combined with <a class="el" href="classtf_1_1Executor.html#a0fc6eb19f168dc4a9cd0a7c6187c1d2d" title="keeps running the work-stealing loop until the predicate returns true">tf::Executor::corun_until</a>, the calling thread remains active in the work-stealing loop while polling and never blocks.</p>
<p>The example below produces and validates one item, polls completion, and conditionally submits a downstream task based on the validation result:</p>
<div class="fragment"><div class="line"><a class="code hl_class" href="classtf_1_1Executor.html">tf::Executor</a> executor;</div>
<div class="line"> </div>
<div class="line"><span class="keywordtype">int</span> item = 42;</div>
<div class="line"><span class="keywordtype">bool</span> valid = <span class="keyword">false</span>;</div>
<div class="line"> </div>
<div class="line"><span class="comment">// stage 1: produce</span></div>
<div class="line"><span class="keyword">auto</span> [prod, fu_prod] = executor.<a class="code hl_function" href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3">dependent_async</a>([&]() -> <span class="keywordtype">int</span> {</div>
<div class="line"> <span class="keywordflow">return</span> produce(0);</div>
<div class="line">});</div>
<div class="line"> </div>
<div class="line"><span class="comment">// stage 2: validate — depends on stage 1</span></div>
<div class="line"><span class="keyword">auto</span> [val, fu_val] = executor.<a class="code hl_function" href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3">dependent_async</a>([&]() -> <span class="keywordtype">bool</span> {</div>
<div class="line"> <span class="keywordflow">return</span> validate(fu_prod.get());</div>
<div class="line">}, prod);</div>
<div class="line"> </div>
<div class="line"><span class="comment">// keep the calling thread's worker active while both tasks run</span></div>
<div class="line">executor.<a class="code hl_function" href="classtf_1_1Executor.html#a0fc6eb19f168dc4a9cd0a7c6187c1d2d">corun_until</a>([&]() {</div>
<div class="line"> <span class="keywordflow">return</span> prod.<a class="code hl_function" href="classtf_1_1AsyncTask.html#aefeefa30d7cafdfbb7dc8def542e8e51">is_done</a>() && val.<a class="code hl_function" href="classtf_1_1AsyncTask.html#aefeefa30d7cafdfbb7dc8def542e8e51">is_done</a>();</div>
<div class="line">});</div>
<div class="line"> </div>
<div class="line"><span class="comment">// safe to read results: both tasks have completed</span></div>
<div class="line"><span class="keywordflow">if</span>(fu_val.get()) {</div>
<div class="line"> <span class="comment">// only submit the consume task if validation passed</span></div>
<div class="line"> executor.<a class="code hl_function" href="classtf_1_1Executor.html#a0af0918b7179f9e42945fb407e0bad65">silent_dependent_async</a>([&]() {</div>
<div class="line"> consume(0, fu_prod.get());</div>
<div class="line"> }, val);</div>
<div class="line">}</div>
<div class="line"> </div>
<div class="line">executor.<a class="code hl_function" href="classtf_1_1Executor.html#ab9aa252f70e9a40020a1e5a89d485b85">wait_for_all</a>();</div>
<div class="ttc" id="aclasstf_1_1AsyncTask_html_aefeefa30d7cafdfbb7dc8def542e8e51"><div class="ttname"><a href="classtf_1_1AsyncTask.html#aefeefa30d7cafdfbb7dc8def542e8e51">tf::AsyncTask::is_done</a></div><div class="ttdeci">bool is_done() const</div><div class="ttdoc">checks if this dependent-async task finishes</div><div class="ttdef"><b>Definition</b> async_task.hpp:292</div></div>
<div class="ttc" id="aclasstf_1_1Executor_html"><div class="ttname"><a href="classtf_1_1Executor.html">tf::Executor</a></div><div class="ttdoc">class to create an executor</div><div class="ttdef"><b>Definition</b> executor.hpp:62</div></div>
<div class="ttc" id="aclasstf_1_1Executor_html_a0fc6eb19f168dc4a9cd0a7c6187c1d2d"><div class="ttname"><a href="classtf_1_1Executor.html#a0fc6eb19f168dc4a9cd0a7c6187c1d2d">tf::Executor::corun_until</a></div><div class="ttdeci">void corun_until(P &&predicate)</div><div class="ttdoc">keeps running the work-stealing loop until the predicate returns true</div></div>
<div class="ttc" id="aclasstf_1_1Executor_html_a3278e2611e43b80b65ef10a3391ddcc3"><div class="ttname"><a href="classtf_1_1Executor.html#a3278e2611e43b80b65ef10a3391ddcc3">tf::Executor::dependent_async</a></div><div class="ttdeci">auto dependent_async(F &&func, Tasks &&... tasks)</div><div class="ttdoc">runs the given function asynchronously when the given predecessors finish</div></div>
</div><!-- fragment --><dl class="section note"><dt>Note</dt><dd><a class="el" href="classtf_1_1AsyncTask.html#aefeefa30d7cafdfbb7dc8def542e8e51" title="checks if this dependent-async task finishes">tf::AsyncTask::is_done</a> is designed to be used with <a class="el" href="classtf_1_1Executor.html#a0fc6eb19f168dc4a9cd0a7c6187c1d2d" title="keeps running the work-stealing loop until the predicate returns true">tf::Executor::corun_until</a>. Calling <code>is_done</code> in a tight spin-wait without <code>corun_until</code> risks starving the worker thread pool if the calling thread is itself one of the executor's workers. See <a class="el" href="DependentAsyncTasking.html">Asynchronous Tasking with Dependencies</a> for a full discussion of the dependent-async API. </dd></dl>
</div></div><!-- contents -->
</div><!-- PageDoc -->
</div><!-- doc-content -->
<!-- HTML footer for doxygen 1.13.1-->
<!-- start footer part -->
<div id="nav-path" class="navpath"><!-- id is needed for treeview function! -->
<ul>
<li class="navelem"><a class="el" href="Examples.html">Learning from Examples</a></li>
<li class="footer">
Maintained by <a href="https://tsung-wei-huang.github.io/">Dr. Tsung-Wei Huang</a>
—
Generated by <a href="https://www.doxygen.org/index.html"><img class="footer" src="doxygen.svg" width="104" height="31" alt="doxygen"/></a> 1.13.1
</li>
</ul>
</div>