Skip to content

Commit 568e249

Browse files
committed
TransactionStatus now has value semantics and uses a shared_ptr to heap allocated state. This is safer and enables conditional waiting.
1 parent 3ea9772 commit 568e249

4 files changed

Lines changed: 81 additions & 60 deletions

File tree

include/react/Domain.h

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
#include "react/detail/Defs.h"
1313

14+
#include <memory>
1415
#include <utility>
1516

1617
#include "react/detail/ReactiveInput.h"
@@ -58,7 +59,6 @@ template <typename D>
5859
class Reactor;
5960

6061
using REACT_IMPL::TurnFlagsT;
61-
using REACT_IMPL::TransactionStatus;
6262

6363
//ETurnFlags
6464
using REACT_IMPL::enable_input_merging;
@@ -67,6 +67,45 @@ using REACT_IMPL::enable_input_merging;
6767
using REACT_IMPL::EventLog;
6868
#endif //REACT_ENABLE_LOGGING
6969

70+
///////////////////////////////////////////////////////////////////////////////////////////////////
71+
/// TransactionStatus
72+
///////////////////////////////////////////////////////////////////////////////////////////////////
73+
class TransactionStatus
74+
{
75+
using StateT = REACT_IMPL::AsyncState;
76+
77+
public:
78+
TransactionStatus() :
79+
state_( std::make_shared<StateT>() )
80+
{}
81+
82+
TransactionStatus(const TransactionStatus&) = default;
83+
84+
TransactionStatus(TransactionStatus&& other) :
85+
state_( std::move(other.state_) )
86+
{}
87+
88+
TransactionStatus& operator=(const TransactionStatus&) = default;
89+
90+
TransactionStatus& operator=(TransactionStatus&& other)
91+
{
92+
state_ = std::move(other.state_);
93+
return *this;
94+
}
95+
96+
inline void Wait()
97+
{
98+
assert(state_.get() != nullptr);
99+
state_->Wait();
100+
}
101+
102+
private:
103+
std::shared_ptr<StateT> state_;
104+
105+
template <typename D, typename TPolicy>
106+
friend class DomainBase;
107+
};
108+
70109
///////////////////////////////////////////////////////////////////////////////////////////////////
71110
/// DomainBase
72111
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -154,16 +193,17 @@ class DomainBase
154193
static void AsyncTransaction(TransactionStatus& status, F&& func)
155194
{
156195
using REACT_IMPL::DomainSpecificInputManager;
196+
157197
DomainSpecificInputManager<D>::Instance()
158-
.AsyncTransaction(0, &status, std::forward<F>(func));
198+
.AsyncTransaction(0, status.state_, std::forward<F>(func));
159199
}
160200

161201
template <typename F>
162202
static void AsyncTransaction(TurnFlagsT flags, TransactionStatus& status, F&& func)
163203
{
164204
using REACT_IMPL::DomainSpecificInputManager;
165205
DomainSpecificInputManager<D>::Instance()
166-
.AsyncTransaction(flags, &status, std::forward<F>(func));
206+
.AsyncTransaction(flags, status.state_, std::forward<F>(func));
167207
}
168208

169209
#ifdef REACT_ENABLE_LOGGING

include/react/detail/EngineBase.h

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ class TurnQueueManager
8888
e.Cond->Unblock();
8989

9090
// Async
91-
else if (e.Status != nullptr)
92-
TransactionStatusInterface::DecrementWaitCount(*e.Status);
91+
else if (e.StatusPtr != nullptr)
92+
e.StatusPtr->DecWaitCount();
9393
}
9494

9595
// Release next thread in queue
@@ -98,7 +98,7 @@ class TurnQueueManager
9898
}
9999

100100
template <typename F>
101-
inline bool TryMerge(F&& inputFunc, BlockingCondition* caller, TransactionStatus* status)
101+
inline bool TryMerge(F&& inputFunc, BlockingCondition* caller, AsyncStatePtrT&& statusPtr)
102102
{
103103
if (!isMergeable_)
104104
return false;
@@ -107,7 +107,7 @@ class TurnQueueManager
107107
bool merged = blockCondition_.RunIfBlocked([&] {
108108
if (caller)
109109
caller->Block();
110-
merged_.emplace_back(std::forward<F>(inputFunc), caller, status);
110+
merged_.emplace_back(std::forward<F>(inputFunc), caller, std::move(statusPtr));
111111
});
112112

113113
return merged;
@@ -117,10 +117,10 @@ class TurnQueueManager
117117
struct MergedData
118118
{
119119
template <typename F>
120-
MergedData(F&& func, BlockingCondition* cond, TransactionStatus* status) :
120+
MergedData(F&& func, BlockingCondition* cond, AsyncStatePtrT&& statusPtr) :
121121
InputFunc( std::forward<F>(func) ),
122122
Cond( cond ),
123-
Status( status )
123+
StatusPtr( std::move(statusPtr) )
124124
{}
125125

126126
std::function<void()> InputFunc;
@@ -129,7 +129,7 @@ class TurnQueueManager
129129
BlockingCondition* Cond;
130130

131131
// Status for async merged
132-
TransactionStatus* Status;
132+
AsyncStatePtrT StatusPtr;
133133
};
134134

135135
using MergedDataVectT = std::vector<MergedData>;
@@ -161,15 +161,16 @@ class TurnQueueManager
161161
}
162162

