arena_slot.cpp
/*
    Copyright (c) 2005-2021 Intel Corporation

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
*/
#include "arena_slot.h"
#include "arena.h"
#include "thread_data.h"
namespace tbb {
namespace detail {
namespace r1 {
//------------------------------------------------------------------------
// Arena Slot
//------------------------------------------------------------------------
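// Overview of the slot's task pool protocol: `task_pool_ptr` is a bounded deque delimited by
// the atomic `head` and `tail` indices. The owner takes tasks from the tail end (get_task),
// thieves take them from the head end (steal_task), and both sides fall back to locking the
// pool (acquire_task_pool / lock_task_pool) to arbitrate when the two ends meet.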
d1::task* arena_slot::get_task_impl(size_t T, execution_data_ext& ed, bool& tasks_omitted, isolation_type isolation) {
    __TBB_ASSERT(tail.load(std::memory_order_relaxed) <= T || is_local_task_pool_quiescent(),
                 "Is it safe to get a task at position T?");

    d1::task* result = task_pool_ptr[T];
    __TBB_ASSERT(!is_poisoned( result ), "The poisoned task is going to be processed");

    if (!result) {
        return nullptr;
    }
    bool omit = isolation != no_isolation && isolation != task_accessor::isolation(*result);
    if (!omit && !task_accessor::is_proxy_task(*result)) {
        return result;
    } else if (omit) {
        tasks_omitted = true;
        return nullptr;
    }

    task_proxy& tp = static_cast<task_proxy&>(*result);
    d1::slot_id aff_id = tp.slot;
    if ( d1::task *t = tp.extract_task<task_proxy::pool_bit>() ) {
        ed.affinity_slot = aff_id;
        return t;
    }
    // Proxy was empty, so it's our responsibility to free it
    tp.allocator.delete_object(&tp, ed);

    if ( tasks_omitted ) {
        task_pool_ptr[T] = nullptr;
    }
    return nullptr;
}
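// Get a task from this slot's local task pool. The owner speculatively decrements the tail;
// if the head has passed the new tail, the pool is locked and the conflict with a thief is
// resolved under the lock. Tasks that do not match the requested isolation are skipped, and
// the pool bounds are restored after the loop so that the skipped tasks remain available.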
d1::task* arena_slot::get_task(execution_data_ext& ed, isolation_type isolation) {
    __TBB_ASSERT(is_task_pool_published(), nullptr);
    // The current task position in the task pool.
    std::size_t T0 = tail.load(std::memory_order_relaxed);
    // The bounds of available tasks in the task pool. H0 is only used when the head bound is reached.
    std::size_t H0 = (std::size_t)-1, T = T0;
    d1::task* result = nullptr;
    bool task_pool_empty = false;
    bool tasks_omitted = false;
    do {
        __TBB_ASSERT( !result, nullptr );
        // The full fence is required to sync the store of `tail` with the load of `head` (write-read barrier)
        T = --tail;
        // The acquire load of head is required to guarantee consistency of our task pool
        // when a thief rolls back the head.
        if ( (std::intptr_t)( head.load(std::memory_order_acquire) ) > (std::intptr_t)T ) {
            acquire_task_pool();
            H0 = head.load(std::memory_order_relaxed);
            if ( (std::intptr_t)H0 > (std::intptr_t)T ) {
                // The thief has not backed off - nothing to grab.
                __TBB_ASSERT( H0 == head.load(std::memory_order_relaxed)
                              && T == tail.load(std::memory_order_relaxed)
                              && H0 == T + 1, "victim/thief arbitration algorithm failure" );
                reset_task_pool_and_leave();
                // No tasks in the task pool.
                task_pool_empty = true;
                break;
            } else if ( H0 == T ) {
                // There is only one task in the task pool.
                reset_task_pool_and_leave();
                task_pool_empty = true;
            } else {
                // Release the task pool if there are still some tasks.
                // After the release, the tail will be less than T, thus a thief
                // will not attempt to get a task at position T.
                release_task_pool();
            }
        }
        result = get_task_impl( T, ed, tasks_omitted, isolation );
        if ( result ) {
            poison_pointer( task_pool_ptr[T] );
            break;
        } else if ( !tasks_omitted ) {
            poison_pointer( task_pool_ptr[T] );
            __TBB_ASSERT( T0 == T+1, nullptr );
            T0 = T;
        }
    } while ( !result && !task_pool_empty );

    if ( tasks_omitted ) {
        if ( task_pool_empty ) {
            // All tasks have been checked. The task pool should be in the reset state.
            // We just restore the bounds for the available tasks.
            // TODO: Does it make sense to move them to the beginning of the task pool?
            __TBB_ASSERT( is_quiescent_local_task_pool_reset(), nullptr );
            if ( result ) {
                // If we have a task, it should be at position H0.
                __TBB_ASSERT( H0 == T, nullptr );
                ++H0;
            }
            __TBB_ASSERT( H0 <= T0, nullptr );
            if ( H0 < T0 ) {
                // Restore the task pool if there are some tasks.
                head.store(H0, std::memory_order_relaxed);
                tail.store(T0, std::memory_order_relaxed);
                // The release fence is used in publish_task_pool.
                publish_task_pool();
                // Synchronize with snapshot as we published some tasks.
                ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
            }
        } else {
            // A task has been obtained. We need to make a hole in position T.
            __TBB_ASSERT( is_task_pool_published(), nullptr );
            __TBB_ASSERT( result, nullptr );
            task_pool_ptr[T] = nullptr;
            tail.store(T0, std::memory_order_release);
            // Synchronize with snapshot as we published some tasks.
            // TODO: consider an approach that avoids calling wakeup every time, e.g. check whether the tail has reached the head.
            ed.task_disp->m_thread_data->my_arena->advertise_new_work<arena::wakeup>();
        }
    }

    __TBB_ASSERT( (std::intptr_t)tail.load(std::memory_order_relaxed) >= 0, nullptr );
    __TBB_ASSERT( result || tasks_omitted || is_quiescent_local_task_pool_reset(), nullptr );
    return result;
}
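// Steal a task from another slot's task pool. The thief locks the victim pool, bumps the head,
// and backs off if the head overtakes the tail. Tasks that do not match the requested isolation,
// and proxies whose mailed copy is likely to be taken by the destination thread, are skipped;
// if anything was skipped, the head is rolled back so the skipped tasks stay in the pool.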
d1::task* arena_slot::steal_task(arena& a, isolation_type isolation, std::size_t slot_index) {
    d1::task** victim_pool = lock_task_pool();
    if (!victim_pool) {
        return nullptr;
    }
    d1::task* result = nullptr;
    std::size_t H = head.load(std::memory_order_relaxed); // mirror
    std::size_t H0 = H;
    bool tasks_omitted = false;
    do {
        // The full fence is required to sync the store of `head` with the load of `tail` (write-read barrier)
        H = ++head;
        // The acquire load of tail is required to guarantee consistency of victim_pool
        // because the owner synchronizes task spawning via tail.
        if ((std::intptr_t)H > (std::intptr_t)(tail.load(std::memory_order_acquire))) {
            // The stealing attempt failed; the deque contents have not been changed by us.
            head.store( /*dead: H = */ H0, std::memory_order_relaxed );
            __TBB_ASSERT( !result, nullptr );
            goto unlock;
        }
        result = victim_pool[H-1];
        __TBB_ASSERT( !is_poisoned( result ), nullptr );

        if (result) {
            if (isolation == no_isolation || isolation == task_accessor::isolation(*result)) {
                if (!task_accessor::is_proxy_task(*result)) {
                    break;
                }
                task_proxy& tp = *static_cast<task_proxy*>(result);
                // If the mailed task is likely to be grabbed by its destination thread, skip it.
                if (!task_proxy::is_shared(tp.task_and_tag) || !tp.outbox->recipient_is_idle() || a.mailbox(slot_index).recipient_is_idle()) {
                    break;
                }
            }
            // The task cannot be executed either due to isolation or proxy constraints.
            result = nullptr;
            tasks_omitted = true;
        } else if (!tasks_omitted) {
            // Clean up holes in the task pool until a task is skipped.
            __TBB_ASSERT( H0 == H-1, nullptr );
            poison_pointer( victim_pool[H0] );
            H0 = H;
        }
    } while (!result);
    __TBB_ASSERT( result, nullptr );

    // Emit the "task was consumed" signal.
    poison_pointer( victim_pool[H-1] );
    if (tasks_omitted) {
        // Some proxies in the task pool have been omitted. Set the stolen task to nullptr.
        victim_pool[H-1] = nullptr;
        // The release store synchronizes the victim_pool update (the store of nullptr).
        head.store( /*dead: H = */ H0, std::memory_order_release );
    }
unlock:
    unlock_task_pool(victim_pool);

#if __TBB_PREFETCHING
    __TBB_cl_evict(&victim_slot.head);
    __TBB_cl_evict(&victim_slot.tail);
#endif

    if (tasks_omitted) {
        // Synchronize with snapshot as the head and tail can be bumped, which can falsely trigger the EMPTY state
        a.advertise_new_work<arena::wakeup>();
    }
    return result;
}
} // namespace r1
} // namespace detail
} // namespace tbb