// Copyright Sebastian Jeckel 2014. // Distributed under the Boost Software License, Version 1.0. // (See accompanying file LICENSE_1_0.txt or copy at // http://www.boost.org/LICENSE_1_0.txt) #include "react/engine/PulseCountEngine.h" #include #include #include "react/common/Types.h" /***************************************/ REACT_IMPL_BEGIN /**************************************/ namespace pulsecount { /////////////////////////////////////////////////////////////////////////////////////////////////// /// Constants /////////////////////////////////////////////////////////////////////////////////////////////////// static const uint chunk_size = 8; static const uint dfs_threshold = 3; static const uint heavy_weight = 1000; /////////////////////////////////////////////////////////////////////////////////////////////////// /// MarkerTask /////////////////////////////////////////////////////////////////////////////////////////////////// class MarkerTask: public task { public: using BufferT = NodeBuffer; template MarkerTask(TInput srcBegin, TInput srcEnd) : nodes_{ srcBegin, srcEnd } { } MarkerTask(MarkerTask& other, SplitTag) : nodes_{ other.nodes_, SplitTag{} } { } task* execute() { int splitCount = 0; while (! nodes_.IsEmpty()) { Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront(); // Increment counter of each successor and add it to smaller stack for (auto* succ : node.Successors) { succ->IncCounter(); // Skip if already marked as reachable if (! succ->ExchangeMark(ENodeMark::visited)) continue; nodes_.PushBack(succ); if (nodes_.IsFull()) { splitCount++; //Delegate half the work to new task auto& t = *new(task::allocate_additional_child_of(*parent())) MarkerTask(*this, SplitTag{}); spawn(t); } } } return nullptr; } private: BufferT nodes_; }; /////////////////////////////////////////////////////////////////////////////////////////////////// /// UpdaterTask /////////////////////////////////////////////////////////////////////////////////////////////////// template class UpdaterTask: public task { public: using BufferT = NodeBuffer; template UpdaterTask(TTurn& turn, TInput srcBegin, TInput srcEnd) : turn_{ turn }, nodes_{ srcBegin, srcEnd } {} UpdaterTask(TTurn& turn, Node* node) : turn_{ turn }, nodes_{ node } {} UpdaterTask(UpdaterTask& other, SplitTag) : turn_{ other.turn_ }, nodes_{ other.nodes_, SplitTag{} } {} task* execute() { int splitCount = 0; while (!nodes_.IsEmpty()) { Node& node = splitCount > dfs_threshold ? *nodes_.PopBack() : *nodes_.PopFront(); if (node.Mark() == ENodeMark::should_update) node.Tick(&turn_); if (node.State == ENodeState::deferred) continue; // Mark successors for update? bool update = node.State == ENodeState::changed; node.State = ENodeState::unchanged; {// node.ShiftMutex Node::ShiftMutexT::scoped_lock lock(node.ShiftMutex, false); for (auto* succ : node.Successors) { if (update) succ->SetMark(ENodeMark::should_update); // Delay tick? if (succ->DecCounter()) continue; // Heavyweight - spawn new task if (succ->Weight > heavy_weight) { auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(turn_, succ); spawn(t); } // Leightweight - add to buffer, split if full else { nodes_.PushBack(succ); if (nodes_.IsFull()) { splitCount++; //Delegate half the work to new task auto& t = *new(task::allocate_additional_child_of(*parent())) UpdaterTask(*this, SplitTag{}); spawn(t); } } } node.SetMark(ENodeMark::unmarked); }// ~node.ShiftMutex } return nullptr; } private: TTurn& turn_; BufferT nodes_; }; /////////////////////////////////////////////////////////////////////////////////////////////////// /// Turn /////////////////////////////////////////////////////////////////////////////////////////////////// Turn::Turn(TurnIdT id, TurnFlagsT flags) : TurnBase(id, flags) { } /////////////////////////////////////////////////////////////////////////////////////////////////// /// PulseCountEngine /////////////////////////////////////////////////////////////////////////////////////////////////// template void EngineBase::OnNodeAttach(Node& node, Node& parent) { parent.Successors.Add(node); } template void EngineBase::OnNodeDetach(Node& node, Node& parent) { parent.Successors.Remove(node); } template void EngineBase::OnTurnInputChange(Node& node, TTurn& turn) { changedInputs_.push_back(&node); node.State = ENodeState::changed; } template void spawnHelper ( task* rootTask, task_list& spawnList, const int count, TIt start, TIt end, TArgs& ... args ) { rootTask->set_ref_count(1 + count); for (int i=0; i < (count - 1); i++) { spawnList.push_back(*new(rootTask->allocate_child()) TTask(args ..., start, start + chunk_size)); start += chunk_size; } spawnList.push_back(*new(rootTask->allocate_child()) TTask(args ..., start, end)); rootTask->spawn_and_wait_for_all(spawnList); spawnList.clear(); } template void EngineBase::OnTurnPropagate(TTurn& turn) { const int initialTaskCount = (changedInputs_.size() - 1) / chunk_size + 1; spawnHelper(rootTask_, spawnList_, initialTaskCount, changedInputs_.begin(), changedInputs_.end()); spawnHelper>(rootTask_, spawnList_, initialTaskCount, changedInputs_.begin(), changedInputs_.end(), turn); changedInputs_.clear(); } template void EngineBase::OnNodePulse(Node& node, TTurn& turn) { node.State = ENodeState::changed; } template void EngineBase::OnNodeIdlePulse(Node& node, TTurn& turn) { node.State = ENodeState::unchanged; } template void EngineBase::OnDynamicNodeAttach(Node& node, Node& parent, TTurn& turn) { bool shouldTick = false; {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Add(node); // Has already nudged its neighbors? if (parent.Mark() == ENodeMark::unmarked) { shouldTick = true; } else { node.State = ENodeState::deferred; node.SetCounter(1); node.SetMark(ENodeMark::should_update); } }// ~parent.ShiftMutex (write) if (shouldTick) node.Tick(&turn); } template void EngineBase::OnDynamicNodeDetach(Node& node, Node& parent, TTurn& turn) {// parent.ShiftMutex (write) NodeShiftMutexT::scoped_lock lock(parent.ShiftMutex, true); parent.Successors.Remove(node); }// ~parent.ShiftMutex (write) template void EngineBase::HintUpdateDuration(Node& node, uint dur) { node.Weight = dur; } // Explicit instantiation template class EngineBase; template class EngineBase>; } // ~namespace pulsecount /****************************************/ REACT_IMPL_END /***************************************/