// Copyright Sebastian Jeckel 2014. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include "react/engine/SubtreeEngine.h" #include #include #include "react/common/Types.h" /***************************************/ REACT_IMPL_BEGIN /**************************************/ namespace subtree { /////////////////////////////////////////////////////////////////////////////////////////////////// /// Parameters /////////////////////////////////////////////////////////////////////////////////////////////////// static const uint chunk_size = 8; static const uint dfs_threshold = 3; static const uint heavy_weight = 1000; /////////////////////////////////////////////////////////////////////////////////////////////////// /// Turn /////////////////////////////////////////////////////////////////////////////////////////////////// Turn::Turn(TurnIdT id, TurnFlagsT flags) : TurnBase(id, flags) { } /////////////////////////////////////////////////////////////////////////////////////////////////// /// PulseCountEngine /////////////////////////////////////////////////////////////////////////////////////////////////// template void EngineBase::OnNodeAttach(Node& node, Node& parent) { parent.Successors.Add(node); if (node.Level <= parent.Level) node.Level = parent.Level + 1; } template void EngineBase::OnNodeDetach(Node& node, Node& parent) { parent.Successors.Remove(node); } template void EngineBase::OnTurnInputChange(Node& node, TTurn& turn) { processChildren(node, turn); } /////////////////////////////////////////////////////////////////////////////////////////////////// /// UpdaterTask /////////////////////////////////////////////////////////////////////////////////////////////////// template class UpdaterTask: public task { public: using BufferT = NodeBuffer; UpdaterTask(TTurn& turn, Node* node) : turn_{ turn }, nodes_{ node } {} UpdaterTask(UpdaterTask& other, SplitTag) : turn_{ other.turn_ }, nodes_{ other.nodes_, SplitTag{} } {} task* execute() { int splitCount = 0; while (!nodes_.IsEmpty()) { Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront(); if (node.IsInitial() || node.ShouldUpdate()) node.Tick(&turn_); node.ClearInitialFlag(); node.SetShouldUpdate(false); // Skip deferred node if (node.IsDeferred()) { node.ClearDeferredFlag(); continue; } // Mark successors for update? bool update = node.IsChanged(); {// node.ShiftMutex Node::ShiftMutexT::scoped_lock lock(node.ShiftMutex, false); for (auto* succ : node.Successors) { if (update) succ->SetShouldUpdate(true); // Wait for more? if (succ->IncReadyCount()) continue; succ->SetReadyCount(0); // Heavyweight - spawn new task if (succ->IsHeavy()) { auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(turn_, succ); spawn(t); } // Leightweight - add to buffer, split if full else { nodes_.PushBack(succ); if (nodes_.IsFull()) { splitCount++; //Delegate half the work to new task auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(*this, SplitTag{}); spawn(t); } } } node.ClearMarkedFlag(); }// ~node.ShiftMutex } return nullptr; } private: TTurn& turn_; BufferT nodes_; }; template void EngineBase::OnTurnPropagate(TTurn& turn) { // Phase 1 while (scheduledNodes_.FetchNext()) { for (auto* curNode : scheduledNodes_.NextNodes()) { if (curNode->Level < curNode->NewLevel) { curNode->Level = curNode->NewLevel; invalidateSuccessors(*curNode); scheduledNodes_.Push(curNode); continue; } curNode->ClearQueuedFlag(); curNode->Tick(&turn); } } // Phase 2 isInPhase2_ = true; rootTask_->set_ref_count(1 + subtreeRoots_.size()); for (auto* node : subtreeRoots_) { // Ignore if root flag has been cleared because node was part of another subtree if (! node->IsRoot()) { rootTask_->decrement_ref_count(); continue; } spawnList_.push_back(*new(rootTask_->allocate_child()) UpdaterTask(turn, node)); node->ClearRootFlag(); } rootTask_->spawn_and_wait_for_all(spawnList_); subtreeRoots_.clear(); spawnList_.clear(); isInPhase2_ = false; } template void EngineBase::OnNodePulse(Node& node, TTurn& turn) { if (isInPhase2_) node.SetChangedFlag(); else processChildren(node, turn); } template void EngineBase::OnNodeIdlePulse(Node& node, TTurn& turn) { if (isInPhase2_) node.ClearChangedFlag(); else processChildren(node, turn); } template void EngineBase::OnDynamicNodeAttach(Node& node, Node& parent, TTurn& turn) { if (isInPhase2_) { applyAsyncDynamicAttach(node, parent, turn); } else { OnNodeAttach(node, parent); invalidateSuccessors(node); // Re-schedule this node node.SetQueuedFlag(); scheduledNodes_.Push(&node); } } template void EngineBase::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& turn) { if (isInPhase2_) applyAsyncDynamicDetach(node, parent, turn); else OnNodeDetach(node, parent); } template void EngineBase::HintUpdateDuration(Node& node, uint dur) { if (dur > heavy_weight) node.SetHeavyFlag(); else node.ClearHeavyFlag(); } template void EngineBase::applyAsyncDynamicAttach(Node& node, Node& parent, TTurn& turn) { bool shouldTick = false; {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Add(node); // Level recalulation applied when added to topoqueue next time. // During the async phase 2 it's not needed. if (node.NewLevel <= parent.Level) node.NewLevel = parent.Level + 1; // Has already nudged its neighbors? if (! parent.IsMarked()) { shouldTick = true; } else { node.SetDeferredFlag(); node.SetShouldUpdate(true); node.WaitCount = 1; } }// ~parent.ShiftMutex (write) if (shouldTick) node.Tick(&turn); } template void EngineBase::applyAsyncDynamicDetach(Node& node, Node& parent, TTurn& turn) {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Remove(node); }// ~parent.ShiftMutex (write) template void EngineBase::processChildren(Node& node, TTurn& turn) { // Add children to queue for (auto* succ : node.Successors) { // Ignore if node part of marked subtree if (succ->IsMarked()) continue; // Light nodes use sequential toposort in phase 1 if (! succ->IsHeavy()) { if (!succ->IsQueued()) { succ->SetQueuedFlag(); scheduledNodes_.Push(succ); } } // Heavy nodes + subtrees are deferred for parallel updating in phase 2 else { // Force an initial update for heavy non-input nodes. // (non-atomic flag, unlike ShouldUpdate) if (!succ->IsInputNode()) succ->SetInitialFlag(); succ->SetChangedFlag(); succ->SetRootFlag(); markSubtree(*succ); subtreeRoots_.push_back(succ); } } } template void EngineBase::markSubtree(Node& root) { root.SetMarkedFlag(); root.WaitCount = 0; for (auto* succ : root.Successors) { if (!succ->IsMarked()) markSubtree(*succ); // Successor of another marked node? -> not a root anymore else if (succ->IsRoot()) succ->ClearRootFlag(); ++succ->WaitCount; } } template void EngineBase::invalidateSuccessors(Node& node) { for (auto* succ : node.Successors) { if (succ->NewLevel <= node.Level) succ->NewLevel = node.Level + 1; } } // Explicit instantiation template class EngineBase; template class EngineBase>; } // ~namespace subtree /****************************************/ REACT_IMPL_END /***************************************/