Skip to content

Commit 9705693

Browse files
committed
Fixed remaining compile issues with previous transaction refactoring (most test cases are still failing).
1 parent 6769d2c commit 9705693

10 files changed

Lines changed: 65 additions & 87 deletions

File tree

include/react/EventStream.h

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,10 @@ class REventSource : public REvents<D,E>
9999
{
100100
}
101101

102-
void Emit(const E& e) const
102+
template <typename V>
103+
void Emit(V&& v) const
103104
{
104-
if (! Domain::TransactionInputContinuation::IsNull())
105-
{
106-
auto sourceNode = std::static_pointer_cast<NodeT>(ptr_);
107-
Domain::TransactionInputContinuation::Get()->AddEventInput_Safe(*sourceNode, e);
108-
}
109-
else if (! Domain::ScopedTransactionInput::IsNull())
110-
{
111-
auto sourceNode = std::static_pointer_cast<NodeT>(ptr_);
112-
Domain::ScopedTransactionInput::Get()->AddEventInput(*sourceNode, e);
113-
}
114-
else
115-
{
116-
auto sourceNode = std::static_pointer_cast<NodeT>(ptr_);
117-
Domain::Transaction t;
118-
t.Data().Input().AddEventInput(*sourceNode, e);
119-
t.Commit();
120-
}
105+
D::AddInput(*std::static_pointer_cast<NodeT>(ptr_), std::forward<V>(v));
121106
}
122107

123108
template <typename = std::enable_if<std::is_same<E,EventToken>::value>::type>

include/react/graph/EventStreamNodes.h

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class EventStreamNode : public ReactiveNode<D,E,void>
5151
return events_;
5252
}
5353

54-
void SetCurrentTurn(const TurnInterface& turn, bool forceClear = false)
54+
bool SetCurrentTurn(const TurnInterface& turn, bool forceClear = false)
5555
{
5656
EventMutexT::scoped_lock lock(eventMutex_);
5757

@@ -60,6 +60,11 @@ class EventStreamNode : public ReactiveNode<D,E,void>
6060
curTurnId_ = turn.Id();
6161
//printf("Cleared turn\n");
6262
events_.clear();
63+
return true;
64+
}
65+
else
66+
{
67+
return false;
6368
}
6469
}
6570

@@ -93,10 +98,6 @@ class EventSourceNode : public EventStreamNode<D,E>
9398
explicit EventSourceNode(bool registered) :
9499
EventStreamNode(true)
95100
{
96-
admissionFlag_.clear();
97-
propagationFlag_.clear();
98-
99-
100101
if (!registered)
101102
registerNode();
102103
}
@@ -105,24 +106,27 @@ class EventSourceNode : public EventStreamNode<D,E>
105106

106107
virtual ETickResult Tick(void* turnPtr) override
107108
{
108-
REACT_ASSERT(false, "Don't tick EventSourceNode\n");
109-
return ETickResult::none;
109+
typedef typename D::Engine::TurnInterface TurnInterface;
110+
TurnInterface& turn = *static_cast<TurnInterface*>(turnPtr);
111+
112+
if (events_.size() > 0 && SetCurrentTurn(turn))
113+
{
114+
Engine::OnTurnInputChange(*this, turn);
115+
return ETickResult::pulsed;
116+
}
117+
else
118+
{
119+
return ETickResult::none;
120+
}
110121
}
111122

112123
virtual bool IsInputNode() const override { return true; }
113124

114-
EventSourceNode& Push(const E& e)
125+
template <typename V>
126+
void AddInput(V&& v)
115127
{
116-
events_.push_back(e);
117-
return *this;
128+
events_.push_back(std::forward<V>(v));
118129
}
119-
120-
private:
121-
std::atomic_flag admissionFlag_;
122-
std::atomic_flag propagationFlag_;
123-
124-
template <typename>
125-
friend class TransactionInput;
126130
};
127131

128132
////////////////////////////////////////////////////////////////////////////////////////
@@ -172,7 +176,7 @@ class EventMergeNode : public EventStreamNode<D, E>
172176

