Skip to content

Commit e792fc8

Browse files
committed
Examples 4.3 Coincidence
1 parent 220df5e commit e792fc8

File tree

3 files changed

+482
-0
lines changed

3 files changed

+482
-0
lines changed
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package itrx.chapter4.coincidence;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class GroupJoinTest {
14+
15+
private static class Tuple<T1, T2> {
16+
public final T1 item1;
17+
public final T2 item2;
18+
19+
public Tuple(T1 item1, T2 item2) {
20+
this.item1 = item1;
21+
this.item2 = item2;
22+
}
23+
24+
public static <T1,T2> Tuple<T1,T2> create(T1 item1, T2 item2) {
25+
return new Tuple<T1,T2>(item1, item2);
26+
}
27+
28+
@Override
29+
public boolean equals(Object obj) {
30+
if (obj instanceof Tuple<?,?>) {
31+
Tuple<?,?> other = (Tuple<?,?>) obj;
32+
return this.item1.equals(other.item1) &&
33+
this.item2.equals(other.item2);
34+
}
35+
return false;
36+
}
37+
38+
@Override
39+
public String toString() {
40+
return "(" + item1 + ", " + item2 + ")";
41+
}
42+
}
43+
44+
public void example() {
45+
Observable<String> left =
46+
Observable.interval(100, TimeUnit.MILLISECONDS)
47+
.map(i -> "L" + i)
48+
.take(6);
49+
Observable<String> right =
50+
Observable.interval(200, TimeUnit.MILLISECONDS)
51+
.map(i -> "R" + i)
52+
.take(3);
53+
54+
left
55+
.groupJoin(
56+
right,
57+
i -> Observable.never(),
58+
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
59+
(l, rs) -> rs.toList().subscribe(list -> System.out.println(l + ": " + list))
60+
)
61+
.subscribe();
62+
63+
// L0: [R0, R1, R2]
64+
// L1: [R0, R1, R2]
65+
// L2: [R1, R2]
66+
// L3: [R1, R2]
67+
// L4: [R2]
68+
// L5: [R2]
69+
}
70+
71+
72+
//
73+
// Test
74+
//
75+
76+
@Test
77+
public void test() {
78+
TestSubscriber<Object> tester = new TestSubscriber<>();
79+
TestScheduler scheduler = Schedulers.test();
80+
81+
Observable<Long> left =
82+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
83+
.take(6);
84+
Observable<Long> right =
85+
Observable.interval(200, TimeUnit.MILLISECONDS, scheduler)
86+
.take(3);
87+
88+
left
89+
.groupJoin(
90+
right,
91+
i -> Observable.never(),
92+
i -> Observable.timer(0, TimeUnit.MILLISECONDS, scheduler),
93+
(l, rs) -> rs.toList().map(rl -> Tuple.create(l, rl))
94+
)
95+
.flatMap(i -> i)
96+
.subscribe(tester);
97+
98+
scheduler.advanceTimeTo(600, TimeUnit.MILLISECONDS);
99+
tester.assertReceivedOnNext(Arrays.asList(
100+
Tuple.create(0L, Arrays.asList(0L, 1L, 2L)),
101+
Tuple.create(1L, Arrays.asList(0L, 1L, 2L)),
102+
Tuple.create(2L, Arrays.asList(1L, 2L)),
103+
Tuple.create(3L, Arrays.asList(1L, 2L)),
104+
Tuple.create(4L, Arrays.asList(2L)),
105+
Tuple.create(5L, Arrays.asList(2L))
106+
));
107+
}
108+
109+
@Test
110+
public void exampleEquivalence() {
111+
TestScheduler scheduler = Schedulers.test();
112+
TestSubscriber<Object> testerJoin = new TestSubscriber<>();
113+
TestSubscriber<Object> testerGroupJoin = new TestSubscriber<>();
114+
115+
Observable<Long> left =
116+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
117+
Observable<Long> right =
118+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
119+
120+
left.join(
121+
right,
122+
i -> Observable.timer(150, TimeUnit.MILLISECONDS, scheduler),
123+
i -> Observable.timer(0, TimeUnit.MILLISECONDS, scheduler),
124+
(l,r) -> Tuple.create(l, r)
125+
)
126+
.take(10)
127+
.subscribe(testerJoin);
128+
129+
left.groupJoin(
130+
right,
131+
i -> Observable.timer(150, TimeUnit.MILLISECONDS, scheduler),
132+
i -> Observable.timer(0, TimeUnit.MILLISECONDS, scheduler),
133+
(l, rs) -> rs.map(r -> Tuple.create(l, r))
134+
)
135+
.flatMap(i -> i)
136+
.take(10)
137+
.subscribe(testerGroupJoin);
138+
139+
scheduler.advanceTimeTo(600, TimeUnit.MILLISECONDS);
140+
testerJoin.assertReceivedOnNext(testerGroupJoin.getOnNextEvents());
141+
}
142+
}
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package itrx.chapter4.coincidence;
2+
3+
import java.util.Arrays;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.Test;
7+
8+
import rx.Observable;
9+
import rx.observers.TestSubscriber;
10+
import rx.schedulers.Schedulers;
11+
import rx.schedulers.TestScheduler;
12+
13+
public class JoinTest {
14+
15+
private static class Tuple<T1, T2> {
16+
public final T1 item1;
17+
public final T2 item2;
18+
19+
public Tuple(T1 item1, T2 item2) {
20+
this.item1 = item1;
21+
this.item2 = item2;
22+
}
23+
24+
public static <T1,T2> Tuple<T1,T2> create(T1 item1, T2 item2) {
25+
return new Tuple<T1,T2>(item1, item2);
26+
}
27+
28+
@Override
29+
public boolean equals(Object obj) {
30+
if (obj instanceof Tuple<?,?>) {
31+
Tuple<?,?> other = (Tuple<?,?>) obj;
32+
return this.item1.equals(other.item1) &&
33+
this.item2.equals(other.item2);
34+
}
35+
return false;
36+
}
37+
}
38+
39+
public void exampleJoinSimple() {
40+
Observable<String> left =
41+
Observable.interval(100, TimeUnit.MILLISECONDS)
42+
.map(i -> "L" + i);
43+
Observable<String> right =
44+
Observable.interval(200, TimeUnit.MILLISECONDS)
45+
.map(i -> "R" + i);
46+
47+
left
48+
.join(
49+
right,
50+
i -> Observable.never(),
51+
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
52+
(l,r) -> l + " - " + r
53+
)
54+
.take(10)
55+
.subscribe(System.out::println);
56+
57+
// L0 - R0
58+
// L1 - R0
59+
// L0 - R1
60+
// L1 - R1
61+
// L2 - R1
62+
// L3 - R1
63+
// L0 - R2
64+
// L1 - R2
65+
// L2 - R2
66+
// L3 - R2
67+
}
68+
69+
public void exampleJoin2Way() {
70+
Observable<String> left =
71+
Observable.interval(100, TimeUnit.MILLISECONDS)
72+
.map(i -> "L" + i);
73+
Observable<String> right =
74+
Observable.interval(100, TimeUnit.MILLISECONDS)
75+
.map(i -> "R" + i);
76+
77+
left
78+
.join(
79+
right,
80+
i -> Observable.timer(150, TimeUnit.MILLISECONDS),
81+
i -> Observable.timer(0, TimeUnit.MILLISECONDS),
82+
(l,r) -> l + " - " + r
83+
)
84+
.take(10)
85+
.subscribe(System.out::println);
86+
87+
// L0 - R0
88+
// L0 - R1
89+
// L1 - R1
90+
// L1 - R2
91+
// L2 - R2
92+
// L2 - R3
93+
// L3 - R3
94+
// L3 - R4
95+
// L4 - R4
96+
// L4 - R5
97+
}
98+
99+
100+
//
101+
// Tests
102+
//
103+
104+
@Test
105+
public void testJoinSimple() {
106+
TestSubscriber<Tuple<Long,Long>> tester = new TestSubscriber<>();
107+
TestScheduler scheduler = Schedulers.test();
108+
109+
Observable<Long> left =
110+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
111+
Observable<Long> right =
112+
Observable.interval(200, TimeUnit.MILLISECONDS, scheduler);
113+
114+
left
115+
.join(
116+
right,
117+
i -> Observable.never(),
118+
i -> Observable.timer(0, TimeUnit.MILLISECONDS, scheduler),
119+
(l,r) -> Tuple.create(l, r)
120+
)
121+
.take(10)
122+
.subscribe(tester);
123+
124+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
125+
tester.assertReceivedOnNext(Arrays.asList(
126+
Tuple.create(0L, 0L),
127+
Tuple.create(1L, 0L),
128+
Tuple.create(0L, 1L),
129+
Tuple.create(1L, 1L),
130+
Tuple.create(2L, 1L),
131+
Tuple.create(3L, 1L),
132+
Tuple.create(0L, 2L),
133+
Tuple.create(1L, 2L),
134+
Tuple.create(2L, 2L),
135+
Tuple.create(3L, 2L)
136+
));
137+
}
138+
139+
@Test
140+
public void testJoin2Way() {
141+
TestSubscriber<Tuple<Long,Long>> tester = new TestSubscriber<>();
142+
TestScheduler scheduler = Schedulers.test();
143+
144+
Observable<Long> left =
145+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
146+
Observable<Long> right =
147+
Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
148+
149+
left
150+
.join(
151+
right,
152+
i -> Observable.timer(150, TimeUnit.MILLISECONDS, scheduler),
153+
i -> Observable.timer(0, TimeUnit.MILLISECONDS, scheduler),
154+
(l,r) -> Tuple.create(l, r)
155+
)
156+
.take(10)
157+
.subscribe(tester);
158+
159+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
160+
tester.assertReceivedOnNext(Arrays.asList(
161+
Tuple.create(0L, 0L),
162+
Tuple.create(0L, 1L),
163+
Tuple.create(1L, 1L),
164+
Tuple.create(1L, 2L),
165+
Tuple.create(2L, 2L),
166+
Tuple.create(2L, 3L),
167+
Tuple.create(3L, 3L),
168+
Tuple.create(3L, 4L),
169+
Tuple.create(4L, 4L),
170+
Tuple.create(4L, 5L)
171+
));
172+
}
173+
}

0 commit comments

Comments
 (0)