@@ -209,156 +209,6 @@ class QueuingSeqEngine : public DefaultQueuingEngine<SeqEngineBase,ExclusiveSeqT
209209class BasicParEngine : public ParEngineBase <ExclusiveParTurn> {};
210210class QueuingParEngine : public DefaultQueuingEngine <ParEngineBase,ExclusiveParTurn> {};
211211
212- // /////////////////////////////////////////////////////////////////////////////////////////////////
213- // / PipeliningTurn
214- // /////////////////////////////////////////////////////////////////////////////////////////////////
215- class PipeliningTurn : public TurnBase <true >
216- {
217- public:
218- using ConcNodeVectT = concurrent_vector<ParNode*>;
219- using NodeVectT = vector<ParNode*>;
220- using IntervalSetT = set<pair<int ,int >>;
221- using DynRequestVectT = concurrent_vector<DynRequestData>;
222- using TopoQueueT = ConcurrentTopoQueue
223- <
224- ParNode*,
225- grain_size,
226- GetLevelFunctor<ParNode>,
227- GetWeightFunctor<ParNode>
228- >;
229-
230- PipeliningTurn (TurnIdT id, TurnFlagsT flags);
231-
232- int CurrentLevel () const { return currentLevel_; }
233-
234- bool AdvanceLevel ();
235- void SetMaxLevel (int level);
236- void WaitForMaxLevel (int targetLevel);
237-
238- void UpdateSuccessor ();
239-
240- void Append (PipeliningTurn* turn);
241-
242- void Remove ();
243-
244- void AdjustUpperBound (int level);
245-
246- template <typename F>
247- bool TryMerge (F&& inputFunc, BlockingCondition& caller)
248- {
249- if (!isMergeable_)
250- return false ;
251-
252- bool merged = false ;
253-
254- {// advMutex_
255- std::lock_guard<std::mutex> scopedLock (advMutex_);
256-
257- // Only merge if target is still blocked
258- if (currentLevel_ == -1 )
259- {
260- merged = true ;
261- caller.Block ();
262- merged_.emplace_back (std::make_pair (std::forward<F>(inputFunc), &caller));
263- }
264- }// ~advMutex_
265-
266- return merged;
267- }
268-
269- void RunMergedInputs () const ;
270-
271- TopoQueueT TopoQueue;
272- DynRequestVectT DynRequests;
273-
274- private:
275- using MergedDataVectT = vector<pair<function<void ()>,BlockingCondition*>>;
276-
277- const bool isMergeable_;
278- MergedDataVectT merged_;
279-
280- IntervalSetT levelIntervals_;
281-
282- PipeliningTurn* predecessor_ { nullptr };
283- PipeliningTurn* successor_ { nullptr };
284-
285- int currentLevel_ { -1 };
286-
287- // / This turn may only advance up to maxLevel
288- int maxLevel_ { (numeric_limits<int >::max)() };
289-
290- // / successor.maxLevel = this.minLevel - 1
291- int minLevel_ { -1 };
292-
293- int curUpperBound_ { -1 };
294-
295- mutex advMutex_;
296- condition_variable advCondition_;
297- };
298-
299- // /////////////////////////////////////////////////////////////////////////////////////////////////
300- // / PipeliningEngine
301- // /////////////////////////////////////////////////////////////////////////////////////////////////
302- class PipeliningEngine : public IReactiveEngine <ParNode,PipeliningTurn>
303- {
304- public:
305- using SeqMutexT = queuing_rw_mutex;
306- using NodeSetT = set<ParNode*>;
307- using MaxDynamicLevelMutexT = spin_mutex;
308-
309- void OnNodeAttach (ParNode& node, ParNode& parent);
310- void OnNodeDetach (ParNode& node, ParNode& parent);
311-
312- void OnTurnAdmissionStart (PipeliningTurn& turn);
313- void OnTurnAdmissionEnd (PipeliningTurn& turn);
314- void OnTurnEnd (PipeliningTurn& turn);
315-
316- void OnTurnInputChange (ParNode& node, PipeliningTurn& turn);
317- void OnNodePulse (ParNode& node, PipeliningTurn& turn);
318-
319- void OnTurnPropagate (PipeliningTurn& turn);
320-
321- void OnDynamicNodeAttach (ParNode& node, ParNode& parent, PipeliningTurn& turn);
322- void OnDynamicNodeDetach (ParNode& node, ParNode& parent, PipeliningTurn& turn);
323-
324- template <typename F>
325- inline bool TryMerge (F&& inputFunc)
326- {
327- bool merged = false ;
328-
329- BlockingCondition caller;
330-
331- {// seqMutex_
332- SeqMutexT::scoped_lock lock (seqMutex_);
333-
334- if (tail_)
335- merged = tail_->TryMerge (std::forward<F>(inputFunc), caller);
336- }// ~seqMutex_
337-
338- if (merged)
339- caller.WaitForUnblock ();
340-
341- return merged;
342- }
343-
344- private:
345- void applyDynamicAttach (ParNode& node, ParNode& parent, PipeliningTurn& turn);
346- void applyDynamicDetach (ParNode& node, ParNode& parent, PipeliningTurn& turn);
347-
348- void processChildren (ParNode& node, PipeliningTurn& turn);
349- void invalidateSuccessors (ParNode& node);
350-
351- void advanceTurn (PipeliningTurn& turn);
352-
353- SeqMutexT seqMutex_;
354- PipeliningTurn* tail_ { nullptr };
355-
356- NodeSetT dynamicNodes_;
357- int maxDynamicLevel_;
358-
359- MaxDynamicLevelMutexT maxDynamicLevelMutex_;
360- };
361-
362212} // ~namespace toposort
363213
364214/* ***************************************/ REACT_IMPL_END /* **************************************/
@@ -369,7 +219,6 @@ struct sequential;
369219struct sequential_queue ;
370220struct parallel ;
371221struct parallel_queue ;
372- struct parallel_pipeline ;
373222
374223template <typename TMode>
375224class ToposortEngine ;
@@ -386,27 +235,21 @@ template <> class ToposortEngine<parallel> :
386235template <> class ToposortEngine <parallel_queue> :
387236 public REACT_IMPL::toposort::QueuingParEngine {};
388237
389- template <> class ToposortEngine <parallel_pipeline> :
390- public REACT_IMPL::toposort::PipeliningEngine {};
391-
392238/* *****************************************/ REACT_END /* *****************************************/
393239
394240/* **************************************/ REACT_IMPL_BEGIN /* *************************************/
395241
396242template <typename > struct EnableNodeUpdateTimer ;
397243template <> struct EnableNodeUpdateTimer <ToposortEngine<parallel>> : std::true_type {};
398244template <> struct EnableNodeUpdateTimer <ToposortEngine<parallel_queue>> : std::true_type {};
399- template <> struct EnableNodeUpdateTimer <ToposortEngine<parallel_pipeline>> : std::true_type {};
400245
401246template <typename > struct EnableParallelUpdating ;
402247template <> struct EnableParallelUpdating <ToposortEngine<parallel>> : std::true_type {};
403248template <> struct EnableParallelUpdating <ToposortEngine<parallel_queue>> : std::true_type {};
404- template <> struct EnableParallelUpdating <ToposortEngine<parallel_pipeline>> : std::true_type {};
405249
406250template <typename > struct EnableConcurrentInput ;
407251template <> struct EnableConcurrentInput <ToposortEngine<sequential_queue>> : std::true_type {};
408252template <> struct EnableConcurrentInput <ToposortEngine<parallel_queue>> : std::true_type {};
409- template <> struct EnableConcurrentInput <ToposortEngine<parallel_pipeline>> : std::true_type {};
410253
411254/* ***************************************/ REACT_IMPL_END /* **************************************/
412255
0 commit comments