1010import android .view .ViewGroup ;
1111import android .widget .ArrayAdapter ;
1212import android .widget .ListView ;
13-
13+ import butterknife .Bind ;
14+ import butterknife .ButterKnife ;
15+ import butterknife .OnClick ;
1416import com .morihacky .android .rxjava .R ;
15-
17+ import io .reactivex .Flowable ;
18+ import io .reactivex .disposables .CompositeDisposable ;
19+ import io .reactivex .disposables .Disposable ;
20+ import io .reactivex .functions .Function ;
1621import java .util .ArrayList ;
1722import java .util .List ;
1823import java .util .Locale ;
1924import java .util .concurrent .TimeUnit ;
20-
21- import butterknife .Bind ;
22- import butterknife .ButterKnife ;
23- import butterknife .OnClick ;
24- import rx .Observable ;
25- import rx .functions .Func1 ;
26- import rx .subscriptions .CompositeSubscription ;
25+ import org .reactivestreams .Publisher ;
2726import timber .log .Timber ;
2827
2928public class PollingFragment
@@ -36,23 +35,15 @@ public class PollingFragment
3635 @ Bind (R .id .list_threading_log ) ListView _logsList ;
3736
3837 private LogAdapter _adapter ;
39- private List <String > _logs ;
40-
41- private CompositeSubscription _subscriptions ;
4238 private int _counter = 0 ;
43-
39+ private CompositeDisposable _disposables ;
40+ private List <String > _logs ;
4441
4542 @ Override
4643 public void onCreate (@ Nullable Bundle savedInstanceState ) {
4744 super .onCreate (savedInstanceState );
4845
49- _subscriptions = new CompositeSubscription ();
50- }
51-
52- @ Override
53- public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
54- super .onActivityCreated (savedInstanceState );
55- _setupLogger ();
46+ _disposables = new CompositeDisposable ();
5647 }
5748
5849 @ Override
@@ -64,10 +55,16 @@ public View onCreateView(LayoutInflater inflater,
6455 return layout ;
6556 }
6657
58+ @ Override
59+ public void onActivityCreated (@ Nullable Bundle savedInstanceState ) {
60+ super .onActivityCreated (savedInstanceState );
61+ _setupLogger ();
62+ }
63+
6764 @ Override
6865 public void onDestroy () {
6966 super .onDestroy ();
70- _subscriptions . unsubscribe ();
67+ _disposables . clear ();
7168 ButterKnife .unbind (this );
7269 }
7370
@@ -76,17 +73,21 @@ public void onStartSimplePollingClicked() {
7673
7774 final int pollCount = POLL_COUNT ;
7875
79- _subscriptions .add (//
80- Observable .interval (INITIAL_DELAY , POLLING_INTERVAL , TimeUnit .MILLISECONDS )
81- .map (this ::_doNetworkCallAndGetStringResult )//
82- .take (pollCount )
83- .doOnSubscribe (() ->
84- _log (String .format ("Start simple polling - %s" , _counter )))
85- .subscribe (taskName -> {
86- _log (String .format (Locale .US , "Executing polled task [%s] now time : [xx:%02d]" ,
87- taskName , _getSecondHand ()));
88- })
89- );
76+ Disposable d = Flowable
77+ .interval (INITIAL_DELAY , POLLING_INTERVAL , TimeUnit .MILLISECONDS )
78+ .map (this ::_doNetworkCallAndGetStringResult )
79+ .take (pollCount )
80+ .doOnSubscribe (subscription -> {
81+ _log (String .format ("Start simple polling - %s" , _counter ));
82+ })
83+ .subscribe (taskName -> {
84+ _log (String .format (Locale .US ,
85+ "Executing polled task [%s] now time : [xx:%02d]" ,
86+ taskName ,
87+ _getSecondHand ()));
88+ });
89+
90+ _disposables .add (d );
9091 }
9192
9293 @ OnClick (R .id .btn_start_increasingly_delayed_polling )
@@ -96,19 +97,15 @@ public void onStartIncreasinglyDelayedPolling() {
9697 final int pollingInterval = POLLING_INTERVAL ;
9798 final int pollCount = POLL_COUNT ;
9899
99- _log (String .format (Locale .US , "Start increasingly delayed polling now time: [xx:%02d]" ,
100- _getSecondHand ()));
101-
102- _subscriptions .add (//
103- Observable .just (1 )
104- .repeatWhen (new RepeatWithDelay (pollCount , pollingInterval ))
105- .subscribe (o -> {
106- _log (String .format (Locale .US , "Executing polled task now time : [xx:%02d]" ,
107- _getSecondHand ()));
108- }, e -> {
109- Timber .d (e , "arrrr. Error" );
110- })
111- );
100+ _log (String .format (Locale .US , "Start increasingly delayed polling now time: [xx:%02d]" , _getSecondHand ()));
101+
102+ _disposables .add (Flowable
103+ .just (1L )
104+ .repeatWhen (new RepeatWithDelay (pollCount , pollingInterval ))
105+ .subscribe (o -> _log (String .format (Locale .US ,
106+ "Executing polled task now time : [xx:%02d]" ,
107+ _getSecondHand ())),
108+ e -> Timber .d (e , "arrrr. Error" )));
112109 }
113110
114111 // -----------------------------------------------------------------------------------
@@ -127,7 +124,8 @@ private String _doNetworkCallAndGetStringResult(long attempt) {
127124 // randomly make one event super long so we test that the repeat logic waits
128125 // and accounts for this.
129126 Thread .sleep (9000 );
130- } else {
127+ }
128+ else {
131129 Thread .sleep (3000 );
132130 }
133131
@@ -153,7 +151,8 @@ private void _log(String logMsg) {
153151 _logs .add (0 , logMsg + " (main thread) " );
154152 _adapter .clear ();
155153 _adapter .addAll (_logs );
156- } else {
154+ }
155+ else {
157156 _logs .add (0 , logMsg + " (NOT main thread) " );
158157
159158 // You can only do below stuff on main thread.
@@ -177,7 +176,7 @@ private boolean _isCurrentlyOnMainThread() {
177176
178177 //public static class RepeatWithDelay
179178 public class RepeatWithDelay
180- implements Func1 < Observable <? extends Void >, Observable <? >> {
179+ implements Function < Flowable < Object >, Publisher < Long >> {
181180
182181 private final int _repeatLimit ;
183182 private final int _pollingInterval ;
@@ -193,28 +192,24 @@ public class RepeatWithDelay
193192 // only onNext triggers a re-subscription
194193
195194 @ Override
196- public Observable <?> call (Observable <? extends Void > inputObservable ) {
197-
195+ public Publisher <Long > apply (Flowable <Object > inputFlowable ) throws Exception {
198196 // it is critical to use inputObservable in the chain for the result
199197 // ignoring it and doing your own thing will break the sequence
200198
201- return inputObservable .flatMap (new Func1 < Void , Observable <? >>() {
199+ return inputFlowable .flatMap (new Function < Object , Publisher < Long >>() {
202200 @ Override
203- public Observable <?> call (Void blah ) {
204-
205-
201+ public Publisher <Long > apply (Object o ) throws Exception {
206202 if (_repeatCount >= _repeatLimit ) {
207203 // terminate the sequence cause we reached the limit
208204 _log ("Completing sequence" );
209- return Observable .empty ();
205+ return Flowable .empty ();
210206 }
211207
212208 // since we don't get an input
213209 // we store state in this handler to tell us the point of time we're firing
214210 _repeatCount ++;
215211
216- return Observable .timer (_repeatCount * _pollingInterval ,
217- TimeUnit .MILLISECONDS );
212+ return Flowable .timer (_repeatCount * _pollingInterval , TimeUnit .MILLISECONDS );
218213 }
219214 });
220215 }
0 commit comments