163163
template <typename F>
164-
inline bool TryMergeAsync(F&& inputFunc, TransactionStatus* status)
164+
inline bool TryMergeAsync(F&& inputFunc, AsyncStatePtrT&& statusPtr)
165165
{
166166
bool merged = false;
167167

168168
{// seqMutex_
169169
SeqMutexT::scoped_lock lock(seqMutex_);
170170

171171
if (tail_)
172-
merged = tail_->TryMerge(std::forward<F>(inputFunc), nullptr, status);
172+
merged = tail_->TryMerge(
173+
std::forward<F>(inputFunc), nullptr, std::move(statusPtr));
173174
}// ~seqMutex_
174175

175176
return merged;
@@ -241,9 +242,9 @@ class DefaultQueuingEngine : public TTEngineBase<DefaultQueueableTurn<TTurnBase>
241242
}
242243

243244
template <typename F>
244-
bool TryMergeAsync(F&& f, TransactionStatus* statusPtr)
245+
bool TryMergeAsync(F&& f, std::shared_ptr<AsyncState>&& statusPtr)
245246
{
246-
return queueManager_.TryMergeAsync(std::forward<F>(f), statusPtr);
247+
return queueManager_.TryMergeAsync(std::forward<F>(f), std::move(statusPtr));
247248
}
248249

249250
void ApplyMergedInputs(TurnT& turn)

include/react/detail/IReactiveEngine.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
#pragma once
1111

12+
#include <memory>
1213
#include <utility>
1314

1415
#include "react/detail/Defs.h"
@@ -20,7 +21,7 @@
2021
///////////////////////////////////////////////////////////////////////////////////////////////////
2122
/// Forward declarations
2223
///////////////////////////////////////////////////////////////////////////////////////////////////
23-
class TransactionStatus;
24+
class AsyncState;
2425

2526
///////////////////////////////////////////////////////////////////////////////////////////////////
2627
/// IReactiveEngine
@@ -39,7 +40,7 @@ struct IReactiveEngine
3940
bool TryMergeSync(F&& f) { return false; }
4041

4142
template <typename F>
42-
bool TryMergeAsync(F&& f, TransactionStatus* status) { return false; }
43+
bool TryMergeAsync(F&& f, std::shared_ptr<AsyncState>&& statusPtr) { return false; }
4344

4445
void ApplyMergedInputs(TurnT& turn) {}
4546

@@ -92,9 +93,9 @@ struct EngineInterface
9293
}
9394

9495
template <typename F>
95-
static bool TryMergeAsync(F&& f, TransactionStatus* status)
96+
static bool TryMergeAsync(F&& f, std::shared_ptr<AsyncState>&& statusPtr)
9697
{
97-
return Instance().TryMergeAsync(std::forward<F>(f), status);
98+
return Instance().TryMergeAsync(std::forward<F>(f), std::move(statusPtr));
9899
}
99100

100101
static void ApplyMergedInputs(TurnT& turn)

include/react/detail/ReactiveInput.h

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -189,23 +189,18 @@ class ContinuationData<true>
189189
};
190190

191191
///////////////////////////////////////////////////////////////////////////////////////////////////
192-
/// TransactionStatus
192+
/// AsyncState
193193
///////////////////////////////////////////////////////////////////////////////////////////////////
194-
class TransactionStatus
194+
class AsyncState
195195
{
196196
public:
197-
TransactionStatus() = default;
198-
TransactionStatus(const TransactionStatus&) = delete;
199-
TransactionStatus(TransactionStatus&&) = delete;
200-
201197
inline void Wait()
202198
{
203199
std::unique_lock<std::mutex> lock(mutex_);
204200
condition_.wait(lock, [this] { return !isWaiting_; });
205201
}
206202

207-
private:
208-
inline void incWaitCount()
203+
inline void IncWaitCount()
209204
{
210205
auto oldVal = waitCount_.fetch_add(1, std::memory_order_relaxed);
211206

@@ -216,8 +211,7 @@ class TransactionStatus
216211
}// ~mutex_
217212
}
218213