173177
//printf("EventMergeNode: Tick %08X by thread %08X\n", this, std::this_thread::get_id().hash());
174178

175-
SetCurrentTurn(turn, true);
179+
SetCurrentTurn(turn);
176180

177181
D::Log().template Append<NodeEvaluateBeginEvent>(GetObjectId(*this), turn.Id(), std::this_thread::get_id().hash());
178182
func_(std::cref(turn));

include/react/graph/ObserverNodes.h

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,13 @@ class SignalObserverNode : public ObserverNode<D>
8383

8484
current_observer_state_::shouldDetach = false;
8585

86-
//ContinuationInputHolder_::Set(&turn.Continuation());
8786
D::SetCurrentContinuation(turn);
8887

8988
if (auto p = subject_.lock())
9089
func_(p->ValueRef());
9190

9291
D::ClearCurrentContinuation();
9392

94-
//D::TransactionInputContinuation::Reset();
95-
9693
if (current_observer_state_::shouldDetach)
9794
turn.QueueForDetach(*this);
9895

@@ -154,15 +151,15 @@ class EventObserverNode : public ObserverNode<D>
154151

155152
current_observer_state_::shouldDetach = false;
156153

157-
D::TransactionInputContinuation::Set(&turn.InputContinuation());
154+
D::SetCurrentContinuation(turn);
158155

159156
if (auto p = subject_.lock())
160157
{
161158
for (const auto& e : p->Events())
162159
func_(e);
163160
}
164161

165-
D::TransactionInputContinuation::Reset();
162+
D::ClearCurrentContinuation();
166163

167164
if (current_observer_state_::shouldDetach)
168165
turn.QueueForDetach(*this);

include/react/graph/SignalNodes.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,16 @@ class VarNode : public SignalNode<D,S>
104104
}
105105
else
106106
{
107-
return ETickResult::idle_pulsed;
107+
return ETickResult::none;
108108
}
109109
}
110110

111111
virtual bool IsInputNode() const override { return true; }
112112

113-
void AddInput(const S& newValue)
113+
template <typename V>
114+
void AddInput(V&& newValue)
114115
{
115-
newValue_ = newValue;
116+
newValue_ = std::forward<V>(newValue);
116117
}
117118

118119
private:

src/benchmark/BenchmarkRandom.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,15 @@ struct Benchmark_Random : public BenchmarkBase<D>
297297
auto t0 = tbb::tick_count::now();
298298
for (int i=0; i<params.K; i++)
299299
{
300-
{
301-
MyDomain::ScopedTransaction _;
300+
MyDomain::DoTransaction([&] {
302301
for (int j=0; j<counts[i]; j++)
303302
{
304303
generator.InputSignals[cursor++] <<= 10+i;
305304

306305
if (cursor >= params.W)
307306
cursor = 0;
308307
}
309-
}
308+
});
310309
}
311310
auto t1 = tbb::tick_count::now();
312311

src/benchmark/Main.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ using namespace react;
2828

2929
REACTIVE_DOMAIN(FloodingDomain, FloodingEngine, EventLog);
3030
REACTIVE_DOMAIN(TopoSortDomain, TopoSortEngine, EventLog);
31-
REACTIVE_DOMAIN(TopoSortO1Domain, TopoSortO1Engine, EventLog);
31+
//REACTIVE_DOMAIN(TopoSortO1Domain, TopoSortO1Engine, EventLog);
3232
REACTIVE_DOMAIN(PulseCountDomain, PulseCountEngine, EventLog);
3333
REACTIVE_DOMAIN(SourceSetDomain, SourceSetEngine, EventLog);
3434
REACTIVE_DOMAIN(TopoSortSTDomain, TopoSortSTEngine, EventLog);
@@ -37,7 +37,7 @@ REACTIVE_DOMAIN(ELMDomain, ELMEngine, EventLog);
3737

