77import android .view .View ;
88import android .view .ViewGroup ;
99import android .widget .ListView ;
10-
10+ import butterknife .Bind ;
11+ import butterknife .ButterKnife ;
12+ import butterknife .OnClick ;
1113import com .morihacky .android .rxjava .R ;
1214import com .morihacky .android .rxjava .wiring .LogAdapter ;
13-
15+ import hu .akarnokd .rxjava .interop .RxJavaInterop ;
16+ import io .reactivex .Flowable ;
17+ import io .reactivex .disposables .CompositeDisposable ;
18+ import io .reactivex .functions .Function ;
19+ import io .reactivex .subscribers .DisposableSubscriber ;
1420import java .util .ArrayList ;
1521import java .util .List ;
1622import java .util .concurrent .TimeUnit ;
17-
18- import butterknife .Bind ;
19- import butterknife .ButterKnife ;
20- import butterknife .OnClick ;
21- import rx .Observable ;
22- import rx .Observer ;
23- import rx .Subscriber ;
24- import rx .functions .Func1 ;
23+ import org .reactivestreams .Publisher ;
2524import rx .observables .MathObservable ;
26- import rx .subscriptions .CompositeSubscription ;
2725import timber .log .Timber ;
2826
27+
2928import static android .os .Looper .getMainLooper ;
3029
3130public class ExponentialBackoffFragment
3231 extends BaseFragment {
3332
3433 @ Bind (R .id .list_threading_log ) ListView _logList ;
3534 private LogAdapter _adapter ;
35+ private CompositeDisposable _disposables = new CompositeDisposable ();
3636 private List <String > _logs ;
3737
38- private CompositeSubscription _subscriptions = new CompositeSubscription ();
39-
40- @ Override
41- public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
42- super .onActivityCreated (savedInstanceState );
43- _setupLogger ();
44- }
45-
4638 @ Override
4739 public View onCreateView (LayoutInflater inflater ,
4840 @ Nullable ViewGroup container ,
@@ -52,11 +44,17 @@ public View onCreateView(LayoutInflater inflater,
5244 return layout ;
5345 }
5446
47+ @ Override
48+ public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
49+ super .onActivityCreated (savedInstanceState );
50+ _setupLogger ();
51+ }
52+
5553 @ Override
5654 public void onPause () {
5755 super .onPause ();
5856
59- _subscriptions .clear ();
57+ _disposables .clear ();
6058 }
6159
6260 @ Override
@@ -72,27 +70,30 @@ public void startRetryingWithExponentialBackoffStrategy() {
7270 _logs = new ArrayList <>();
7371 _adapter .clear ();
7472
75- _subscriptions .add (//
76- Observable //
77- .error (new RuntimeException ("testing" )) // always fails
78- .retryWhen (new RetryWithDelay (5 , 1000 )) // notice this is called only onError (onNext values sent are ignored)
79- .doOnSubscribe (() -> _log ("Attempting the impossible 5 times in intervals of 1s" ))//
80- .subscribe (new Observer <Object >() {
81- @ Override
82- public void onCompleted () {
83- Timber .d ("on Completed" );
84- }
85-
86- @ Override
87- public void onError (Throwable e ) {
88- _log ("Error: I give up!" );
89- }
90-
91- @ Override
92- public void onNext (Object aVoid ) {
93- Timber .d ("on Next" );
94- }
95- }));
73+ DisposableSubscriber <Object > disposableSubscriber = new DisposableSubscriber <Object >() {
74+ @ Override
75+ public void onNext (Object aVoid ) {
76+ Timber .d ("on Next" );
77+ }
78+
79+ @ Override
80+ public void onComplete () {
81+ Timber .d ("on Completed" );
82+ }
83+
84+ @ Override
85+ public void onError (Throwable e ) {
86+ _log ("Error: I give up!" );
87+ }
88+ };
89+
90+ Flowable .error (new RuntimeException ("testing" )) // always fails
91+ .retryWhen (new RetryWithDelay (5 , 1000 )) // notice this is called only onError (onNext
92+ // values sent are ignored)
93+ .doOnSubscribe (subscription -> _log ("Attempting the impossible 5 times in intervals of 1s" ))
94+ .subscribe (disposableSubscriber );
95+
96+ _disposables .add (disposableSubscriber );
9697 }
9798
9899 @ OnClick (R .id .btn_eb_delay )
@@ -101,41 +102,41 @@ public void startExecutingWithExponentialBackoffDelay() {
101102 _logs = new ArrayList <>();
102103 _adapter .clear ();
103104
104- _subscriptions . add ( //
105-
106- Observable . range ( 1 , 4 ) //
107- . delay ( integer -> {
108- // Rx-y way of doing the Fibonnaci :P
109- return MathObservable //
110- . sumInteger ( Observable . range ( 1 , integer ))
111- . flatMap ( targetSecondDelay -> Observable . just ( integer )
112- . delay ( targetSecondDelay , TimeUnit . SECONDS ));
113- }) //
114- . doOnSubscribe (() ->
115- _log ( String . format ( "Execute 4 tasks with delay - time now: [xx:%02d]" ,
116- _getSecondHand ()))) //
117- . subscribe ( new Subscriber < Integer >() {
118- @ Override
119- public void onCompleted () {
120- Timber . d ( "onCompleted " );
121- _log ( "Completed" );
122- }
123-
124- @ Override
125- public void onError ( Throwable e ) {
126- Timber . d ( e , "arrrr. Error" );
127- _log ( "Error" );
128- }
129-
130- @ Override
131- public void onNext ( Integer integer ) {
132- Timber . d ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ( ));
133- _log ( String . format ( "executing Task %d [xx:%02d]" ,
134- integer ,
135- _getSecondHand ()));
136-
137- }
138- }) );
105+ DisposableSubscriber < Integer > disposableSubscriber = new DisposableSubscriber < Integer >() {
106+ @ Override
107+ public void onNext ( Integer integer ) {
108+ Timber . d ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ());
109+ _log ( String . format ( "executing Task %d [xx:%02d]" , integer , _getSecondHand ()));
110+ }
111+
112+ @ Override
113+ public void onError ( Throwable e ) {
114+ Timber . d ( e , "arrrr. Error" );
115+ _log ( "Error" );
116+ }
117+
118+ @ Override
119+ public void onComplete () {
120+ Timber . d ( "onCompleted" );
121+ _log ( "Completed " );
122+ }
123+ };
124+
125+ Flowable
126+ . range ( 1 , 4 )
127+ . delay ( integer -> {
128+ // Rx-y way of doing the Fibonnaci :P
129+ return RxJavaInterop
130+ . toV2Flowable ( MathObservable . sumInteger ( rx . Observable . range ( 1 , integer )))
131+ . flatMap ( targetSecondDelay -> Flowable
132+ . just ( integer )
133+ . delay ( targetSecondDelay , TimeUnit . SECONDS ));
134+ })
135+ . doOnSubscribe ( s -> _log ( String . format ( "Execute 4 tasks with delay - time now: [xx:%02d]" ,
136+ _getSecondHand ())))
137+ . subscribe ( disposableSubscriber );
138+
139+ _disposables . add ( disposableSubscriber );
139140 }
140141
141142 // -----------------------------------------------------------------------------------
@@ -176,7 +177,7 @@ private void _log(String logMsg) {
176177
177178 //public static class RetryWithDelay
178179 public class RetryWithDelay
179- implements Func1 < Observable <? extends Throwable >, Observable <?>> {
180+ implements Function < Flowable <? extends Throwable >, Publisher <?>> {
180181
181182 private final int _maxRetries ;
182183 private final int _retryDelayMillis ;
@@ -193,14 +194,14 @@ public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
193194 // only onNext triggers a re-subscription (onError + onComplete kills it)
194195
195196 @ Override
196- public Observable <?> call ( Observable <? extends Throwable > inputObservable ) {
197+ public Publisher <?> apply ( Flowable <? extends Throwable > inputObservable ) {
197198
198199 // it is critical to use inputObservable in the chain for the result
199200 // ignoring it and doing your own thing will break the sequence
200201
201- return inputObservable .flatMap (new Func1 <Throwable , Observable <?>>() {
202+ return inputObservable .flatMap (new Function <Throwable , Publisher <?>>() {
202203 @ Override
203- public Observable <?> call (Throwable throwable ) {
204+ public Publisher <?> apply (Throwable throwable ) {
204205 if (++_retryCount < _maxRetries ) {
205206
206207 // When this Observable calls onNext, the original
@@ -209,15 +210,14 @@ public Observable<?> call(Throwable throwable) {
209210 Timber .d ("Retrying in %d ms" , _retryCount * _retryDelayMillis );
210211 _log (String .format ("Retrying in %d ms" , _retryCount * _retryDelayMillis ));
211212
212- return Observable .timer (_retryCount * _retryDelayMillis ,
213- TimeUnit .MILLISECONDS );
213+ return Flowable .timer (_retryCount * _retryDelayMillis , TimeUnit .MILLISECONDS );
214214 }
215215
216216 Timber .d ("Argh! i give up" );
217217
218218 // Max retries hit. Pass an error so the chain is forcibly completed
219219 // only onNext triggers a re-subscription (onError + onComplete kills it)
220- return Observable .error (throwable );
220+ return Flowable .error (throwable );
221221 }
222222 });
223223 }
0 commit comments