Skip to content

Commit 4a92db7

Browse files
committed
Reactive domain cleanup.
1 parent 2a63821 commit 4a92db7

6 files changed

Lines changed: 50 additions & 75 deletions

File tree

include/react/Observer.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,12 @@ class Observer
4141
Observer() :
4242
ptr_{ nullptr },
4343
subject_{ nullptr }
44-
{
45-
}
44+
{}
4645

4746
Observer(ObserverNodeT* ptr, const std::shared_ptr<SubjectT>& subject) :
4847
ptr_{ ptr },
4948
subject_{ subject }
50-
{
51-
}
49+
{}
5250

5351
const ObserverNodeT* GetPtr() const
5452
{
@@ -92,14 +90,12 @@ class ObserverRegistry
9290
Entry_(std::unique_ptr<IObserverNode>&& obs, SubjectT* aSubject) :
9391
obs_{ std::move(obs) },
9492
Subject{ aSubject }
95-
{
96-
}
93+
{}
9794

9895
Entry_(Entry_&& other) :
9996
obs_(std::move(other.obs_)),
10097
Subject(other.Subject)
101-
{
102-
}
98+
{}
10399

104100
SubjectT* Subject;
105101

include/react/detail/EngineBase.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ class TurnBase
4747
template <typename D, typename TPolicy>
4848
friend class DomainBase;
4949

50+
template <typename D>
51+
friend class ContinuationHolder;
52+
5053
private:
5154
using ObsVectT = tbb::concurrent_vector<IObserverNode*>;
5255

include/react/detail/Options.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
/*****************************************/ REACT_BEGIN /*****************************************/
1212

1313
///////////////////////////////////////////////////////////////////////////////////////////////////
14-
/// CommitFlags
14+
/// ETurnFlags
1515
///////////////////////////////////////////////////////////////////////////////////////////////////
1616
enum ETurnFlags
1717
{

include/react/detail/ReactiveDomain.h

Lines changed: 33 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,16 @@ class DomainBase
162162
///////////////////////////////////////////////////////////////////////////////////////////////////
163163
template
164164
<
165-
typename F,
166-
typename ... TArgs
165+
typename FIn,
166+
typename ... TArgs,
167+
typename F = std::decay<FIn>::type,
168+
typename S = std::result_of<F(TArgs...)>::type,
169+
typename TOp = REACT_IMPL::FunctionOp<S,F,REACT_IMPL::SignalNodePtr<D,TArgs> ...>
167170
>
168-
static auto MakeSignal(F&& func, const SignalT<TArgs>& ... args)
169-
-> SignalT<typename std::result_of<F(TArgs...)>::type>
171+
static auto MakeSignal(FIn&& func, const SignalT<TArgs>& ... args)
172+
-> TempSignal<D,S,TOp>
170173
{
171-
using S = typename std::result_of<F(TArgs...)>::type;
172-
173-
return REACT::MakeSignal<D>(std::forward<F>(func), args ...);
174+
return REACT::MakeSignal<D>(std::forward<FIn>(func), args ...);
174175
}
175176

176177
///////////////////////////////////////////////////////////////////////////////////////////////////
@@ -195,7 +196,7 @@ class DomainBase
195196
template <typename F>
196197
static void DoTransaction(F&& func)
197198
{
198-
DoTransaction(turnFlags_, std::forward<F>(func));
199+
DoTransaction(0, std::forward<F>(func));
199200
}
200201

201202
template <typename F>
@@ -238,7 +239,7 @@ class DomainBase
238239
template <typename R, typename V>
239240
static void AddInput(R&& r, V&& v)
240241
{
241-
if (! ContinuationHolder_::IsNull())
242+
if (ContinuationHolder<D>::Get() != nullptr)
242243
{
243244
addContinuationInput(std::forward<R>(r), std::forward<V>(v));
244245
}
@@ -252,48 +253,7 @@ class DomainBase
252253
}
253254
}
254255

255-
///////////////////////////////////////////////////////////////////////////////////////////////////
256-
/// Set/Clear continuation
257-
///////////////////////////////////////////////////////////////////////////////////////////////////
258-
static void SetCurrentContinuation(TurnT& turn)
259-
{
260-
ContinuationHolder_::Set(&turn.continuation_);
261-
}
262-
263-
static void ClearCurrentContinuation()
264-
{
265-
ContinuationHolder_::Reset();
266-
}
267-
268-
///////////////////////////////////////////////////////////////////////////////////////////////////
269-
/// Options - TODO: This sucks
270-
///////////////////////////////////////////////////////////////////////////////////////////////////
271-
template <typename Opt>
272-
static void Set(uint v) { static_assert(false, "Set option not implemented."); }
273-
274-
template <typename Opt>
275-
static bool IsSet(uint v) { static_assert(false, "IsSet option not implemented."); }
276-
277-
template <typename Opt>
278-
static void Unset(uint v) { static_assert(false, "Unset option not implemented."); }
279-
280-
template <typename Opt>
281-
static void Reset() { static_assert(false, "Reset option not implemented."); }
282-
283-
template <> static void Set<ETurnFlags>(uint v) { turnFlags_ |= v; }
284-
template <> static bool IsSet<ETurnFlags>(uint v) { return (turnFlags_ & v) != 0; }
285-
template <> static void Unset<ETurnFlags>(uint v) { turnFlags_ &= ~v; }
286-
template <> static void Reset<ETurnFlags>() { turnFlags_ = 0; }
287-
288256
private:
289-
290-
///////////////////////////////////////////////////////////////////////////////////////////////////
291-
/// Transaction input continuation
292-
///////////////////////////////////////////////////////////////////////////////////////////////////
293-
struct ContinuationHolder_ : public ThreadLocalStaticPtr<ContinuationInput> {};
294-
295-
static __declspec(thread) TurnFlagsT turnFlags_;
296-
297257
static std::atomic<TurnIdT> nextTurnId_;
298258

299259
static TurnIdT nextTurnId()
@@ -350,7 +310,7 @@ class DomainBase
350310
static void addContinuationInput(R&& r, V&& v)
351311
{
352312
// Copy v
353-
ContinuationHolder_::Get()->Add(
313+
ContinuationHolder<D>::Get()->Add(
354314
[&r,v] { addTransactionInput(r, std::move(v)); }
355315
);
356316
}
@@ -403,9 +363,6 @@ class DomainBase
403363
template <typename D, typename TPolicy>
404364
std::atomic<TurnIdT> DomainBase<D,TPolicy>::nextTurnId_( 0 );
405365

406-
template <typename D, typename TPolicy>
407-
TurnFlagsT DomainBase<D,TPolicy>::turnFlags_( 0 );
408-
409366
template <typename D, typename TPolicy>
410367
typename DomainBase<D,TPolicy>::TransactionState DomainBase<D,TPolicy>::transactionState_;
411368

@@ -426,6 +383,28 @@ class DomainInitializer
426383
}
427384
};
428385