3838
REACTIVE_DOMAIN(BFloodingDomain, FloodingEngine);
3939
REACTIVE_DOMAIN(BTopoSortDomain, TopoSortEngine);
40-
REACTIVE_DOMAIN(BTopoSortO1Domain, TopoSortO1Engine);
40+
//REACTIVE_DOMAIN(BTopoSortO1Domain, TopoSortO1Engine);
4141
REACTIVE_DOMAIN(BPulseCountDomain, PulseCountEngine);
4242
REACTIVE_DOMAIN(BSourceSetDomain, SourceSetEngine);
4343
REACTIVE_DOMAIN(BTopoSortSTDomain, TopoSortSTEngine);
@@ -236,21 +236,21 @@ void debugBenchmarks()
236236

237237
void profileBenchmark()
238238
{
239-
//RUN_BENCHMARK(std::cout, 1, Benchmark_Grid, BenchmarkParams_Grid(30, 10000),
240-
// BSourceSetDomain);
239+
RUN_BENCHMARK(std::cout, 1, Benchmark_Grid, BenchmarkParams_Grid(30, 1000),
240+
BSourceSetDomain);
241241

242242
//RUN_BENCHMARK(std::cout, 1, Benchmark_Grid, BenchmarkParams_Grid(30, 10000),
243243
// BSourceSetDomain);
244244

245-
RUN_BENCHMARK(std::cout, 1, Benchmark_Random, BenchmarkParams_Random(20, 11, 100, 0, 10, 40, 40, false, 41556, 21624),
246-
BFloodingDomain);
245+
//RUN_BENCHMARK(std::cout, 1, Benchmark_Random, BenchmarkParams_Random(20, 11, 100, 0, 10, 40, 40, false, 41556, 21624),
246+
// BFloodingDomain);
247247
}
248248

249249
} // ~anonymous namespace
250250

251251
int main()
252252
{
253-
runBenchmarks();
253+
//runBenchmarks();
254254
//debugBenchmarks();
255-
//profileBenchmark();
255+
profileBenchmark();
256256
}

src/test/EventStreamTest.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,11 @@ TYPED_TEST_P(EventStreamTest, EventMerge1)
9797
results.push_back(v);
9898
});
9999

100-
{
101-
MyDomain::ScopedTransaction _;
100+
MyDomain::DoTransaction([&] {
102101
a1 << 10;
103102
a2 << 20;
104103
a3 << 30;
105-
}
104+
});
106105

107106
ASSERT_EQ(results.size(), 3);
108107
ASSERT_TRUE(std::find(results.begin(), results.end(), 10) != results.end());
@@ -132,12 +131,11 @@ TYPED_TEST_P(EventStreamTest, EventMerge2)
132131
std::string s2("two");
133132
std::string s3("three");
134133

135-
{
136-
MyDomain::ScopedTransaction _;
134+
MyDomain::DoTransaction([&] {
137135
a1 << s1;
138136
a2 << s2;
139137
a3 << s3;
140-
}
138+
});
141139

142140
ASSERT_EQ(results.size(), 3);
143141
ASSERT_TRUE(std::find(results.begin(), results.end(), "one") != results.end());

src/test/OperationsTest.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ TYPED_TEST_P(OperationsTest, Fold1)
3636
});
3737

3838
for (auto i=1; i<=100; i++)
39+
{
3940
numSrc << i;
41+
}
4042

4143
ASSERT_EQ(numFold(), 5050);
4244

@@ -67,12 +69,10 @@ TYPED_TEST_P(OperationsTest, Fold2)
6769
ASSERT_EQ(v, 5050);
6870
});
6971

70-
{
71-
MyDomain::ScopedTransaction _;
72-
72+
MyDomain::DoTransaction([&] {
7373
for (auto i=1; i<=100; i++)
7474
src << i;
75-
}
75+
});
7676

7777
ASSERT_EQ(f(), 5050);
7878
ASSERT_EQ(c, 1);

src/test/SignalTest.h

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -203,11 +203,10 @@ TYPED_TEST_P(SignalTest, Signals3)
203203

