forked from taskflow/taskflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_task_arena.cpp
More file actions
119 lines (103 loc) · 4.06 KB
/
Copy pathtest_task_arena.cpp
File metadata and controls
119 lines (103 loc) · 4.06 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest.h>
#include <taskflow/taskflow.hpp>
#include <taskflow/algorithm/for_each.hpp>
#include <taskflow/algorithm/lazy.hpp>
#include <chrono>
#include <thread>
TEST_CASE("ArenaCorrectness" * doctest::timeout(300)) {
tf::Executor e;
tf::Taskflow taskflow;
std::shared_ptr<tf::TaskArena> arena = std::make_shared<tf::TaskArena>(e);
thread_local bool is_outer_task_running = false;
try
{
taskflow.for_each_index(0, 32, 1, [&](int i)
{
CHECK(is_outer_task_running == false);
is_outer_task_running = true;
e.isolate(arena, [&] {
// Inside of this functor our worker cannot take tasks that are older than this task (e.g. from the outer for_each_index)
// We can achieve this by switching current thread to another task queue
tf::Taskflow child_taskflow;
child_taskflow.for_each_index(0, 2, 1, [&](int j) {}); // These tasks will be spawned inside of our new TaskQueue of TaskArena "arena"
e.corun(child_taskflow); // Guaranteed to never run tasks from the outer loop
});
is_outer_task_running = false;
});
CHECK(e.run(taskflow).wait_for(std::chrono::seconds(5)) == std::future_status::ready);
}
catch (const std::exception&)
{
std::cout << "Exception occurred" << '\n';
}
}
TEST_CASE("NestedArenas" * doctest::timeout(300)) {
tf::Executor e;
tf::Taskflow taskflow;
std::shared_ptr<tf::TaskArena> arena = std::make_shared<tf::TaskArena>(e);
std::shared_ptr<tf::TaskArena> second_arena = std::make_shared<tf::TaskArena>(e);
thread_local bool is_outer_task_running = false;
try
{
taskflow.for_each_index(0, 32, 1, [&](int i)
{
CHECK(is_outer_task_running == false);
is_outer_task_running = true;
e.isolate(arena, [&] {
// Inside of this functor our worker cannot take tasks that are older than this task (e.g. from the outer for_each_index)
// We can achieve this by switching current thread to another task queue
tf::Taskflow child_taskflow;
child_taskflow.for_each_index(0, 2, 1, [&](int j) { // These tasks will be spawned inside of our new TaskQueue of TaskArena "arena"
e.isolate(second_arena, [&] {
tf::Taskflow third_taskflow;
third_taskflow.for_each_index(0, 4, 1, [&](int k) {});
e.corun(third_taskflow);
});
});
e.corun(child_taskflow); // Guaranteed to never run tasks from the outer loop
});
is_outer_task_running = false;
});
CHECK(e.run(taskflow).wait_for(std::chrono::seconds(5)) == std::future_status::ready);
}
catch (const std::exception&)
{
std::cout << "Exception occurred" << '\n';
}
}
TEST_CASE("LazyDeadlock" * doctest::timeout(300)) {
using namespace std::chrono_literals;
tf::Executor ex;
for (size_t i = 0; i < 500; ++i)
{
// std::cout << "Iteration: " << i << std::endl;
tf::Lazy<int> data(
[&]()
{
tf::Taskflow taskflow2;
for (size_t j = 0; j < 1; ++j)
{
taskflow2.emplace([&] { std::this_thread::sleep_for(10ms); });
}
ex.this_worker_id() >= 0 ? ex.corun(taskflow2) : ex.run(taskflow2).get();
return 99;
},
ex);
tf::Taskflow taskflow1;
for (size_t k = 0; k < 16; ++k)
{
taskflow1.emplace(
[&]
{
if (*data == 100)
{
std::cerr << "This can never happen" << std::endl;
}
});
}
auto future = ex.run(taskflow1);
CHECK(future.wait_for(5s) != std::future_status::timeout);
future.get();
}
}