386+
///////////////////////////////////////////////////////////////////////////////////////////////////
387+
/// ContinuationHolder
388+
///////////////////////////////////////////////////////////////////////////////////////////////////
389+
template <typename D>
390+
class ContinuationHolder
391+
{
392+
public:
393+
using TurnT = typename D::TurnT;
394+
395+
ContinuationHolder() = delete;
396+
397+
static void SetTurn(TurnT& turn) { ptr_ = &turn.continuation_; }
398+
static void Clear() { ptr_ = nullptr; }
399+
static ContinuationInput* Get() { return ptr_; }
400+
401+
private:
402+
static __declspec(thread) ContinuationInput* ptr_;
403+
};
404+
405+
template <typename D>
406+
ContinuationInput* ContinuationHolder<D>::ptr_(nullptr);
407+
429408
/****************************************/ REACT_IMPL_END /***************************************/
430409

431410
#define REACTIVE_DOMAIN(name, ...) \

include/react/detail/graph/ObserverNodes.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
/***************************************/ REACT_IMPL_BEGIN /**************************************/
1919

20+
// tbb tasks are non-preemptible, thread local flag for each worker
2021
namespace current_observer_state_
2122
{
2223
static __declspec(thread) bool shouldDetach = false;
@@ -87,12 +88,12 @@ class SignalObserverNode : public ObserverNode<D>
8788

8889
current_observer_state_::shouldDetach = false;
8990

90-
D::SetCurrentContinuation(turn);
91+
ContinuationHolder<D>::SetTurn(turn);
9192

9293
if (auto p = subject_.lock())
9394
func_(p->ValueRef());
9495

95-
D::ClearCurrentContinuation();
96+
ContinuationHolder<D>::Clear();
9697

9798
if (current_observer_state_::shouldDetach)
9899
turn.QueueForDetach(*this);
@@ -162,15 +163,15 @@ class EventObserverNode : public ObserverNode<D>
162163

163164
current_observer_state_::shouldDetach = false;
164165

165-
D::SetCurrentContinuation(turn);
166+
ContinuationHolder<D>::SetTurn(turn);
166167

167168
if (auto p = subject_.lock())
168169
{
169170
for (const auto& e : p->Events())
170171
func_(e);
171172
}
172173

173-
D::ClearCurrentContinuation();
174+
ContinuationHolder<D>::Clear();
174175

175176
if (current_observer_state_::shouldDetach)
176177
turn.QueueForDetach(*this);

src/test/TransactionTest.h

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -256,29 +256,25 @@ TYPED_TEST_P(TransactionTest, Merging1)
256256
// Todo: improve this as it'll fail occasionally
257257
shouldSpin = true;
258258
std::thread t1([&] {
259-
MyDomain::Set<ETurnFlags>(enable_input_merging);
260-
MyDomain::DoTransaction([&] {
259+
MyDomain::DoTransaction(enable_input_merging, [&] {
261260
n1 <<= 2;
262261
});
263262
});
264263
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
265264
std::thread t2([&] {
266-
MyDomain::Set<ETurnFlags>(enable_input_merging);
267-
MyDomain::DoTransaction([&] {
265+
MyDomain::DoTransaction(enable_input_merging, [&] {
268266
n1 <<= 3;
269267
});
270268
});
271269
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
272270
std::thread t3([&] {
273-
MyDomain::Set<ETurnFlags>(enable_input_merging);
274-
MyDomain::DoTransaction([&] {
271+
MyDomain::DoTransaction(enable_input_merging, [&] {
275272
n1 <<= 4;
276273
});
277274
});
278275
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
279276
std::thread t4([&] {
280-
MyDomain::Set<ETurnFlags>(enable_input_merging);
281-
MyDomain::DoTransaction([&] {
277+
MyDomain::DoTransaction(enable_input_merging, [&] {
282278
n1 <<= 5;
283279
});
284280

0 commit comments

Comments
 (0)