204204
ASSERT_EQ(result(),6);
205205

206-
{
207-
MyDomain::ScopedTransaction _;
206+
MyDomain::DoTransaction([&] {
208207
a1 <<= 2;
209208
a2 <<= 2;
210-
}
209+
});
211210

212211
ASSERT_EQ(observeCount,1);
213212

@@ -389,11 +388,10 @@ TYPED_TEST_P(SignalTest, Flatten2)
389388
ASSERT_EQ(result(), 100 + 300);
390389
ASSERT_EQ(observeCount, 2);
391390

392-
{
393-
MyDomain::ScopedTransaction _;
391+
MyDomain::DoTransaction([&] {
394392
a0 <<= 5000;
395393
a1 <<= 6000;
396-
}
394+
});
397395

398396
ASSERT_EQ(result(), 5000 + 6000);
399397
ASSERT_EQ(observeCount, 3);
@@ -429,31 +427,28 @@ TYPED_TEST_P(SignalTest, Flatten3)
429427
ASSERT_EQ(result(), 10 + 30);
430428
ASSERT_EQ(observeCount, 0);
431429

432-
{
433-
MyDomain::ScopedTransaction _;
430+
MyDomain::DoTransaction([&] {
434431
inner1 <<= 1000;
435432
a0 <<= 200000;
436433
a1 <<= 50000;
437434
outer <<= inner2;
438-
}
435+
});
439436

440437
ASSERT_EQ(result(), 50000 + 200000);
441438
ASSERT_EQ(observeCount, 1);
442439

443-
{
444-
MyDomain::ScopedTransaction _;
440+
MyDomain::DoTransaction([&] {
445441
a0 <<= 667;
446442
a1 <<= 776;
447-
}
443+
});
448444

449445
ASSERT_EQ(result(), 776 + 667);
450446
ASSERT_EQ(observeCount, 2);
451447

452-
{
453-
MyDomain::ScopedTransaction _;
448+
MyDomain::DoTransaction([&] {
454449
inner1 <<= 999;
455450
a0 <<= 888;
456-
}
451+
});
457452

458453
ASSERT_EQ(result(), 776 + 888);
459454
ASSERT_EQ(observeCount, 2);
@@ -484,11 +479,10 @@ TYPED_TEST_P(SignalTest, Flatten4)
484479
results.push_back(v);
485480
});
486481

487-
{
488-
MyDomain::ScopedTransaction _;
482+
MyDomain::DoTransaction([&] {
489483
a3 <<= 400;
490484
outer <<= inner2;
491-
}
485+
});
492486

493487
ASSERT_EQ(results.size(), 1);
494488

src/test/TransactionTest.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -248,22 +248,22 @@ TYPED_TEST_P(TransactionTest, Concurrent4)
248248
// Todo: improve this as it'll fail occasionally
249249
shouldSpin = true;
250250
std::thread t1([&] {
251-
MyDomain::SetDefaultCommitFlags(allow_transaction_merging);
251+
MyDomain::Set<ETurnFlags>(enable_input_merging);
252252
n1 <<= 2;
253253
});
254254
std::this_thread::sleep_for(std::chrono::milliseconds(2000));
255255
std::thread t2([&] {
256-
MyDomain::SetDefaultCommitFlags(allow_transaction_merging);
256+
MyDomain::Set<ETurnFlags>(enable_input_merging);
257257
n1 <<= 3;
258258
});
259259
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
260260
std::thread t3([&] {
261-
MyDomain::SetDefaultCommitFlags(allow_transaction_merging);
261+
MyDomain::Set<ETurnFlags>(enable_input_merging);
262262
n1 <<= 4;
263263
});
264264
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
265265
std::thread t4([&] {
266-
MyDomain::SetDefaultCommitFlags(allow_transaction_merging);
266+
MyDomain::Set<ETurnFlags>(enable_input_merging);
267267
n1 <<= 5;
268268
});
269269
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

0 commit comments

Comments
 (0)