Skip to content

Commit ad88394

Browse files
committed
Removed pipelining toposort engine. It was somewhat broken and adding async (which I'm planning) would have introduced further complications.
1 parent 0eea52c commit ad88394

8 files changed

Lines changed: 3 additions & 477 deletions

File tree

include/react/engine/ToposortEngine.h

Lines changed: 0 additions & 157 deletions
Original file line numberDiff line numberDiff line change
@@ -209,156 +209,6 @@ class QueuingSeqEngine : public DefaultQueuingEngine<SeqEngineBase,ExclusiveSeqT
209209
class BasicParEngine : public ParEngineBase<ExclusiveParTurn> {};
210210
class QueuingParEngine : public DefaultQueuingEngine<ParEngineBase,ExclusiveParTurn> {};
211211

212-
///////////////////////////////////////////////////////////////////////////////////////////////////
213-
/// PipeliningTurn
214-
///////////////////////////////////////////////////////////////////////////////////////////////////
215-
class PipeliningTurn : public TurnBase<true>
216-
{
217-
public:
218-
using ConcNodeVectT = concurrent_vector<ParNode*>;
219-
using NodeVectT = vector<ParNode*>;
220-
using IntervalSetT = set<pair<int,int>>;
221-
using DynRequestVectT = concurrent_vector<DynRequestData>;
222-
using TopoQueueT = ConcurrentTopoQueue
223-
<
224-
ParNode*,
225-
grain_size,
226-
GetLevelFunctor<ParNode>,
227-
GetWeightFunctor<ParNode>
228-
>;
229-
230-
PipeliningTurn(TurnIdT id, TurnFlagsT flags);
231-
232-
int CurrentLevel() const { return currentLevel_; }
233-
234-
bool AdvanceLevel();
235-
void SetMaxLevel(int level);
236-
void WaitForMaxLevel(int targetLevel);
237-
238-
void UpdateSuccessor();
239-
240-
void Append(PipeliningTurn* turn);
241-
242-
void Remove();
243-
244-
void AdjustUpperBound(int level);
245-
246-
template <typename F>
247-
bool TryMerge(F&& inputFunc, BlockingCondition& caller)
248-
{
249-
if (!isMergeable_)
250-
return false;
251-
252-
bool merged = false;
253-
254-
{// advMutex_
255-
std::lock_guard<std::mutex> scopedLock(advMutex_);
256-
257-
// Only merge if target is still blocked
258-
if (currentLevel_ == -1)
259-
{
260-
merged = true;
261-
caller.Block();
262-
merged_.emplace_back(std::make_pair(std::forward<F>(inputFunc), &caller));
263-
}
264-
}// ~advMutex_
265-
266-
return merged;
267-
}
268-
269-
void RunMergedInputs() const;
270-
271-
TopoQueueT TopoQueue;
272-
DynRequestVectT DynRequests;
273-
274-
private:
275-
using MergedDataVectT = vector<pair<function<void()>,BlockingCondition*>>;
276-
277-
const bool isMergeable_;
278-
MergedDataVectT merged_;
279-
280-
IntervalSetT levelIntervals_;
281-
282-
PipeliningTurn* predecessor_ { nullptr };
283-
PipeliningTurn* successor_ { nullptr };
284-
285-
int currentLevel_ { -1 };
286-
287-
/// This turn may only advance up to maxLevel
288-
int maxLevel_ { (numeric_limits<int>::max)() };
289-
290-
/// successor.maxLevel = this.minLevel - 1
291-
int minLevel_ { -1 };
292-
293-
int curUpperBound_ { -1 };
294-
295-
mutex advMutex_;
296-
condition_variable advCondition_;
297-
};
298-
299-
///////////////////////////////////////////////////////////////////////////////////////////////////
300-
/// PipeliningEngine
301-
///////////////////////////////////////////////////////////////////////////////////////////////////
302-
class PipeliningEngine : public IReactiveEngine<ParNode,PipeliningTurn>
303-
{
304-
public:
305-
using SeqMutexT = queuing_rw_mutex;
306-
using NodeSetT = set<ParNode*>;
307-
using MaxDynamicLevelMutexT = spin_mutex;
308-
309-
void OnNodeAttach(ParNode& node, ParNode& parent);
310-
void OnNodeDetach(ParNode& node, ParNode& parent);
311-
312-
void OnTurnAdmissionStart(PipeliningTurn& turn);
313-
void OnTurnAdmissionEnd(PipeliningTurn& turn);
314-
void OnTurnEnd(PipeliningTurn& turn);
315-
316-
void OnTurnInputChange(ParNode& node, PipeliningTurn& turn);
317-
void OnNodePulse(ParNode& node, PipeliningTurn& turn);
318-
319-
void OnTurnPropagate(PipeliningTurn& turn);
320-
321-
void OnDynamicNodeAttach(ParNode& node, ParNode& parent, PipeliningTurn& turn);
322-
void OnDynamicNodeDetach(ParNode& node, ParNode& parent, PipeliningTurn& turn);
323-
324-
template <typename F>
325-
inline bool TryMerge(F&& inputFunc)
326-
{
327-
bool merged = false;
328-
329-
BlockingCondition caller;
330-
331-
{// seqMutex_
332-
SeqMutexT::scoped_lock lock(seqMutex_);
333-
334-
if (tail_)
335-
merged = tail_->TryMerge(std::forward<F>(inputFunc), caller);
336-
}// ~seqMutex_
337-
338-
if (merged)
339-
caller.WaitForUnblock();
340-
341-
return merged;
342-
}
343-
344-
private:
345-
void applyDynamicAttach(ParNode& node, ParNode& parent, PipeliningTurn& turn);
346-
void applyDynamicDetach(ParNode& node, ParNode& parent, PipeliningTurn& turn);
347-
348-
void processChildren(ParNode& node, PipeliningTurn& turn);
349-
void invalidateSuccessors(ParNode& node);
350-
351-
void advanceTurn(PipeliningTurn& turn);
352-
353-
SeqMutexT seqMutex_;
354-
PipeliningTurn* tail_ { nullptr };
355-
356-
NodeSetT dynamicNodes_;
357-
int maxDynamicLevel_;
358-
359-
MaxDynamicLevelMutexT maxDynamicLevelMutex_;
360-
};
361-
362212
} // ~namespace toposort
363213

