Skip to content

Commit c29d50e

Browse files
committed
Async queue no longer uses a dedicated worker thread.
Instead, a task is started as soon as there are items in the queue and stopped once it's empty.
1 parent 2acc6ec commit c29d50e

1 file changed

Lines changed: 147 additions & 30 deletions

File tree

include/react/detail/ReactiveInput.h

Lines changed: 147 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
#include <memory>
2020
#include <mutex>
2121
#include <utility>
22-
#include <thread>
2322
#include <type_traits>
2423
#include <vector>
2524

25+
#include "tbb/task.h"
2626
#include "tbb/concurrent_queue.h"
2727
#include "tbb/enumerable_thread_specific.h"
2828
#include "tbb/queuing_mutex.h"
@@ -39,6 +39,9 @@
3939
struct IInputNode;
4040
class IObserver;
4141

42+
template <typename D>
43+
class InputManager;
44+
4245
///////////////////////////////////////////////////////////////////////////////////////////////////
4346
/// Common types & constants
4447
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -478,37 +481,142 @@ class TransactionQueue<concurrent_input>
478481
};
479482

480483
///////////////////////////////////////////////////////////////////////////////////////////////////
481-
/// InputManager
484+
/// AsyncWorker
482485
///////////////////////////////////////////////////////////////////////////////////////////////////
486+
struct AsyncItem
487+
{
488+
TransactionFlagsT Flags;
489+
WaitingStatePtrT WaitingStatePtr;
490+
TransactionFuncT Func;
491+
};
492+
493+
// Interface
494+
template <typename D, EInputMode>
495+
class AsyncWorker
496+
{
497+
public:
498+
AsyncWorker(InputManager<D>& mgr);
499+
500+
void PushItem(AsyncItem&& item);
501+
502+
void PopItem(AsyncItem& item);
503+
bool TryPopItem(AsyncItem& item);
504+
505+
bool IncrementItemCount(size_t n);
506+
bool DecrementItemCount(size_t n);
507+
508+
void Start();
509+
};
510+
511+
// Disabled
483512
template <typename D>
484-
class InputManager :
485-
public IContinuationTarget
513+
struct AsyncWorker<D, consecutive_input>
486514
{
487-
private:
488-
struct AsyncItem
515+
public:
516+
AsyncWorker(InputManager<D>& mgr)
517+
{}
518+
519+
void PushItem(AsyncItem&& item) { assert(false); }
520+
521+
void PopItem(AsyncItem& item) { assert(false); }
522+
bool TryPopItem(AsyncItem& item) { assert(false); return false; }
523+
524+
bool IncrementItemCount(size_t n) { assert(false); return false; }
525+
bool DecrementItemCount(size_t n) { assert(false); return false; }
526+
527+
void Start() { assert(false); }
528+
};
529+
530+
// Enabled
531+
template <typename D>
532+
struct AsyncWorker<D, concurrent_input>
533+
{
534+
using DataT = tbb::concurrent_bounded_queue<AsyncItem>;
535+
536+
class WorkerTask : public tbb::task
489537
{
490-
TransactionFlagsT Flags;
491-
WaitingStatePtrT WaitingStatePtr;
492-
TransactionFuncT Func;
538+
public:
539+
WorkerTask(InputManager<D>& mgr) :
540+
mgr_( mgr )
541+
{}
542+
543+
tbb::task* execute()
544+
{
545+
mgr_.processAsyncQueue();
546+
return nullptr;
547+
}
548+
549+
private:
550+
InputManager<D>& mgr_;
493551
};
494552

495-
using AsyncQueueT = tbb::concurrent_bounded_queue<AsyncItem>;
553+
public:
554+
AsyncWorker(InputManager<D>& mgr) :
555+
mgr_( mgr )
556+
{}
557+
558+
void PushItem(AsyncItem&& item)
559+
{
560+
data_.push(std::move(item));
561+
}
496562

563+
void PopItem(AsyncItem& item)
564+
{
565+
data_.pop(item);
566+
}
567+
568+
bool TryPopItem(AsyncItem& item)
569+
{
570+
return data_.try_pop(item);
571+
}
572+
573+
bool IncrementItemCount(size_t n)
574+
{
575+
return count_.fetch_add(n, std::memory_order_relaxed) == 0;
576+
}
577+
578+
bool DecrementItemCount(size_t n)
579+
{
580+
return count_.fetch_sub(n, std::memory_order_relaxed) == n;
581+
}
582+
583+
void Start()
584+
{
585+
tbb::task::enqueue(*new(tbb::task::allocate_root()) WorkerTask(mgr_));
586+
}
587+
588+
private:
589+
DataT data_;
590+
std::atomic<size_t> count_{ 0 };
591+
592+
InputManager<D>& mgr_;
593+
};
594+
595+
///////////////////////////////////////////////////////////////////////////////////////////////////
596+
/// InputManager
597+
///////////////////////////////////////////////////////////////////////////////////////////////////
598+
template <typename D>
599+
class InputManager :
600+
public IContinuationTarget
601+
{
602+
private:
497603
// Select between thread-safe and non thread-safe implementations
498604
using TransactionQueueT = TransactionQueue<D::Policy::input_mode>;
605+
using QueueEntryT = typename TransactionQueueT::QueueEntry;
606+
499607
using ContinuationManagerT = ContinuationManager<D::Policy::propagation_mode>;
608+
using AsyncWorkerT = AsyncWorker<D, D::Policy::input_mode>;
500609

501-
using QueueEntryT = typename TransactionQueueT::QueueEntry;
610+
template <typename, EInputMode>
611+
friend class AsyncWorker;
502612

503613
public:
504614
using TurnT = typename D::TurnT;
505615
using Engine = typename D::Engine;
506616

507617
InputManager() :
508-
asyncWorker_( [this] { processAsyncQueue(); } )
509-
{
510-
asyncWorker_.detach();
511-
}
618+
asyncWorker_(*this)
619+
{}
512620

513621
template <typename F>
514622
void DoTransaction(TransactionFlagsT flags, F&& func)
@@ -556,7 +664,10 @@ class InputManager :
556664
if (waitingStatePtr != nullptr)
557665
waitingStatePtr->IncWaitCount();
558666

559-
asyncQueue_.push(AsyncItem{ flags, waitingStatePtr, std::forward<F>(func) } );
667+
asyncWorker_.PushItem(AsyncItem{ flags, waitingStatePtr, std::forward<F>(func) });
668+
669+
if (asyncWorker_.IncrementItemCount(1))
670+
asyncWorker_.Start();
560671
}
561672

