Skip to content

Commit a3b465a

Browse files
committed
Refactored engines to use new node weight interface.
1 parent 536ebb9 commit a3b465a

7 files changed

Lines changed: 23 additions & 59 deletions

File tree

include/react/detail/IReactiveEngine.h

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,6 @@ struct IReactiveEngine
4747

4848
template <typename F>
4949
bool TryMerge(F&& f) { return false; }
50-
51-
void HintUpdateDuration(NodeT& node, uint dur) {}
5250
};
5351

5452
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -160,18 +158,13 @@ struct EngineInterface
160158
{
161159
return Engine().TryMerge(std::forward<F>(f));
162160
}
163-
164-
static void HintUpdateDuration(NodeT& node, uint dur)
165-
{
166-
Engine().HintUpdateDuration(node, dur);
167-
}
168-
169161
};
170162

171163
///////////////////////////////////////////////////////////////////////////////////////////////////
172164
/// Engine traits
173165
///////////////////////////////////////////////////////////////////////////////////////////////////
174166
template <typename> struct EnableNodeUpdateTimer : std::false_type {};
175167
template <typename> struct EnableParallelUpdating : std::false_type {};
168+
template <typename> struct EnableConcurrentInput : std::false_type {};
176169

177170
/****************************************/ REACT_IMPL_END /***************************************/

include/react/engine/PulseCountEngine.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,6 @@ class Node : public IReactiveNode
8585
NodeVector<Node> Successors;
8686

8787
ENodeState State = ENodeState::unchanged;
88-
uint Weight = 0;
8988

9089
private:
9190
atomic<int> counter_ = 0;
@@ -114,8 +113,6 @@ class EngineBase : public IReactiveEngine<Node,TTurn>
114113
void OnDynamicNodeAttach(Node& node, Node& parent, TTurn& turn);
115114
void OnDynamicNodeDetach(Node& node, Node& parent, TTurn& turn);
116115

117-
void HintUpdateDuration(Node& node, uint dur);
118-
119116
private:
120117
NodeVectT changedInputs_;
121118
empty_task* rootTask_ = new(task::allocate_root()) empty_task;

include/react/engine/SubtreeEngine.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,6 @@ class Node : public IReactiveNode
5252
inline void SetQueuedFlag() { flags_.Set<flag_queued>(); }
5353
inline void ClearQueuedFlag() { flags_.Clear<flag_queued>(); }
5454