364214
/****************************************/ REACT_IMPL_END /***************************************/
@@ -369,7 +219,6 @@ struct sequential;
369219
struct sequential_queue;
370220
struct parallel;
371221
struct parallel_queue;
372-
struct parallel_pipeline;
373222

374223
template <typename TMode>
375224
class ToposortEngine;
@@ -386,27 +235,21 @@ template <> class ToposortEngine<parallel> :
386235
template <> class ToposortEngine<parallel_queue> :
387236
public REACT_IMPL::toposort::QueuingParEngine {};
388237

389-
template <> class ToposortEngine<parallel_pipeline> :
390-
public REACT_IMPL::toposort::PipeliningEngine {};
391-
392238
/******************************************/ REACT_END /******************************************/
393239

394240
/***************************************/ REACT_IMPL_BEGIN /**************************************/
395241

396242
template <typename> struct EnableNodeUpdateTimer;
397243
template <> struct EnableNodeUpdateTimer<ToposortEngine<parallel>> : std::true_type {};
398244
template <> struct EnableNodeUpdateTimer<ToposortEngine<parallel_queue>> : std::true_type {};
399-
template <> struct EnableNodeUpdateTimer<ToposortEngine<parallel_pipeline>> : std::true_type {};
400245

401246
template <typename> struct EnableParallelUpdating;
402247
template <> struct EnableParallelUpdating<ToposortEngine<parallel>> : std::true_type {};
403248
template <> struct EnableParallelUpdating<ToposortEngine<parallel_queue>> : std::true_type {};
404-
template <> struct EnableParallelUpdating<ToposortEngine<parallel_pipeline>> : std::true_type {};
405249

406250
template <typename> struct EnableConcurrentInput;
407251
template <> struct EnableConcurrentInput<ToposortEngine<sequential_queue>> : std::true_type {};
408252
template <> struct EnableConcurrentInput<ToposortEngine<parallel_queue>> : std::true_type {};
409-
template <> struct EnableConcurrentInput<ToposortEngine<parallel_pipeline>> : std::true_type {};
410253

411254
/****************************************/ REACT_IMPL_END /***************************************/
412255

src/engine/PulsecountEngine.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,13 @@ template <typename TTask, typename TIt, typename ... TArgs>
204204
void spawnHelper
205205
(
206206
task* rootTask, task_list& spawnList,
207-
const int count, TIt start, TIt end,
207+
const uint count, TIt start, TIt end,
208208
TArgs& ... args
209209
)
210210
{
211211
rootTask->set_ref_count(1 + count);
212212

213-
for (int i=0; i < (count - 1); i++)
213+
for (uint i=0; i < (count - 1); i++)
214214
{
215215
spawnList.push_back(*new(rootTask->allocate_child())
216216
TTask(args ..., start, start + chunk_size));
@@ -228,7 +228,7 @@ void spawnHelper
228228
template <typename TTurn>
229229
void EngineBase<TTurn>::OnTurnPropagate(TTurn& turn)
230230
{
231-
const int initialTaskCount = (changedInputs_.size() - 1) / chunk_size + 1;
231+
const uint initialTaskCount = (changedInputs_.size() - 1) / chunk_size + 1;
232232

233233
spawnHelper<MarkerTask>(rootTask_, spawnList_, initialTaskCount,
234234
changedInputs_.begin(), changedInputs_.end());

0 commit comments

Comments
 (0)