562673
template <typename R, typename V>
@@ -702,11 +813,20 @@ class InputManager :
702813

703814
while (true)
704815
{
705-
// Blocks if queue is empty
816+
size_t popCount = 0;
817+
706818
if (!skipPop)
707-
asyncQueue_.pop(item);
819+
{
820+
// Blocks if queue is empty.
821+
// This should never happen,
822+
// and if (maybe due to some memory access internals), only briefly
823+
asyncWorker_.PopItem(item);
824+
popCount++;
825+
}
708826
else
827+
{
709828
skipPop = false;
829+
}
710830

711831
// First try to merge to an existing synchronous item in the queue
712832
bool canMerge = (item.Flags & allow_merging) != 0;
@@ -745,8 +865,10 @@ class InputManager :
745865
{
746866
uint extraCount = 0;
747867
// Todo: Make configurable
748-
while (extraCount < 512 && asyncQueue_.try_pop(item))
868+
while (extraCount < 1024 && asyncWorker_.TryPopItem(item))
749869
{
870+
++popCount;
871+
750872
bool canMergeNext = (item.Flags & allow_merging) != 0;
751873
if (canMergeNext)
752874
{
@@ -786,10 +908,6 @@ class InputManager :
786908

787909
continuationManager_.template DetachQueuedObservers<D>();
788910

789-
//printf("1\n");
790-
//for (const auto& p : waitingStatePtrs)
791-
// printf("%08X\n", p.Get());
792-
793911
// Has continuations? If so, status ptrs have to be passed on to
794912
// continuation transactions
795913
if (continuationManager_.HasContinuations())
@@ -800,10 +918,6 @@ class InputManager :
800918
// More than 1 waiting state -> create collection from vector
801919
if (waitingStatePtrs.size() > 1)
802920
{
803-
/* printf("2\n");
804-
for (const auto& p : waitingStatePtrs)
805-
printf("%08X\n", p.Get());*/
806-
807921
WaitingStatePtrT p
808922
(
809923
SharedWaitingStateCollection::Create(std::move(waitingStatePtrs))
@@ -839,14 +953,17 @@ class InputManager :
839953
}
840954

841955
waitingStatePtrs.clear();
956+
957+
// Stop this task if the number of items has just been decremented zero.
958+
// A new task will be started by the thread that increments the item count from zero.
959+
if (asyncWorker_.DecrementItemCount(popCount))
960+
break;
842961
}
843962
}
844963

845964
TransactionQueueT transactionQueue_;
846965
ContinuationManagerT continuationManager_;
847-
848-
AsyncQueueT asyncQueue_;
849-
std::thread asyncWorker_;
966+
AsyncWorkerT asyncWorker_;
850967

851968
std::atomic<TurnIdT> nextTurnId_{ 0 };
852969

0 commit comments

Comments
 (0)