219-
220-
inline void decWaitCount()
214+
inline void DecWaitCount()
221215
{
222216
auto oldVal = waitCount_.fetch_sub(1, std::memory_order_relaxed);
223217

@@ -228,30 +222,15 @@ class TransactionStatus
228222
condition_.notify_all();
229223
}// ~mutex_
230224
}
231-
232225

226+
private:
233227
std::atomic<uint> waitCount_{ 0 };
234-
235228
std::condition_variable condition_;
236229
std::mutex mutex_;
237-
bool isWaiting_{ false };
238-
239-
friend class TransactionStatusInterface;
230+
bool isWaiting_ = false;
240231
};
241232

242-
class TransactionStatusInterface
243-
{
244-
public:
245-
inline static void IncrementWaitCount(TransactionStatus& status)
246-
{
247-
status.incWaitCount();
248-
}
249-
250-
inline static void DecrementWaitCount(TransactionStatus& status)
251-
{
252-
status.decWaitCount();
253-
}
254-
};
233+
using AsyncStatePtrT = std::shared_ptr<AsyncState>;
255234

256235
///////////////////////////////////////////////////////////////////////////////////////////////////
257236
/// InputManager
@@ -264,9 +243,9 @@ class InputManager
264243

265244
struct AsyncItem
266245
{
267-
TurnFlagsT Flags;
268-
TransactionStatus* StatusPtr;
269-
TransactionFuncT Func;
246+
TurnFlagsT Flags;
247+
AsyncStatePtrT StatusPtr;
248+
TransactionFuncT Func;
270249
};
271250

272251
using AsyncQueueT = tbb::concurrent_bounded_queue<AsyncItem>;
@@ -328,10 +307,10 @@ class InputManager
328307
}
329308

330309
template <typename F>
331-
void AsyncTransaction(TurnFlagsT flags, TransactionStatus* statusPtr, F&& func)
310+
void AsyncTransaction(TurnFlagsT flags, const AsyncStatePtrT& statusPtr, F&& func)
332311
{
333312
if (statusPtr != nullptr)
334-
TransactionStatusInterface::IncrementWaitCount(*statusPtr);
313+
statusPtr->IncWaitCount();
335314

336315
asyncQueue_.push(AsyncItem{ flags, statusPtr, func } );
337316
}
@@ -340,10 +319,10 @@ class InputManager
340319
{
341320
AsyncItem item;
342321

343-
TransactionStatus* savedStatusPtr = nullptr;
344-
TurnFlagsT savedFlags = 0;
322+
AsyncStatePtrT savedStatusPtr = nullptr;
323+
TurnFlagsT savedFlags = 0;
345324

346-
std::vector<TransactionStatus*> statusPtrStash;
325+
std::vector<AsyncStatePtrT> statusPtrStash;
347326

348327
bool skipPop = false;
349328

@@ -357,7 +336,7 @@ class InputManager
357336

358337
// First try to merge to an existing synchronous item in the queue
359338
bool canMerge = (item.Flags & enable_input_merging) != 0;
360-
if (canMerge && Engine::TryMergeAsync(std::move(item.Func), item.StatusPtr))
339+
if (canMerge && Engine::TryMergeAsync(std::move(item.Func), std::move(item.StatusPtr)))
361340
continue;
362341

363342
bool shouldPropagate = false;
@@ -379,7 +358,7 @@ class InputManager
379358
Engine::ApplyMergedInputs(turn);
380359

381360
// Save data, because item might be re-used for next input
382-
savedStatusPtr = item.StatusPtr;
361+
savedStatusPtr = std::move(item.StatusPtr);
383362
savedFlags = item.Flags;
384363

385364
// If the current item supports merging, try to add more mergeable inputs
@@ -394,7 +373,7 @@ class InputManager
394373
{
395374
item.Func();
396375
if (item.StatusPtr != nullptr)
397-
statusPtrStash.push_back(item.StatusPtr);
376+
statusPtrStash.push_back(std::move(item.StatusPtr));
398377

399378
++extraCount;
400379
}
@@ -431,10 +410,10 @@ class InputManager
431410

432411
// Decrement transaction status counts of processed items
433412
if (savedStatusPtr != nullptr)
434-
TransactionStatusInterface::DecrementWaitCount(*savedStatusPtr);
413+
savedStatusPtr->DecWaitCount();
435414

436-
for (auto* p : statusPtrStash)
437-
TransactionStatusInterface::DecrementWaitCount(*p);
415+
for (auto& p : statusPtrStash)
416+
p->DecWaitCount();
438417
statusPtrStash.clear();
439418
}
440419
}

0 commit comments

Comments
 (0)