Skip to content

Commit 2fe0b52

Browse files
committed
PulseCount engine now spawns new task for heavyweight nodes and splits range of changed inputs to multiple tasks if necessary.
1 parent 4340819 commit 2fe0b52

2 files changed

Lines changed: 92 additions & 32 deletions

File tree

include/react/propagation/PulseCountEngine.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ using std::vector;
2929
using tbb::task;
3030
using tbb::empty_task;
3131
using tbb::spin_rw_mutex;
32+
using tbb::task_list;
3233

3334
///////////////////////////////////////////////////////////////////////////////////////////////////
3435
/// Turn
@@ -84,6 +85,7 @@ class Node : public IReactiveNode
8485
NodeVector<Node> Successors;
8586

8687
ENodeState State = ENodeState::unchanged;
88+
uint Weight = 0;
8789

8890
private:
8991
atomic<int> counter_ = 0;
@@ -112,9 +114,12 @@ class EngineBase : public IReactiveEngine<Node,TTurn>
112114
void OnDynamicNodeAttach(Node& node, Node& parent, TTurn& turn);
113115
void OnDynamicNodeDetach(Node& node, Node& parent, TTurn& turn);
114116

117+
void HintUpdateDuration(Node& node, uint dur);
118+
115119
private:
116120
NodeVectT changedInputs_;
117121
empty_task* rootTask_ = new(task::allocate_root()) empty_task;
122+
task_list spawnList_;
118123
};
119124

120125
class BasicEngine : public EngineBase<Turn> {};
@@ -135,3 +140,16 @@ template <> class PulseCountEngine<parallel> : public REACT_IMPL::pulsecount::Ba
135140
template <> class PulseCountEngine<parallel_queuing> : public REACT_IMPL::pulsecount::QueuingEngine {};
136141

137142
/******************************************/ REACT_END /******************************************/
143+
144+
/***************************************/ REACT_IMPL_BEGIN /**************************************/
145+
146+
template <typename T>
147+
struct EnableNodeUpdateTimer;
148+
149+
template <>
150+
struct EnableNodeUpdateTimer<PulseCountEngine<parallel>> : std::true_type {};
151+
152+
template <>
153+
struct EnableNodeUpdateTimer<PulseCountEngine<parallel_queuing>> : std::true_type {};
154+
155+
/****************************************/ REACT_IMPL_END /***************************************/

src/react/propagation/PulseCountEngine.cpp

