1919#include < memory>
2020#include < mutex>
2121#include < utility>
22- #include < thread>
2322#include < type_traits>
2423#include < vector>
2524
25+ #include " tbb/task.h"
2626#include " tbb/concurrent_queue.h"
2727#include " tbb/enumerable_thread_specific.h"
2828#include " tbb/queuing_mutex.h"
3939struct IInputNode ;
4040class IObserver ;
4141
42+ template <typename D>
43+ class InputManager ;
44+
4245// /////////////////////////////////////////////////////////////////////////////////////////////////
4346// / Common types & constants
4447// /////////////////////////////////////////////////////////////////////////////////////////////////
@@ -478,37 +481,142 @@ class TransactionQueue<concurrent_input>
478481};
479482
480483// /////////////////////////////////////////////////////////////////////////////////////////////////
481- // / InputManager
484+ // / AsyncWorker
482485// /////////////////////////////////////////////////////////////////////////////////////////////////
486+ struct AsyncItem
487+ {
488+ TransactionFlagsT Flags;
489+ WaitingStatePtrT WaitingStatePtr;
490+ TransactionFuncT Func;
491+ };
492+
493+ // Interface
494+ template <typename D, EInputMode>
495+ class AsyncWorker
496+ {
497+ public:
498+ AsyncWorker (InputManager<D>& mgr);
499+
500+ void PushItem (AsyncItem&& item);
501+
502+ void PopItem (AsyncItem& item);
503+ bool TryPopItem (AsyncItem& item);
504+
505+ bool IncrementItemCount (size_t n);
506+ bool DecrementItemCount (size_t n);
507+
508+ void Start ();
509+ };
510+
511+ // Disabled
483512template <typename D>
484- class InputManager :
485- public IContinuationTarget
513+ struct AsyncWorker <D, consecutive_input>
486514{
487- private:
488- struct AsyncItem
515+ public:
516+ AsyncWorker (InputManager<D>& mgr)
517+ {}
518+
519+ void PushItem (AsyncItem&& item) { assert (false ); }
520+
521+ void PopItem (AsyncItem& item) { assert (false ); }
522+ bool TryPopItem (AsyncItem& item) { assert (false ); return false ; }
523+
524+ bool IncrementItemCount (size_t n) { assert (false ); return false ; }
525+ bool DecrementItemCount (size_t n) { assert (false ); return false ; }
526+
527+ void Start () { assert (false ); }
528+ };
529+
530+ // Enabled
531+ template <typename D>
532+ struct AsyncWorker <D, concurrent_input>
533+ {
534+ using DataT = tbb::concurrent_bounded_queue<AsyncItem>;
535+
536+ class WorkerTask : public tbb ::task
489537 {
490- TransactionFlagsT Flags;
491- WaitingStatePtrT WaitingStatePtr;
492- TransactionFuncT Func;
538+ public:
539+ WorkerTask (InputManager<D>& mgr) :
540+ mgr_ ( mgr )
541+ {}
542+
543+ tbb::task* execute ()
544+ {
545+ mgr_.processAsyncQueue ();
546+ return nullptr ;
547+ }
548+
549+ private:
550+ InputManager<D>& mgr_;
493551 };
494552
495- using AsyncQueueT = tbb::concurrent_bounded_queue<AsyncItem>;
553+ public:
554+ AsyncWorker (InputManager<D>& mgr) :
555+ mgr_ ( mgr )
556+ {}
557+
558+ void PushItem (AsyncItem&& item)
559+ {
560+ data_.push (std::move (item));
561+ }
496562
563+ void PopItem (AsyncItem& item)
564+ {
565+ data_.pop (item);
566+ }
567+
568+ bool TryPopItem (AsyncItem& item)
569+ {
570+ return data_.try_pop (item);
571+ }
572+
573+ bool IncrementItemCount (size_t n)
574+ {
575+ return count_.fetch_add (n, std::memory_order_relaxed) == 0 ;
576+ }
577+
578+ bool DecrementItemCount (size_t n)
579+ {
580+ return count_.fetch_sub (n, std::memory_order_relaxed) == n;
581+ }
582+
583+ void Start ()
584+ {
585+ tbb::task::enqueue (*new (tbb::task::allocate_root ()) WorkerTask (mgr_));
586+ }
587+
588+ private:
589+ DataT data_;
590+ std::atomic<size_t > count_{ 0 };
591+
592+ InputManager<D>& mgr_;
593+ };
594+
595+ // /////////////////////////////////////////////////////////////////////////////////////////////////
596+ // / InputManager
597+ // /////////////////////////////////////////////////////////////////////////////////////////////////
598+ template <typename D>
599+ class InputManager :
600+ public IContinuationTarget
601+ {
602+ private:
497603 // Select between thread-safe and non thread-safe implementations
498604 using TransactionQueueT = TransactionQueue<D::Policy::input_mode>;
605+ using QueueEntryT = typename TransactionQueueT::QueueEntry;
606+
499607 using ContinuationManagerT = ContinuationManager<D::Policy::propagation_mode>;
608+ using AsyncWorkerT = AsyncWorker<D, D::Policy::input_mode>;
500609
501- using QueueEntryT = typename TransactionQueueT::QueueEntry;
610+ template <typename , EInputMode>
611+ friend class AsyncWorker ;
502612
503613public:
504614 using TurnT = typename D::TurnT;
505615 using Engine = typename D::Engine;
506616
507617 InputManager () :
508- asyncWorker_ ( [this ] { processAsyncQueue (); } )
509- {
510- asyncWorker_.detach ();
511- }
618+ asyncWorker_ (*this )
619+ {}
512620
513621 template <typename F>
514622 void DoTransaction (TransactionFlagsT flags, F&& func)
@@ -556,7 +664,10 @@ class InputManager :
556664 if (waitingStatePtr != nullptr )
557665 waitingStatePtr->IncWaitCount ();
558666
559- asyncQueue_.push (AsyncItem{ flags, waitingStatePtr, std::forward<F>(func) } );
667+ asyncWorker_.PushItem (AsyncItem{ flags, waitingStatePtr, std::forward<F>(func) });
668+
669+ if (asyncWorker_.IncrementItemCount (1 ))
670+ asyncWorker_.Start ();
560671 }
561672
562673 template <typename R, typename V>
@@ -702,11 +813,20 @@ class InputManager :
702813
703814 while (true )
704815 {
705- // Blocks if queue is empty
816+ size_t popCount = 0 ;
817+
706818 if (!skipPop)
707- asyncQueue_.pop (item);
819+ {
820+ // Blocks if queue is empty.
821+ // This should never happen,
822+ // and if (maybe due to some memory access internals), only briefly
823+ asyncWorker_.PopItem (item);
824+ popCount++;
825+ }
708826 else
827+ {
709828 skipPop = false ;
829+ }
710830
711831 // First try to merge to an existing synchronous item in the queue
712832 bool canMerge = (item.Flags & allow_merging) != 0 ;
@@ -745,8 +865,10 @@ class InputManager :
745865 {
746866 uint extraCount = 0 ;
747867 // Todo: Make configurable
748- while (extraCount < 512 && asyncQueue_. try_pop (item))
868+ while (extraCount < 1024 && asyncWorker_. TryPopItem (item))
749869 {
870+ ++popCount;
871+
750872 bool canMergeNext = (item.Flags & allow_merging) != 0 ;
751873 if (canMergeNext)
752874 {
@@ -786,10 +908,6 @@ class InputManager :
786908
787909 continuationManager_.template DetachQueuedObservers <D>();
788910
789- // printf("1\n");
790- // for (const auto& p : waitingStatePtrs)
791- // printf("%08X\n", p.Get());
792-
793911 // Has continuations? If so, status ptrs have to be passed on to
794912 // continuation transactions
795913 if (continuationManager_.HasContinuations ())
@@ -800,10 +918,6 @@ class InputManager :
800918 // More than 1 waiting state -> create collection from vector
801919 if (waitingStatePtrs.size () > 1 )
802920 {
803- /* printf("2\n");
804- for (const auto& p : waitingStatePtrs)
805- printf("%08X\n", p.Get());*/
806-
807921 WaitingStatePtrT p
808922 (
809923 SharedWaitingStateCollection::Create (std::move (waitingStatePtrs))
@@ -839,14 +953,17 @@ class InputManager :
839953 }
840954
841955 waitingStatePtrs.clear ();
956+
957+ // Stop this task if the number of items has just been decremented zero.
958+ // A new task will be started by the thread that increments the item count from zero.
959+ if (asyncWorker_.DecrementItemCount (popCount))
960+ break ;
842961 }
843962 }
844963
845964 TransactionQueueT transactionQueue_;
846965 ContinuationManagerT continuationManager_;
847-
848- AsyncQueueT asyncQueue_;
849- std::thread asyncWorker_;
966+ AsyncWorkerT asyncWorker_;
850967
851968 std::atomic<TurnIdT> nextTurnId_{ 0 };
852969
0 commit comments