Skip to content

Commit 74ece66

Browse files
author
Kaushik Gopal
committed
feat: port example Simple and Advanced polling (using interval and repeatWhen)
1 parent ca56b5a commit 74ece66

1 file changed

Lines changed: 52 additions & 57 deletions

File tree

app/src/main/java/com/morihacky/android/rxjava/fragments/PollingFragment.java

Lines changed: 52 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,19 @@
1010
import android.view.ViewGroup;
1111
import android.widget.ArrayAdapter;
1212
import android.widget.ListView;
13-
13+
import butterknife.Bind;
14+
import butterknife.ButterKnife;
15+
import butterknife.OnClick;
1416
import com.morihacky.android.rxjava.R;
15-
17+
import io.reactivex.Flowable;
18+
import io.reactivex.disposables.CompositeDisposable;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.functions.Function;
1621
import java.util.ArrayList;
1722
import java.util.List;
1823
import java.util.Locale;
1924
import java.util.concurrent.TimeUnit;
20-
21-
import butterknife.Bind;
22-
import butterknife.ButterKnife;
23-
import butterknife.OnClick;
24-
import rx.Observable;
25-
import rx.functions.Func1;
26-
import rx.subscriptions.CompositeSubscription;
25+
import org.reactivestreams.Publisher;
2726
import timber.log.Timber;
2827

2928
public class PollingFragment
@@ -36,23 +35,15 @@ public class PollingFragment
3635
@Bind(R.id.list_threading_log) ListView _logsList;
3736

3837
private LogAdapter _adapter;
39-
private List<String> _logs;
40-
41-
private CompositeSubscription _subscriptions;
4238
private int _counter = 0;
43-
39+
private CompositeDisposable _disposables;
40+
private List<String> _logs;
4441

4542
@Override
4643
public void onCreate(@Nullable Bundle savedInstanceState) {
4744
super.onCreate(savedInstanceState);
4845

49-
_subscriptions = new CompositeSubscription();
50-
}
51-
52-
@Override
53-
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
54-
super.onActivityCreated(savedInstanceState);
55-
_setupLogger();
46+
_disposables = new CompositeDisposable();
5647
}
5748

5849
@Override
@@ -64,10 +55,16 @@ public View onCreateView(LayoutInflater inflater,
6455
return layout;
6556
}
6657

58+
@Override
59+
public void onActivityCreated(@Nullable Bundle savedInstanceState) {
60+
super.onActivityCreated(savedInstanceState);
61+
_setupLogger();
62+
}
63+
6764
@Override
6865
public void onDestroy() {
6966
super.onDestroy();
70-
_subscriptions.unsubscribe();
67+
_disposables.clear();
7168
ButterKnife.unbind(this);
7269
}
7370

@@ -76,17 +73,21 @@ public void onStartSimplePollingClicked() {
7673

7774
final int pollCount = POLL_COUNT;
7875

79-
_subscriptions.add(//
80-
Observable.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
81-
.map(this::_doNetworkCallAndGetStringResult)//
82-
.take(pollCount)
83-
.doOnSubscribe(() ->
84-
_log(String.format("Start simple polling - %s", _counter)))
85-
.subscribe(taskName -> {
86-
_log(String.format(Locale.US, "Executing polled task [%s] now time : [xx:%02d]",
87-
taskName, _getSecondHand()));
88-
})
89-
);
76+
Disposable d = Flowable
77+
.interval(INITIAL_DELAY, POLLING_INTERVAL, TimeUnit.MILLISECONDS)
78+
.map(this::_doNetworkCallAndGetStringResult)
79+
.take(pollCount)
80+
.doOnSubscribe(subscription -> {
81+
_log(String.format("Start simple polling - %s", _counter));
82+
})
83+
.subscribe(taskName -> {
84+
_log(String.format(Locale.US,
85+
"Executing polled task [%s] now time : [xx:%02d]",
86+
taskName,
87+
_getSecondHand()));
88+
});
89+
90+
_disposables.add(d);
9091
}
9192

