1414/* **************************************/ REACT_IMPL_BEGIN /* *************************************/
1515namespace pulsecount {
1616
17+ // /////////////////////////////////////////////////////////////////////////////////////////////////
18+ // / Constants
19+ // /////////////////////////////////////////////////////////////////////////////////////////////////
20+ static const uint chunk_size = 8 ;
21+ static const uint dfs_threshold = 3 ;
22+ static const uint heavy_weight = 1000 ;
23+
1724// /////////////////////////////////////////////////////////////////////////////////////////////////
1825// / MarkerTask
1926// /////////////////////////////////////////////////////////////////////////////////////////////////
2027class MarkerTask : public task
2128{
2229public:
23- using StackT = NodeBuffer<Node,8 >;
30+ using BufferT = NodeBuffer<Node,chunk_size >;
2431
2532 template <typename TInput>
2633 MarkerTask (TInput srcBegin, TInput srcEnd) :
2734 nodes_{ srcBegin, srcEnd }
28- {}
35+ {
36+ }
2937
3038 MarkerTask (MarkerTask& other, SplitTag) :
3139 nodes_{ other.nodes_ , SplitTag{} }
32- {}
40+ {
41+ }
3342
3443 task* execute ()
3544 {
3645 int splitCount = 0 ;
3746
3847 while (! nodes_.IsEmpty ())
3948 {
40- Node& node = splitCount > 3 ? *nodes_.PopBack () : *nodes_.PopFront ();
49+ Node& node = splitCount > dfs_threshold ? *nodes_.PopBack () : *nodes_.PopFront ();
4150
4251 // Increment counter of each successor and add it to smaller stack
4352 for (auto * succ : node.Successors )
@@ -67,7 +76,7 @@ class MarkerTask: public task
6776 }
6877
6978private:
70- StackT nodes_;
79+ BufferT nodes_;
7180};
7281
7382// /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -77,14 +86,19 @@ template <typename TTurn>
7786class UpdaterTask : public task
7887{
7988public:
80- using BufferT = NodeBuffer<Node,8 >;
89+ using BufferT = NodeBuffer<Node,chunk_size >;
8190
8291 template <typename TInput>
8392 UpdaterTask (TTurn& turn, TInput srcBegin, TInput srcEnd) :
8493 turn_{ turn },
8594 nodes_{ srcBegin, srcEnd }
8695 {}
8796
97+ UpdaterTask (TTurn& turn, Node* node) :
98+ turn_{ turn },
99+ nodes_{ node }
100+ {}
101+
88102 UpdaterTask (UpdaterTask& other, SplitTag) :
89103 turn_{ other.turn_ },
90104 nodes_{ other.nodes_ , SplitTag{} }
@@ -96,7 +110,7 @@ class UpdaterTask: public task
96110
97111 while (!nodes_.IsEmpty ())
98112 {
99- Node& node = splitCount > 3 ? *nodes_.PopBack () : *nodes_.PopFront ();
113+ Node& node = splitCount > dfs_threshold ? *nodes_.PopBack () : *nodes_.PopFront ();
100114
101115 if (node.Mark () == ENodeMark::should_update)
102116 node.Tick (&turn_);
@@ -120,18 +134,30 @@ class UpdaterTask: public task
120134 if (succ->DecCounter ())
121135 continue ;
122136
123- nodes_.PushBack (succ);
124-
125- if (nodes_.IsFull ())
137+ // Heavyweight - spawn new task
138+ if (succ->Weight > heavy_weight)
126139 {
127- splitCount++;
128-
129- // Delegate half the work to new task
130140 auto & t = *new (task::allocate_additional_child_of (*parent ()))
131- UpdaterTask (* this , SplitTag{} );
141+ UpdaterTask (turn_, succ );
132142
133143 spawn (t);
134144 }
145+ // Leightweight - add to buffer, split if full
146+ else
147+ {
148+ nodes_.PushBack (succ);
149+
150+ if (nodes_.IsFull ())
151+ {
152+ splitCount++;
153+
154+ // Delegate half the work to new task
155+ auto & t = *new (task::allocate_additional_child_of (*parent ()))
156+ UpdaterTask (*this , SplitTag{});
157+
158+ spawn (t);
159+ }
160+ }
135161 }
136162
137163 node.SetMark (ENodeMark::unmarked);
@@ -178,31 +204,41 @@ void EngineBase<TTurn>::OnTurnInputChange(Node& node, TTurn& turn)
178204 node.State = ENodeState::changed;
179205}
180206
181- // static vector<Node*> markNodes_;
182-
183- template <typename TTurn>
184- void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
207+ template <typename TTask, typename TIt, typename ... TArgs>
208+ void spawnHelper
209+ (
210+ task* rootTask, task_list& spawnList,
211+ const int count, TIt start, TIt end,
212+ TArgs& ... args
213+ )
185214{
186- if (changedInputs_.size () <= 8 )
187- {
188- auto & markerTask = *new (rootTask_->allocate_child ())
189- MarkerTask (changedInputs_.begin (), changedInputs_.end ());
215+ rootTask->set_ref_count (1 + count);
190216
191- rootTask_->set_ref_count (2 );
192- rootTask_->spawn_and_wait_for_all (markerTask);
193- }
194- else
217+ for (int i=0 ; i < (count - 1 ); i++)
195218 {
196- REACT_ASSERT (false , " not implemented yet\n " );
219+ spawnList.push_back (*new (rootTask->allocate_child ())
220+ TTask (args ..., start, start + chunk_size));
221+ start += chunk_size;
197222 }
198223
199- auto & updaterTask = *new (rootTask_ ->allocate_child ())
200- UpdaterTask<TTurn>(turn, changedInputs_. begin (), changedInputs_. end ( ));
224+ spawnList. push_back ( *new (rootTask ->allocate_child ())
225+ TTask (args ..., start, end));
201226
202- rootTask_->set_ref_count (2 );
203- rootTask_->spawn_and_wait_for_all (updaterTask);
227+ rootTask->spawn_and_wait_for_all (spawnList);
204228
205- changedInputs_.clear ();
229+ spawnList.clear ();
230+ }
231+
232+ template <typename TTurn>
233+ void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
234+ {
235+ const int initialTaskCount = (changedInputs_.size () - 1 ) / chunk_size + 1 ;
236+
237+ spawnHelper<MarkerTask>(rootTask_, spawnList_, initialTaskCount,
238+ changedInputs_.begin (), changedInputs_.end ());
239+
240+ spawnHelper<UpdaterTask<TTurn>>(rootTask_, spawnList_, initialTaskCount,
241+ changedInputs_.begin (), changedInputs_.end (), turn);
206242}
207243
208244template <typename TTurn>
@@ -252,6 +288,12 @@ void EngineBase<TTurn>::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& tur
252288 parent.Successors .Remove (node);
253289}// ~oldParent.ShiftMutex (write)
254290
291+ template <typename TTurn>
292+ void EngineBase<TTurn>::HintUpdateDuration(Node& node, uint dur)
293+ {
294+ node.Weight = dur;
295+ }
296+
255297// Explicit instantiation
256298template class EngineBase <Turn>;
257299template class EngineBase <DefaultQueueableTurn<Turn>>;
0 commit comments