5454import rx .operators .OperationFilter ;
5555import rx .operators .OperationFinally ;
5656import rx .operators .OperationFlatMap ;
57- import rx .operators .OperationGroupBy ;
57+ import rx .operators .OperatorGroupBy ;
5858import rx .operators .OperationGroupByUntil ;
5959import rx .operators .OperationGroupJoin ;
6060import rx .operators .OperationInterval ;
159159 */
160160public class Observable <T > {
161161
162- final Action2 < Observer <? super T >, OperatorSubscription > f ;
162+ final Action1 < Operator <? super T >> f ;
163163
164164 /**
165165 * Observable with Function to execute when subscribed to.
@@ -171,7 +171,7 @@ public class Observable<T> {
171171 * @param onSubscribe
172172 * {@link OnSubscribeFunc} to be executed when {@link #subscribe(Observer)} is called
173173 */
174- protected Observable (Action2 < Observer <? super T >, OperatorSubscription > f ) {
174+ protected Observable (Action1 < Operator <? super T >> f ) {
175175 this .f = f ;
176176 }
177177
@@ -189,31 +189,6 @@ public static interface OnSubscribeFunc<T> extends Function {
189189 public Subscription onSubscribe (Observer <? super T > t1 );
190190 }
191191
192- public static class OperatorSubscription implements Subscription {
193-
194- private final CompositeSubscription cs = new CompositeSubscription ();
195-
196- @ Override
197- public void unsubscribe () {
198- cs .unsubscribe ();
199- }
200-
201- public static OperatorSubscription create (Subscription s ) {
202- OperatorSubscription _s = new OperatorSubscription ();
203- _s .add (s );
204- return _s ;
205- }
206-
207- public boolean isUnsubscribed () {
208- return cs .isUnsubscribed ();
209- }
210-
211- public void add (Subscription s ) {
212- cs .add (s );
213- }
214-
215- }
216-
217192 private final static RxJavaObservableExecutionHook hook = RxJavaPlugins .getInstance ().getObservableExecutionHook ();
218193
219194 /**
@@ -260,27 +235,27 @@ protected Observable(Action2<Observer<? super T>, OperatorSubscription> f) {
260235 * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#create">RxJava Wiki: create()</a>
261236 * @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.create.aspx">MSDN: Observable.Create</a>
262237 */
263- public final static <T > Observable <T > create (final Action2 < Observer <? super T >, OperatorSubscription > f ) {
238+ public final static <T > Observable <T > create (final Action1 < Operator <? super T >> f ) {
264239 return new Observable <T >(f );
265240 }
266241
267242 public final static <T > Observable <T > create (final OnSubscribeFunc <T > func ) {
268- return new Observable <T >(new Action2 < Observer <? super T >, OperatorSubscription >() {
243+ return new Observable <T >(new Action1 < Operator <? super T >>() {
269244
270245 @ Override
271- public void call (Observer <? super T > o , OperatorSubscription s ) {
272- s .add (func .onSubscribe (o ));
246+ public void call (Operator <? super T > o ) {
247+ o .add (func .onSubscribe (o ));
273248 }
274249
275250 });
276251 }
277252
278- public <R > Observable <R > bind (final Func2 < Observer <? super R >, OperatorSubscription , Observer <? super T >> bind ) {
279- return new Observable <R >(new Action2 < Observer <? super R >, OperatorSubscription >() {
253+ public <R > Observable <R > bind (final Func1 < Operator <? super R >, Operator <? super T >> bind ) {
254+ return new Observable <R >(new Action1 < Operator <? super R >>() {
280255
281256 @ Override
282- public void call (Observer <? super R > o , OperatorSubscription s ) {
283- f . call (bind .call (o , s ), s );
257+ public void call (Operator <? super R > o ) {
258+ subscribe (bind .call (o ) );
284259 }
285260 });
286261 }
@@ -2878,7 +2853,7 @@ public final static <T> Observable<T> synchronize(Observable<T> source) {
28782853 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229435.aspx">MSDN: Observable.Timer</a>
28792854 */
28802855 public final static Observable <Long > timer (long initialDelay , long period , TimeUnit unit ) {
2881- return timer (initialDelay , period , unit , Schedulers .threadPoolForComputation ());
2856+ return timer (initialDelay , period , unit , Schedulers .computation ());
28822857 }
28832858
28842859 /**
@@ -2917,7 +2892,7 @@ public final static Observable<Long> timer(long initialDelay, long period, TimeU
29172892 * @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#timer">RxJava wiki: timer()</a>
29182893 */
29192894 public final static Observable <Long > timer (long delay , TimeUnit unit ) {
2920- return timer (delay , unit , Schedulers .threadPoolForComputation ());
2895+ return timer (delay , unit , Schedulers .computation ());
29212896 }
29222897
29232898 /**
@@ -4299,7 +4274,7 @@ public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> i
42994274 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229810.aspx">MSDN: Observable.Delay</a>
43004275 */
43014276 public final Observable <T > delay (long delay , TimeUnit unit ) {
4302- return OperationDelay .delay (this , delay , unit , Schedulers .threadPoolForComputation ());
4277+ return OperationDelay .delay (this , delay , unit , Schedulers .computation ());
43034278 }
43044279
43054280 /**
@@ -4336,7 +4311,7 @@ public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
43364311 * amount
43374312 */
43384313 public final Observable <T > delaySubscription (long delay , TimeUnit unit ) {
4339- return delaySubscription (delay , unit , Schedulers .threadPoolForComputation ());
4314+ return delaySubscription (delay , unit , Schedulers .computation ());
43404315 }
43414316
43424317 /**
@@ -4795,7 +4770,7 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
47954770 * @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
47964771 */
47974772 public final <K > Observable <GroupedObservable <K , T >> groupBy (final Func1 <? super T , ? extends K > keySelector ) {
4798- return create ( OperationGroupBy . groupBy ( this , keySelector ));
4773+ return bind ( new OperatorGroupBy < K , T >( keySelector ));
47994774 }
48004775
48014776 /**
@@ -4819,7 +4794,7 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super
48194794 * @see <a href="https://github.com/Netflix/RxJava/wiki/Transforming-Observables#groupby-and-groupbyuntil">RxJava Wiki: groupBy</a>
48204795 */
48214796 public final <K , R > Observable <GroupedObservable <K , R >> groupBy (final Func1 <? super T , ? extends K > keySelector , final Func1 <? super T , ? extends R > elementSelector ) {
4822- return create ( OperationGroupBy . groupBy ( this , keySelector , elementSelector )) ;
4797+ return null ;
48234798 }
48244799
48254800 /**
@@ -5936,7 +5911,7 @@ public final Subject<T, T> call() {
59365911 * @see <a href="http://msdn.microsoft.com/en-us/library/hh228952.aspx">MSDN: Observable.Replay</a>
59375912 */
59385913 public final <R > Observable <R > replay (Func1 <? super Observable <T >, ? extends Observable <R >> selector , int bufferSize , long time , TimeUnit unit ) {
5939- return replay (selector , bufferSize , time , unit , Schedulers .threadPoolForComputation ());
5914+ return replay (selector , bufferSize , time , unit , Schedulers .computation ());
59405915 }
59415916
59425917 /**
@@ -6036,7 +6011,7 @@ public final Subject<T, T> call() {
60366011 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229526.aspx">MSDN: Observable.Replay</a>
60376012 */
60386013 public final <R > Observable <R > replay (Func1 <? super Observable <T >, ? extends Observable <R >> selector , long time , TimeUnit unit ) {
6039- return replay (selector , time , unit , Schedulers .threadPoolForComputation ());
6014+ return replay (selector , time , unit , Schedulers .computation ());
60406015 }
60416016
60426017 /**
@@ -6139,7 +6114,7 @@ public final ConnectableObservable<T> replay(int bufferSize) {
61396114 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229874.aspx">MSDN: Observable.Replay</a>
61406115 */
61416116 public final ConnectableObservable <T > replay (int bufferSize , long time , TimeUnit unit ) {
6142- return replay (bufferSize , time , unit , Schedulers .threadPoolForComputation ());
6117+ return replay (bufferSize , time , unit , Schedulers .computation ());
61436118 }
61446119
61456120 /**
@@ -6209,7 +6184,7 @@ public final ConnectableObservable<T> replay(int bufferSize, Scheduler scheduler
62096184 * @see <a href="http://msdn.microsoft.com/en-us/library/hh229232.aspx">MSDN: Observable.Replay</a>
62106185 */
62116186 public final ConnectableObservable <T > replay (long time , TimeUnit unit ) {
6212- return replay (time , unit , Schedulers .threadPoolForComputation ());
6187+ return replay (time , unit , Schedulers .computation ());
62136188 }
62146189
62156190 /**
@@ -6516,7 +6491,7 @@ public final Observable<T> skip(int num) {
65166491 * @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#skip">RxJava Wiki: skip()</a>
65176492 */
65186493 public final Observable <T > skip (long time , TimeUnit unit ) {
6519- return skip (time , unit , Schedulers .threadPoolForComputation ());
6494+ return skip (time , unit , Schedulers .computation ());
65206495 }
65216496
65226497 /**
@@ -6578,7 +6553,7 @@ public final Observable<T> skipLast(int count) {
65786553 * @see <a href="http://msdn.microsoft.com/en-us/library/hh211750.aspx">MSDN: Observable.SkipLast</a>
65796554 */
65806555 public final Observable <T > skipLast (long time , TimeUnit unit ) {
6581- return skipLast (time , unit , Schedulers .threadPoolForComputation ());
6556+ return skipLast (time , unit , Schedulers .computation ());
65826557 }
65836558
65846559 /**
@@ -6929,8 +6904,8 @@ public final Observable<T> startWith(T[] values, Scheduler scheduler) {
69296904 }
69306905
69316906 // TODO should this be called `observe` instead of `subscribe`?
6932- public final void subscribe (Observer <? super T > o , Func0 < OperatorSubscription > sf ) {
6933- f .call (o , sf . call () );
6907+ public final void subscribe (Operator <? super T > o ) {
6908+ f .call (o );
69346909 }
69356910
69366911 /**
@@ -7166,7 +7141,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, Scheduler s
71667141 */
71677142 public final Subscription subscribe (Observer <? super T > observer ) {
71687143 // allow the hook to intercept and/or decorate
7169- Action2 < Observer <? super T >, OperatorSubscription > onSubscribeFunction = hook .onSubscribeStart (this , f );
7144+ Action1 < Operator <? super T >> onSubscribeFunction = hook .onSubscribeStart (this , f );
71707145 // validate and proceed
71717146 if (observer == null ) {
71727147 throw new IllegalArgumentException ("observer can not be null" );
@@ -7176,17 +7151,20 @@ public final Subscription subscribe(Observer<? super T> observer) {
71767151 // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception
71777152 }
71787153 try {
7179- OperatorSubscription os = new OperatorSubscription () ;
7154+ Operator <? super T > op = null ;
71807155 /**
71817156 * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
71827157 */
71837158 if (isInternalImplementation (observer )) {
7184- onSubscribeFunction .call (observer , os );
7159+ op = Operator .create (observer , new CompositeSubscription ());
7160+ onSubscribeFunction .call (op );
71857161 } else {
7186- SafeObservableSubscription subscription = new SafeObservableSubscription (os );
7187- onSubscribeFunction .call (new SafeObserver <T >(subscription , observer ), os );
7162+ // TODO this doesn't seem correct any longer with the Operator and injecting of CompositeSubscription
7163+ SafeObservableSubscription subscription = new SafeObservableSubscription (op );
7164+ op = Operator .create (new SafeObserver <T >(subscription , observer ), new CompositeSubscription ());
7165+ onSubscribeFunction .call (op );
71887166 }
7189- return hook .onSubscribeReturn (this , os );
7167+ return hook .onSubscribeReturn (this , op );
71907168 } catch (OnErrorNotImplementedException e ) {
71917169 // special handling when onError is not implemented ... we just rethrow
71927170 throw e ;
@@ -7426,7 +7404,7 @@ public final Observable<T> take(final int num) {
74267404 * @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#take">RxJava Wiki: take()</a>
74277405 */
74287406 public final Observable <T > take (long time , TimeUnit unit ) {
7429- return take (time , unit , Schedulers .threadPoolForComputation ());
7407+ return take (time , unit , Schedulers .computation ());
74307408 }
74317409
74327410 /**
@@ -7517,7 +7495,7 @@ public final Observable<T> takeLast(final int count) {
75177495 * were emitted in a specified window of time before the Observable completed.
75187496 */
75197497 public final Observable <T > takeLast (int count , long time , TimeUnit unit ) {
7520- return takeLast (count , time , unit , Schedulers .threadPoolForComputation ());
7498+ return takeLast (count , time , unit , Schedulers .computation ());
75217499 }
75227500
75237501 /**
@@ -7560,7 +7538,7 @@ public final Observable<T> takeLast(int count, long time, TimeUnit unit, Schedul
75607538 * the window of time before the Observable completed specified by {@code time}
75617539 */
75627540 public final Observable <T > takeLast (long time , TimeUnit unit ) {
7563- return takeLast (time , unit , Schedulers .threadPoolForComputation ());
7541+ return takeLast (time , unit , Schedulers .computation ());
75647542 }
75657543
75667544 /**
@@ -8735,10 +8713,10 @@ public final <T2, R> Observable<R> zip(Observable<? extends T2> other, Func2<? s
87358713 */
87368714 private static class NeverObservable <T > extends Observable <T > {
87378715 public NeverObservable () {
8738- super (new Action2 < Observer <? super T >, OperatorSubscription >() {
8716+ super (new Action1 < Operator <? super T >>() {
87398717
87408718 @ Override
8741- public void call (Observer <? super T > observer , OperatorSubscription t2 ) {
8719+ public void call (Operator <? super T > observer ) {
87428720 // do nothing
87438721 }
87448722
@@ -8756,7 +8734,7 @@ public void call(Observer<? super T> observer, OperatorSubscription t2) {
87568734 private static class ThrowObservable <T > extends Observable <T > {
87578735
87588736 public ThrowObservable (final Throwable exception ) {
8759- super (new Action2 < Observer <? super T >, OperatorSubscription >() {
8737+ super (new Action1 < Operator <? super T >>() {
87608738
87618739 /**
87628740 * Accepts an {@link Observer} and calls its {@link Observer#onError onError} method.
@@ -8766,14 +8744,15 @@ public ThrowObservable(final Throwable exception) {
87668744 * @return a reference to the subscription
87678745 */
87688746 @ Override
8769- public void call (Observer <? super T > observer , OperatorSubscription t2 ) {
8747+ public void call (Operator <? super T > observer ) {
87708748 observer .onError (exception );
87718749 }
87728750
87738751 });
87748752 }
87758753 }
87768754
8755+ @ SuppressWarnings ("rawtypes" )
87778756 private final static ConcurrentHashMap <Class , Boolean > internalClassMap = new ConcurrentHashMap <Class , Boolean >();
87788757
87798758 /**
0 commit comments