2222import rx .Subscriber ;
2323import rx .observables .GroupedObservable ;
2424import rx .subjects .PublishSubject ;
25- import rx .subscriptions .CompositeSubscription ;
26- import rx .subscriptions .Subscriptions ;
27- import rx .util .functions .Action0 ;
2825import rx .util .functions .Action1 ;
2926
3027/**
31- * Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
28+ * Subscribes and unsubscribes Observers on the specified Scheduler.
3229 * <p>
30+ * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
31+ * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
32+ * <p>
33+ * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
34+ * subscribe is solving.
35+ *
3336 * <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
3437 */
3538public class OperatorSubscribeOn <T > implements Operator <T , Observable <T >> {
3639
3740 private final Scheduler scheduler ;
38- /**
41+ /**
3942 * Indicate that events fired between the original subscription time and
4043 * the actual subscription time should not get lost.
4144 */
4245 private final boolean dontLoseEvents ;
4346 /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
4447 private final int bufferSize ;
48+
4549 public OperatorSubscribeOn (Scheduler scheduler , boolean dontLoseEvents ) {
4650 this (scheduler , dontLoseEvents , -1 );
4751 }
52+
4853 /**
4954 * Construct a SubscribeOn operator.
50- * @param scheduler the target scheduler
51- * @param dontLoseEvents indicate that events should be buffered until the actual subscription happens
52- * @param bufferSize if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
53- * block the source. -1 indicates an unbounded buffer
55+ *
56+ * @param scheduler
57+ * the target scheduler
58+ * @param dontLoseEvents
59+ * indicate that events should be buffered until the actual subscription happens
60+ * @param bufferSize
61+ * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
62+ * block the source. -1 indicates an unbounded buffer
5463 */
5564 public OperatorSubscribeOn (Scheduler scheduler , boolean dontLoseEvents , int bufferSize ) {
5665 this .scheduler = scheduler ;
@@ -71,78 +80,61 @@ public void onCompleted() {
7180 public void onError (Throwable e ) {
7281 subscriber .onError (e );
7382 }
83+
7484 boolean checkNeedBuffer (Observable <?> o ) {
75- return dontLoseEvents || ((o instanceof GroupedObservable <?, ?>)
85+ /*
86+ * Included are some Observable types known to be "hot" and thus needing
87+ * buffering when subscribing across thread boundaries otherwise
88+ * we can lose data.
89+ *
90+ * See https://github.com/Netflix/RxJava/issues/844 for more information.
91+ */
92+ return dontLoseEvents
93+ || ((o instanceof GroupedObservable <?, ?>)
7694 || (o instanceof PublishSubject <?>)
7795 // || (o instanceof BehaviorSubject<?, ?>)
7896 );
7997 }
98+
8099 @ Override
81100 public void onNext (final Observable <T > o ) {
82101 if (checkNeedBuffer (o )) {
83- final CompositeSubscription cs = new CompositeSubscription ();
84- subscriber .add (cs );
85- final BufferUntilSubscriber <T > bus = new BufferUntilSubscriber <T >(bufferSize , subscriber , new CompositeSubscription ());
102+ // use buffering (possibly blocking) for a possibly synchronous subscribe
103+ final BufferUntilSubscriber <T > bus = new BufferUntilSubscriber <T >(bufferSize , subscriber );
86104 o .subscribe (bus );
87- scheduler .schedule (new Action1 <Inner >() {
105+ subscriber . add ( scheduler .schedule (new Action1 <Inner >() {
88106 @ Override
89107 public void call (final Inner inner ) {
90- cs .add (Subscriptions .create (new Action0 () {
91- @ Override
92- public void call () {
93- inner .schedule (new Action1 <Inner >() {
94- @ Override
95- public void call (final Inner inner ) {
96- bus .unsubscribe ();
97- }
98- });
99- }
100- }));
101108 bus .enterPassthroughMode ();
102109 }
103- });
110+ })) ;
104111 return ;
105- }
106- scheduler .schedule (new Action1 <Inner >() {
107-
108- @ Override
109- public void call (final Inner inner ) {
110- final CompositeSubscription cs = new CompositeSubscription ();
111- subscriber .add (Subscriptions .create (new Action0 () {
112-
113- @ Override
114- public void call () {
115- inner .schedule (new Action1 <Inner >() {
116-
117- @ Override
118- public void call (final Inner inner ) {
119- cs .unsubscribe ();
120- }
121-
122- });
123- }
112+ } else {
113+ // no buffering (async subscribe)
114+ scheduler .schedule (new Action1 <Inner >() {
124115
125- }));
126- cs . add ( subscriber );
127- o .subscribe (new Subscriber <T >(cs ) {
116+ @ Override
117+ public void call ( final Inner inner ) {
118+ o .subscribe (new Subscriber <T >(subscriber ) {
128119
129- @ Override
130- public void onCompleted () {
131- subscriber .onCompleted ();
132- }
120+ @ Override
121+ public void onCompleted () {
122+ subscriber .onCompleted ();
123+ }
133124
134- @ Override
135- public void onError (Throwable e ) {
136- subscriber .onError (e );
137- }
125+ @ Override
126+ public void onError (Throwable e ) {
127+ subscriber .onError (e );
128+ }
138129
139- @ Override
140- public void onNext (T t ) {
141- subscriber .onNext (t );
142- }
143- });
144- }
145- });
130+ @ Override
131+ public void onNext (T t ) {
132+ subscriber .onNext (t );
133+ }
134+ });
135+ }
136+ });
137+ }
146138 }
147139
148140 };
0 commit comments