Skip to content

Commit b70e052

Browse files
committed
Added AsyncTransaction (async equivalent of DoTransaction).
1 parent 79888e5 commit b70e052

14 files changed

Lines changed: 410 additions & 151 deletions

include/react/Domain.h

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ template <typename D>
5959
class Reactor;
6060

6161
using REACT_IMPL::TurnFlagsT;
62+
using REACT_IMPL::TransactionStatus;
6263

6364
#ifdef REACT_ENABLE_LOGGING
6465
using REACT_IMPL::EventLog;
@@ -111,19 +112,56 @@ class DomainBase
111112

112113
using ReactorT = Reactor<D>;
113114

114-
///////////////////////////////////////////////////////////////////////////////////////////////////
115+
///////////////////////////////////////////////////////////////////////////////////////////////
115116
/// DoTransaction
116-
///////////////////////////////////////////////////////////////////////////////////////////////////
117+
///////////////////////////////////////////////////////////////////////////////////////////////
117118
template <typename F>
118119
static void DoTransaction(F&& func)
119120
{
120-
REACT_IMPL::InputManager<D>::DoTransaction(0, std::forward<F>(func));
121+
using REACT_IMPL::DomainSpecificInputManager;
122+
DomainSpecificInputManager<D>::Instance().DoTransaction(0, std::forward<F>(func));
121123
}
122124

123125
template <typename F>
124126
static void DoTransaction(TurnFlagsT flags, F&& func)
125127
{
126-
REACT_IMPL::InputManager<D>::DoTransaction(flags, std::forward<F>(func));
128+
using REACT_IMPL::DomainSpecificInputManager;
129+
DomainSpecificInputManager<D>::Instance().DoTransaction(flags, std::forward<F>(func));
130+
}
131+
132+
///////////////////////////////////////////////////////////////////////////////////////////////
133+
/// AsyncTransaction
134+
///////////////////////////////////////////////////////////////////////////////////////////////
135+
template <typename F>
136+
static void AsyncTransaction(F&& func)
137+
{
138+
using REACT_IMPL::DomainSpecificInputManager;
139+
DomainSpecificInputManager<D>::Instance()
140+
.AsyncTransaction(0, nullptr, std::forward<F>(func));
141+
}
142+
143+
template <typename F>
144+
static void AsyncTransaction(TurnFlagsT flags, F&& func)
145+
{
146+
using REACT_IMPL::DomainSpecificInputManager;
147+
DomainSpecificInputManager<D>::Instance()
148+
.AsyncTransaction(flags, nullptr, std::forward<F>(func));
149+
}
150+
151+
template <typename F>
152+
static void AsyncTransaction(TransactionStatus& status, F&& func)
153+
{
154+
using REACT_IMPL::DomainSpecificInputManager;
155+
DomainSpecificInputManager<D>::Instance()
156+
.AsyncTransaction(0, &status, std::forward<F>(func));
157+
}
158+
159+
template <typename F>
160+
static void AsyncTransaction(TurnFlagsT flags, TransactionStatus& status, F&& func)
161+
{
162+
using REACT_IMPL::DomainSpecificInputManager;
163+
DomainSpecificInputManager<D>::Instance()
164+
.AsyncTransaction(flags, &status, std::forward<F>(func));
127165
}
128166

129167
#ifdef REACT_ENABLE_LOGGING
@@ -167,8 +205,9 @@ class DomainInitializer
167205
D::Log();
168206
#endif //REACT_ENABLE_LOGGING
169207

170-
D::Engine::Engine();
208+
D::Engine::Instance();
171209
DomainSpecificObserverRegistry<D>::Instance();
210+
DomainSpecificInputManager<D>::Instance();
172211
}
173212
};
174213

include/react/detail/EngineBase.h

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -185,22 +185,24 @@ class TurnQueueManager
185185
inline void UnblockSuccessors()
186186
{
187187
for (const auto& e : merged_)
188-
e.second->Unblock();
188+
if (e.second != nullptr)
189+
e.second->Unblock();
189190

190191
if (successor_)
191192
successor_->blockCondition_.Unblock();
192193
}
193194