Lines changed: 74 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,39 @@
1414
/***************************************/ REACT_IMPL_BEGIN /**************************************/
1515
namespace pulsecount {
1616

17+
///////////////////////////////////////////////////////////////////////////////////////////////////
18+
/// Constants
19+
///////////////////////////////////////////////////////////////////////////////////////////////////
20+
static const uint chunk_size = 8;
21+
static const uint dfs_threshold = 3;
22+
static const uint heavy_weight = 1000;
23+
1724
///////////////////////////////////////////////////////////////////////////////////////////////////
1825
/// MarkerTask
1926
///////////////////////////////////////////////////////////////////////////////////////////////////
2027
class MarkerTask: public task
2128
{
2229
public:
23-
using StackT = NodeBuffer<Node,8>;
30+
using BufferT = NodeBuffer<Node,chunk_size>;
2431

2532
template <typename TInput>
2633
MarkerTask(TInput srcBegin, TInput srcEnd) :
2734
nodes_{ srcBegin, srcEnd }
28-
{}
35+
{
36+
}
2937

3038
MarkerTask(MarkerTask& other, SplitTag) :
3139
nodes_{ other.nodes_, SplitTag{} }
32-
{}
40+
{
41+
}
3342

3443
task* execute()
3544
{
3645
int splitCount = 0;
3746

3847
while (! nodes_.IsEmpty())
3948
{
40-
Node& node = splitCount > 3 ? *nodes_.PopBack() : *nodes_.PopFront();
49+
Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront();
4150

4251
// Increment counter of each successor and add it to smaller stack
4352
for (auto* succ : node.Successors)
@@ -67,7 +76,7 @@ class MarkerTask: public task
6776
}
6877

6978
private:
70-
StackT nodes_;
79+
BufferT nodes_;
7180
};
7281

7382
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -77,14 +86,19 @@ template <typename TTurn>
7786
class UpdaterTask: public task
7887
{
7988
public:
80-
using BufferT = NodeBuffer<Node,8>;
89+
using BufferT = NodeBuffer<Node,chunk_size>;
8190

8291
template <typename TInput>
8392
UpdaterTask(TTurn& turn, TInput srcBegin, TInput srcEnd) :
8493
turn_{ turn },
8594
nodes_{ srcBegin, srcEnd }
8695
{}
8796

97+
UpdaterTask(TTurn& turn, Node* node) :
98+
turn_{ turn },
99+
nodes_{ node }
100+
{}
101+
88102
UpdaterTask(UpdaterTask& other, SplitTag) :
89103
turn_{ other.turn_ },
90104
nodes_{ other.nodes_, SplitTag{} }
@@ -96,7 +110,7 @@ class UpdaterTask: public task
96110

97111
while (!nodes_.IsEmpty())
98112
{
99-
Node& node = splitCount > 3 ? *nodes_.PopBack() : *nodes_.PopFront();
113+
Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront();
100114

101115
if (node.Mark() == ENodeMark::should_update)
102116
node.Tick(&turn_);
@@ -120,18 +134,30 @@ class UpdaterTask: public task
120134
if (succ->DecCounter())
121135
continue;
122136

123-
nodes_.PushBack(succ);
124-
125-
if (nodes_.IsFull())
137+
// Heavyweight - spawn new task
138+
if (succ->Weight > heavy_weight)
126139
{
127-
splitCount++;
128-
129-
//Delegate half the work to new task
130140
auto& t = *new(task::allocate_additional_child_of(*parent()))
131-
UpdaterTask(*this, SplitTag{});
141+
UpdaterTask(turn_, succ);
132142

133143
spawn(t);
134144
}
145+
// Leightweight - add to buffer, split if full
146+
else
147+
{
148+
nodes_.PushBack(succ);
149+
150+
if (nodes_.IsFull())
151+
{
152+
splitCount++;
153+
154+
//Delegate half the work to new task
155+
auto& t = *new(task::allocate_additional_child_of(*parent()))
156+
UpdaterTask(*this, SplitTag{});
157+
158+
spawn(t);
159+
}
160+
}
135161
}
136162

137163
node.SetMark(ENodeMark::unmarked);
@@ -178,31 +204,41 @@ void EngineBase<TTurn>::OnTurnInputChange(Node& node, TTurn& turn)
178204
node.State = ENodeState::changed;
179205
}
180206

181-
//static vector<Node*> markNodes_;
182-
183-
template <typename TTurn>
184-
void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
207+
template <typename TTask, typename TIt, typename ... TArgs>
208+
void spawnHelper
209+
(
210+
task* rootTask, task_list& spawnList,
211+
const int count, TIt start, TIt end,
212+
TArgs& ... args
213+
)
185214
{
186-
if (changedInputs_.size() <= 8)
187-
{
188-
auto& markerTask = *new(rootTask_->allocate_child())
189-
MarkerTask(changedInputs_.begin(), changedInputs_.end());
215+
rootTask->set_ref_count(1 + count);
190216

191-
rootTask_->set_ref_count(2);
192-
rootTask_->spawn_and_wait_for_all(markerTask);
193-
}
194-
else
217+
for (int i=0; i < (count - 1); i++)
195218
{
196-
REACT_ASSERT(false, "not implemented yet\n");
219+
spawnList.push_back(*new(rootTask->allocate_child())
220+
TTask(args ..., start, start + chunk_size));
221+
start += chunk_size;
197222
}
198223

199-
auto& updaterTask = *new(rootTask_->allocate_child())
200-
UpdaterTask<TTurn>(turn, changedInputs_.begin(), changedInputs_.end());
224+
spawnList.push_back(*new(rootTask->allocate_child())
225+
TTask(args ..., start, end));
201226

202-
rootTask_->set_ref_count(2);
203-
rootTask_->spawn_and_wait_for_all(updaterTask);
227+
rootTask->spawn_and_wait_for_all(spawnList);
204228

205-
changedInputs_.clear();
229+
spawnList.clear();
230+
}
231+
232+
template <typename TTurn>
233+
void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
234+
{
235+
const int initialTaskCount = (changedInputs_.size() - 1) / chunk_size + 1;
236+
237+
spawnHelper<MarkerTask>(rootTask_, spawnList_, initialTaskCount,
238+
changedInputs_.begin(), changedInputs_.end());
239+
240+
spawnHelper<UpdaterTask<TTurn>>(rootTask_, spawnList_, initialTaskCount,
241+
changedInputs_.begin(), changedInputs_.end(), turn);
206242
}
207243

208244
template <typename TTurn>
@@ -252,6 +288,12 @@ void EngineBase<TTurn>::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& tur
252288
parent.Successors.Remove(node);
253289
}// ~oldParent.ShiftMutex (write)
254290

291+
template <typename TTurn>
292+
void EngineBase<TTurn>::HintUpdateDuration(Node& node, uint dur)
293+
{
294+
node.Weight = dur;
295+
}
296+
255297
// Explicit instantiation
256298
template class EngineBase<Turn>;
257299
template class EngineBase<DefaultQueueableTurn<Turn>>;

0 commit comments

Comments
 (0)