9293
@OnClick(R.id.btn_start_increasingly_delayed_polling)
@@ -96,19 +97,15 @@ public void onStartIncreasinglyDelayedPolling() {
9697
final int pollingInterval = POLLING_INTERVAL;
9798
final int pollCount = POLL_COUNT;
9899

99-
_log(String.format(Locale.US, "Start increasingly delayed polling now time: [xx:%02d]",
100-
_getSecondHand()));
101-
102-
_subscriptions.add(//
103-
Observable.just(1)
104-
.repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
105-
.subscribe(o -> {
106-
_log(String.format(Locale.US, "Executing polled task now time : [xx:%02d]",
107-
_getSecondHand()));
108-
}, e -> {
109-
Timber.d(e, "arrrr. Error");
110-
})
111-
);
100+
_log(String.format(Locale.US, "Start increasingly delayed polling now time: [xx:%02d]", _getSecondHand()));
101+
102+
_disposables.add(Flowable
103+
.just(1L)
104+
.repeatWhen(new RepeatWithDelay(pollCount, pollingInterval))
105+
.subscribe(o -> _log(String.format(Locale.US,
106+
"Executing polled task now time : [xx:%02d]",
107+
_getSecondHand())),
108+
e -> Timber.d(e, "arrrr. Error")));
112109
}
113110

114111
// -----------------------------------------------------------------------------------
@@ -127,7 +124,8 @@ private String _doNetworkCallAndGetStringResult(long attempt) {
127124
// randomly make one event super long so we test that the repeat logic waits
128125
// and accounts for this.
129126
Thread.sleep(9000);
130-
} else {
127+
}
128+
else {
131129
Thread.sleep(3000);
132130
}
133131

@@ -153,7 +151,8 @@ private void _log(String logMsg) {
153151
_logs.add(0, logMsg + " (main thread) ");
154152
_adapter.clear();
155153
_adapter.addAll(_logs);
156-
} else {
154+
}
155+
else {
157156
_logs.add(0, logMsg + " (NOT main thread) ");
158157

159158
// You can only do below stuff on main thread.
@@ -177,7 +176,7 @@ private boolean _isCurrentlyOnMainThread() {
177176

178177
//public static class RepeatWithDelay
179178
public class RepeatWithDelay
180-
implements Func1<Observable<? extends Void>, Observable<?>> {
179+
implements Function<Flowable<Object>, Publisher<Long>> {
181180

182181
private final int _repeatLimit;
183182
private final int _pollingInterval;
@@ -193,28 +192,24 @@ public class RepeatWithDelay
193192
// only onNext triggers a re-subscription
194193

195194
@Override
196-
public Observable<?> call(Observable<? extends Void> inputObservable) {
197-
195+
public Publisher<Long> apply(Flowable<Object> inputFlowable) throws Exception {
198196
// it is critical to use inputObservable in the chain for the result
199197
// ignoring it and doing your own thing will break the sequence
200198

201-
return inputObservable.flatMap(new Func1<Void, Observable<?>>() {
199+
return inputFlowable.flatMap(new Function<Object, Publisher<Long>>() {
202200
@Override
203-
public Observable<?> call(Void blah) {
204-
205-
201+
public Publisher<Long> apply(Object o) throws Exception {
206202
if (_repeatCount >= _repeatLimit) {
207203
// terminate the sequence cause we reached the limit
208204
_log("Completing sequence");
209-
return Observable.empty();
205+
return Flowable.empty();
210206
}
211207

212208
// since we don't get an input
213209
// we store state in this handler to tell us the point of time we're firing
214210
_repeatCount++;
215211

216-
return Observable.timer(_repeatCount * _pollingInterval,
217-
TimeUnit.MILLISECONDS);
212+
return Flowable.timer(_repeatCount * _pollingInterval, TimeUnit.MILLISECONDS);
218213
}
219214
});
220215
}

0 commit comments

Comments
 (0)