194195
template <typename F>
195-
inline bool TryMerge(F&& inputFunc, BlockingCondition& caller)
196+
inline bool TryMerge(F&& inputFunc, BlockingCondition* caller)
196197
{
197198
if (!isMergeable_)
198199
return false;
199200

200201
// Only merge if target is still blocked
201202
bool merged = blockCondition_.RunIfBlocked([&] {
202-
caller.Block();
203-
merged_.emplace_back(std::make_pair(std::forward<F>(inputFunc), &caller));
203+
if (caller)
204+
caller->Block();
205+
merged_.emplace_back(std::make_pair(std::forward<F>(inputFunc), caller));
204206
});
205207

206208
return merged;
@@ -230,7 +232,7 @@ class TurnQueueManager
230232
SeqMutexT::scoped_lock lock(seqMutex_);
231233

232234
if (tail_)
233-
merged = tail_->TryMerge(std::forward<F>(inputFunc), caller);
235+
merged = tail_->TryMerge(std::forward<F>(inputFunc), &caller);
234236
}// ~seqMutex_
235237

236238
if (merged)
@@ -239,7 +241,22 @@ class TurnQueueManager
239241
return merged;
240242
}
241243

242-
inline void StartTurn(QueueEntry& turn)
244+
template <typename F>
245+
inline bool TryMergeAsync(F&& inputFunc)
246+
{
247+
bool merged = false;
248+
249+
{// seqMutex_
250+
SeqMutexT::scoped_lock lock(seqMutex_);
251+
252+
if (tail_)
253+
merged = tail_->TryMerge(std::forward<F>(inputFunc), nullptr);
254+
}// ~seqMutex_
255+
256+
return merged;
257+
}
258+
259+
inline void EnterQueue(QueueEntry& turn)
243260
{
244261
{// seqMutex_
245262
SeqMutexT::scoped_lock lock(seqMutex_);
@@ -253,7 +270,7 @@ class TurnQueueManager
253270
turn.WaitForUnblock();
254271
}
255272

256-
inline void EndTurn(QueueEntry& turn)
273+
inline void ExitQueue(QueueEntry& turn)
257274
{// seqMutex_
258275
SeqMutexT::scoped_lock lock(seqMutex_);
259276

@@ -273,10 +290,7 @@ class TurnQueueManager
273290
///////////////////////////////////////////////////////////////////////////////////////////////////
274291
/// DefaultQueueableTurn
275292
///////////////////////////////////////////////////////////////////////////////////////////////////
276-
template
277-
<
278-
typename TTurnBase
279-
>
293+
template <typename TTurnBase>
280294
class DefaultQueueableTurn :
281295
public TTurnBase,
282296
public TurnQueueManager::QueueEntry
@@ -301,29 +315,32 @@ class DefaultQueuingEngine : public TTEngineBase<DefaultQueueableTurn<TTurnBase>
301315
public:
302316
using TurnT = DefaultQueueableTurn<TTurnBase>;
303317

304-
void OnTurnAdmissionStart(TurnT& turn)
318+
template <typename F>
319+
bool TryMergeInput(F&& f, bool isBlocking)
305320
{
306-
queueManager_.StartTurn(turn);
321+
if (isBlocking)
322+
return queueManager_.TryMerge(std::forward<F>(f));
323+
else
324+
return queueManager_.TryMergeAsync(std::forward<F>(f));
307325
}
308326

309-
void OnTurnAdmissionEnd(TurnT& turn)
327+
void ApplyMergedInputs(TurnT& turn)
310328
{
311329
turn.RunMergedInputs();
312330
}
313331

314-
void OnTurnEnd(TurnT& turn)
332+
void EnterTurnQueue(TurnT& turn)
315333
{
316-
queueManager_.EndTurn(turn);
334+
queueManager_.EnterQueue(turn);
317335
}
318-
319-
template <typename F>
320-
bool TryMerge(F&& f)
336+
337+
void ExitTurnQueue(TurnT& turn)
321338
{
322-
return queueManager_.TryMerge(std::forward<F>(f));
339+
queueManager_.ExitQueue(turn);
323340
}
324341

325342
private:
326-
TurnQueueManager queueManager_;
343+
TurnQueueManager queueManager_;
327344
};
328345

329346
/****************************************/ REACT_IMPL_END /***************************************/

include/react/detail/EventBase.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class EventStreamBase : public ReactiveBase<EventStreamNode<D,E>>
4242
template <typename T>
4343
void emit(T&& e) const
4444
{
45-
InputManager<D>::AddInput(
45+
DomainSpecificInputManager<D>::Instance().AddInput(
4646
*reinterpret_cast<EventSourceNode<D,E>*>(this->ptr_.get()),
4747
std::forward<T>(e));
4848
}

0 commit comments

Comments
 (0)