55-
inline bool IsHeavy() const { return flags_.Test<flag_heavy>(); }
56-
inline void SetHeavyFlag() { flags_.Set<flag_heavy>(); }
57-
inline void ClearHeavyFlag() { flags_.Clear<flag_heavy>(); }
58-
5955
inline bool IsMarked() const { return flags_.Test<flag_marked>(); }
6056
inline void SetMarkedFlag() { flags_.Set<flag_marked>(); }
6157
inline void ClearMarkedFlag() { flags_.Clear<flag_marked>(); }
@@ -105,7 +101,6 @@ class Node : public IReactiveNode
105101
enum EFlags : uint16_t
106102
{
107103
flag_queued = 0,
108-
flag_heavy,
109104
flag_marked,
110105
flag_changed,
111106
flag_deferred,
@@ -142,8 +137,6 @@ class EngineBase : public IReactiveEngine<Node,TTurn>
142137
void OnDynamicNodeAttach(Node& node, Node& parent, TTurn& turn);
143138
void OnDynamicNodeDetach(Node& node, Node& parent, TTurn& turn);
144139

145-
void HintUpdateDuration(Node& node, uint dur);
146-
147140
private:
148141
void applyAsyncDynamicAttach(Node& node, Node& parent, TTurn& turn);
149142
void applyAsyncDynamicDetach(Node& node, Node& parent, TTurn& turn);

include/react/engine/ToposortEngine.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ class ParNode : public IReactiveNode
7676
int Level = 0;
7777
int NewLevel = 0;
7878
atomic<bool> Collected = false;
79-
uint Weight = 1;
8079

8180
NodeVector<ParNode> Successors;
8281
InvalidateMutexT InvalidateMutex;
@@ -156,8 +155,6 @@ class ParEngineBase : public EngineBase<ParNode,TTurn>
156155
void OnDynamicNodeAttach(ParNode& node, ParNode& parent, TTurn& turn);
157156
void OnDynamicNodeDetach(ParNode& node, ParNode& parent, TTurn& turn);
158157

159-
void HintUpdateDuration(ParNode& node, uint dur);
160-
161158
private:
162159
void applyDynamicAttach(ParNode& node, ParNode& parent, TTurn& turn);
163160
void applyDynamicDetach(ParNode& node, ParNode& parent, TTurn& turn);
@@ -247,7 +244,7 @@ class PipeliningTurn : public TurnBase
247244
PipeliningTurn* successor_ = nullptr;
248245

249246
int currentLevel_ = -1;
250-
int maxLevel_ = numeric_limits<int>::max(); /// This turn may only advance up to maxLevel
247+
int maxLevel_ = (numeric_limits<int>::max)(); /// This turn may only advance up to maxLevel
251248
int minLevel_ = -1; /// successor.maxLevel = this.minLevel - 1
252249

253250
int curUpperBound_ = -1;
@@ -363,4 +360,9 @@ template <> struct EnableParallelUpdating<ToposortEngine<parallel>> : std::true_
363360
template <> struct EnableParallelUpdating<ToposortEngine<parallel_queue>> : std::true_type {};
364361
template <> struct EnableParallelUpdating<ToposortEngine<parallel_pipeline>> : std::true_type {};
365362

363+
template <typename> struct EnableConcurrentInput;
364+
template <> struct EnableConcurrentInput<ToposortEngine<sequential_queue>> : std::true_type {};
365+
template <> struct EnableConcurrentInput<ToposortEngine<parallel_queue>> : std::true_type {};
366+
template <> struct EnableConcurrentInput<ToposortEngine<parallel_pipeline>> : std::true_type {};
367+
366368
/****************************************/ REACT_IMPL_END /***************************************/

src/engine/PulsecountEngine.cpp

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ class UpdaterTask: public task
133133
continue;
134134

135135
// Heavyweight - spawn new task
136-
if (succ->Weight > heavy_weight)
136+
if (succ->IsHeavyweight())
137137
{
138138
auto& t = *new(task::allocate_additional_child_of(*parent()))
139139
UpdaterTask(turn_, succ);
@@ -287,12 +287,6 @@ void EngineBase<TTurn>::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& tur
287287
parent.Successors.Remove(node);
288288
}// ~parent.ShiftMutex (write)
289289

290-
template <typename TTurn>
291-
void EngineBase<TTurn>::HintUpdateDuration(Node& node, uint dur)
292-
{
293-
node.Weight = dur;
294-
}
295-
296290
// Explicit instantiation
297291
template class EngineBase<Turn>;
298292
template class EngineBase<DefaultQueueableTurn<Turn>>;

src/engine/SubtreeEngine.cpp

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class UpdaterTask: public task
112112
succ->SetReadyCount(0);
113113

114114
// Heavyweight - spawn new task
115-
if (succ->IsHeavy())
115+
if (succ->IsHeavyweight())
116116
{
117117
auto& t = *new(task::allocate_additional_child_of(*parent()))
118118
UpdaterTask(turn_, succ);
@@ -155,7 +155,7 @@ void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
155155
// Phase 1
156156
while (scheduledNodes_.FetchNext())
157157
{
158-
for (auto* curNode : scheduledNodes_.NextNodes())
158+
for (auto* curNode : scheduledNodes_.NextValues())
159159
{
160160
if (curNode->Level < curNode->NewLevel)
161161
{
@@ -244,15 +244,6 @@ void EngineBase<TTurn>::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& tur
244244
OnNodeDetach(node, parent);
245245
}
246246

247-
template <typename TTurn>
248-
void EngineBase<TTurn>::HintUpdateDuration(Node& node, uint dur)
249-
{
250-
if (dur > heavy_weight)
251-
node.SetHeavyFlag();
252-
else
253-
node.ClearHeavyFlag();
254-
}
255-
256247
template <typename TTurn>
257248
void EngineBase<TTurn>::applyAsyncDynamicAttach(Node& node, Node& parent, TTurn& turn)
258249
{
@@ -306,7 +297,7 @@ void EngineBase<TTurn>::processChildren(Node& node, TTurn& turn)
306297
continue;
307298

308299
// Light nodes use sequential toposort in phase 1
309-
if (! succ->IsHeavy())
300+
if (! succ->IsHeavyweight())
310301
{
311302
if (!succ->IsQueued())
312303
{

src/engine/ToposortEngine.cpp

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ void SeqEngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
6262
{
6363
while (scheduledNodes_.FetchNext())
6464
{
65-
for (auto* curNode : scheduledNodes_.NextNodes())
65+
for (auto* curNode : scheduledNodes_.NextValues())
6666
{
6767
if (curNode->Level < curNode->NewLevel)
6868
{
@@ -133,15 +133,17 @@ void ParEngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
133133
while (topoQueue_.FetchNext())
134134
{
135135
//using RangeT = tbb::blocked_range<vector<ParNode*>::const_iterator>;
136-
using RangeT = ParEngineBase::TopoQueueT::RangeT;
136+
using RangeT = ParEngineBase::TopoQueueT::NextRangeT;
137137

138138
// Iterate all nodes of current level and start processing them in parallel
139139
tbb::parallel_for(
140140
topoQueue_.NextRange(),
141141
[&] (const RangeT& range)
142142
{
143-
for (auto* curNode : range)
143+
for (const auto& e : range)
144144
{
145+
auto* curNode = e.first;
146+
145147
if (curNode->Level < curNode->NewLevel)
146148
{
147149
curNode->Level = curNode->NewLevel;
@@ -186,15 +188,6 @@ void ParEngineBase<TTurn>::OnDynamicNodeDetach(ParNode& node, ParNode& parent, T
186188
dynRequests_.push_back(data);
187189
}
188190

189-
template <typename TTurn>
190-
void ParEngineBase<TTurn>::HintUpdateDuration(ParNode& node, uint dur)
191-
{
192-
if (dur < min_weight)
193-
dur = min_weight;
194-
195-
node.Weight = dur;
196-
}
197-
198191
template <typename TTurn>
199192
void ParEngineBase<TTurn>::applyDynamicAttach(ParNode& node, ParNode& parent, TTurn& turn)
200193
{
@@ -331,7 +324,7 @@ void PipeliningTurn::Remove()
331324
}
332325
else if (successor_)
333326
{
334-
successor_->SetMaxLevel(numeric_limits<int>::max());
327+
successor_->SetMaxLevel((numeric_limits<int>::max)());
335328
successor_->predecessor_ = nullptr;
336329
}
337330

@@ -434,10 +427,10 @@ void PipeliningEngine::OnTurnPropagate(PipeliningTurn& turn)
434427

435428
while (turn.TopoQueue.FetchNext())
436429
{
437-
using RangeT = PipeliningTurn::TopoQueueT::RangeT;
430+
using RangeT = PipeliningTurn::TopoQueueT::NextRangeT;
438431

439-
for (const auto* node : turn.TopoQueue.NextRange())
440-
turn.AdjustUpperBound(node->Level);
432+
for (const auto& e : turn.TopoQueue.NextRange())
433+
turn.AdjustUpperBound(e.first->Level);
441434

442435
advanceTurn(turn);
443436

@@ -446,8 +439,9 @@ void PipeliningEngine::OnTurnPropagate(PipeliningTurn& turn)
446439
turn.TopoQueue.NextRange(),
447440
[&] (const RangeT& range)
448441
{
449-
for (auto* curNode : range)
442+
for (const auto& e : range)
450443
{
444+
auto* curNode = e.first;
451445
if (curNode->Level < curNode->NewLevel)
452446
{
453447
curNode->Level = curNode->NewLevel;
@@ -492,7 +486,7 @@ void PipeliningEngine::OnDynamicNodeDetach(ParNode& node, ParNode& parent, Pipel
492486

493487
void PipeliningEngine::applyDynamicAttach(ParNode& node, ParNode& parent, PipeliningTurn& turn)
494488
{
495-
turn.WaitForMaxLevel(numeric_limits<int>::max());
489+
turn.WaitForMaxLevel((numeric_limits<int>::max)());
496490

497491
OnNodeAttach(node, parent);
498492

0 commit comments

Comments
 (0)