1+ /**
2+ * Copyright 2014 Netflix, Inc.
3+ *
4+ * Licensed under the Apache License, Version 2.0 (the "License");
5+ * you may not use this file except in compliance with the License.
6+ * You may obtain a copy of the License at
7+ *
8+ * http://www.apache.org/licenses/LICENSE-2.0
9+ *
10+ * Unless required by applicable law or agreed to in writing, software
11+ * distributed under the License is distributed on an "AS IS" BASIS,
12+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+ * See the License for the specific language governing permissions and
14+ * limitations under the License.
15+ */
16+ package rx .subjects ;
17+
18+ import java .util .Collection ;
19+ import java .util .concurrent .TimeUnit ;
20+ import java .util .concurrent .atomic .AtomicReference ;
21+
22+ import rx .Notification ;
23+ import rx .Observer ;
24+ import rx .Scheduler ;
25+ import rx .Scheduler .Inner ;
26+ import rx .functions .Action1 ;
27+ import rx .schedulers .TestScheduler ;
28+ import rx .subjects .SubjectSubscriptionManager .SubjectObserver ;
29+
30+ /**
31+ * Subject that, once and {@link Observer} has subscribed, publishes all subsequent events to the subscriber.
32+ * <p>
33+ * <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/S.PublishSubject.png">
34+ * <p>
35+ * Example usage:
36+ * <p>
37+ * <pre> {@code
38+
39+ * PublishSubject<Object> subject = PublishSubject.create();
40+ // observer1 will receive all onNext and onCompleted events
41+ subject.subscribe(observer1);
42+ subject.onNext("one");
43+ subject.onNext("two");
44+ // observer2 will only receive "three" and onCompleted
45+ subject.subscribe(observer2);
46+ subject.onNext("three");
47+ subject.onCompleted();
48+
49+ } </pre>
50+ *
51+ * @param <T>
52+ */
53+ public final class TestSubject <T > extends Subject <T , T > {
54+
55+ public static <T > TestSubject <T > create (TestScheduler scheduler ) {
56+ final SubjectSubscriptionManager <T > subscriptionManager = new SubjectSubscriptionManager <T >();
57+ // set a default value so subscriptions will immediately receive this until a new notification is received
58+ final AtomicReference <Notification <T >> lastNotification = new AtomicReference <Notification <T >>();
59+
60+ OnSubscribe <T > onSubscribe = subscriptionManager .getOnSubscribeFunc (
61+ /**
62+ * This function executes at beginning of subscription.
63+ *
64+ * This will always run, even if Subject is in terminal state.
65+ */
66+ new Action1 <SubjectObserver <? super T >>() {
67+
68+ @ Override
69+ public void call (SubjectObserver <? super T > o ) {
70+ // nothing onSubscribe unless in terminal state which is the next function
71+ }
72+ },
73+ /**
74+ * This function executes if the Subject is terminated before subscription occurs.
75+ */
76+ new Action1 <SubjectObserver <? super T >>() {
77+
78+ @ Override
79+ public void call (SubjectObserver <? super T > o ) {
80+ /*
81+ * If we are already terminated, or termination happens while trying to subscribe
82+ * this will be invoked and we emit whatever the last terminal value was.
83+ */
84+ lastNotification .get ().accept (o );
85+ }
86+ });
87+
88+ return new TestSubject <T >(onSubscribe , subscriptionManager , lastNotification , scheduler );
89+ }
90+
91+ private final SubjectSubscriptionManager <T > subscriptionManager ;
92+ private final AtomicReference <Notification <T >> lastNotification ;
93+ private final Scheduler .Inner innerScheduler ;
94+
95+ protected TestSubject (OnSubscribe <T > onSubscribe , SubjectSubscriptionManager <T > subscriptionManager , AtomicReference <Notification <T >> lastNotification , TestScheduler scheduler ) {
96+ super (onSubscribe );
97+ this .subscriptionManager = subscriptionManager ;
98+ this .lastNotification = lastNotification ;
99+ this .innerScheduler = scheduler .createInnerScheduler ();
100+ }
101+
102+ @ Override
103+ public void onCompleted () {
104+ onCompleted (innerScheduler .now ());
105+ }
106+
107+ private void _onCompleted () {
108+ subscriptionManager .terminate (new Action1 <Collection <SubjectObserver <? super T >>>() {
109+
110+ @ Override
111+ public void call (Collection <SubjectObserver <? super T >> observers ) {
112+ lastNotification .set (Notification .<T > createOnCompleted ());
113+ for (Observer <? super T > o : observers ) {
114+ o .onCompleted ();
115+ }
116+ }
117+ });
118+ }
119+
120+ public void onCompleted (long timeInMilliseconds ) {
121+ innerScheduler .schedule (new Action1 <Inner >() {
122+
123+ @ Override
124+ public void call (Inner t1 ) {
125+ _onCompleted ();
126+ }
127+
128+ }, timeInMilliseconds , TimeUnit .MILLISECONDS );
129+ }
130+
131+ @ Override
132+ public void onError (final Throwable e ) {
133+ onError (e , innerScheduler .now ());
134+ }
135+
136+ private void _onError (final Throwable e ) {
137+ subscriptionManager .terminate (new Action1 <Collection <SubjectObserver <? super T >>>() {
138+
139+ @ Override
140+ public void call (Collection <SubjectObserver <? super T >> observers ) {
141+ lastNotification .set (Notification .<T > createOnError (e ));
142+ for (Observer <? super T > o : observers ) {
143+ o .onError (e );
144+ }
145+ }
146+ });
147+
148+ }
149+
150+ public void onError (final Throwable e , long timeInMilliseconds ) {
151+ innerScheduler .schedule (new Action1 <Inner >() {
152+
153+ @ Override
154+ public void call (Inner t1 ) {
155+ _onError (e );
156+ }
157+
158+ }, timeInMilliseconds , TimeUnit .MILLISECONDS );
159+ }
160+
161+ @ Override
162+ public void onNext (T v ) {
163+ onNext (v , innerScheduler .now ());
164+ }
165+
166+ private void _onNext (T v ) {
167+ for (Observer <? super T > o : subscriptionManager .rawSnapshot ()) {
168+ o .onNext (v );
169+ }
170+ }
171+
172+ public void onNext (final T v , long timeInMilliseconds ) {
173+ innerScheduler .schedule (new Action1 <Inner >() {
174+
175+ @ Override
176+ public void call (Inner t1 ) {
177+ _onNext (v );
178+ }
179+
180+ }, timeInMilliseconds , TimeUnit .MILLISECONDS );
181+ }
182+ }
0 commit comments