Skip to content

Commit 605dc13

Browse files
author
Kaushik Gopal
committed
feat: update disk + network cache example
inspiration from JW's twitter comment - https://twitter.com/JakeWharton/status/786363146990649345
1 parent 614c8b1 commit 605dc13

7 files changed

Lines changed: 458 additions & 11 deletions

File tree

README.md

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ I also gave a talk at a local meetup about warming up to RxJava here. Here's a l
99

1010
### Concurrency using schedulers
1111

12-
A common requirement is to offload lengthy heavy I/O intensive operationsacc to a background thread (non-UI thread) and feed the results back to the UI/main thread, on completion. This is a demo of how long-running operations can be offloaded to a background thread. After the operation is done, we resume back on the main thread. All using RxJava! Think of this as a replacement to AsyncTasks.
12+
A common requirement is to offload lengthy heavy I/O intensive operations to a background thread (non-UI thread) and feed the results back to the UI/main thread, on completion. This is a demo of how long-running operations can be offloaded to a background thread. After the operation is done, we resume back on the main thread. All using RxJava! Think of this as a replacement to AsyncTasks.
1313

1414
The long operation is simulated by a blocking Thread.sleep call (since this is done in a background thread, our UI is never interrupted).
1515

@@ -101,21 +101,30 @@ Note that the `Func3` function that checks for validity, kicks in only after ALL
101101

102102
The value of this technique becomes more apparent when you have more number of input fields in a form. Handling it otherwise with a bunch of booleans makes the code cluttered and kind of difficult to follow. But using `.combineLatest` all that logic is concentrated in a nice compact block of code (I still use booleans but that was to make the example more readable).
103103

104-
### Retrieve data first from a cache, then a network call - using [`.concat`](http://reactivex.io/documentation/operators/concat.html)
104+
### Retrieve data first from a cache, then a network call
105105

106-
Using concat, you can retrieve information from an observable first (presumably this one is fast like retrieving from a disk cache) and show preliminary data to a user. Subsequently, when the longer running 2nd observable is complete (say a network call), you can update the results on the interface using the latest information.
106+
We have two source (Observables): a disk (fast) cache and a network (fresh) call. The disk cache is much faster than the network Observable. But in order to demonstrate the working, I've also used a "slower" fake disk cache in order to demonstrate the working of the operators.
107107

108-
For the purposes of illustration i use an in-memory `List` (not an actual disk cache), then shoot out a real network call to the github api so it gives you a feel of how this can really be applied in production apps.
108+
This is demonstrated using 4 techniques:
109109

110-
Note the use of `concatEager` here over the traditional `concat` operator. Both show the results from the `Observables` in a sequential manner (so disk first and then network). The `concat` operator however would not even begin the subscription on subsequent Observables unless the first one is complete whereas `concatEager` kicks off all Observables at the time of Subscription in parallel, but still preserves order.
110+
1. [`.concat`](http://reactivex.io/documentation/operators/concat.html)
111+
2. [`.concatEager`](http://reactivex.io/RxJava/javadoc/rx/Observable.html#concatEager(java.lang.Iterable))
112+
3. [`.merge`](http://reactivex.io/documentation/operators/merge.html)
113+
4. [`.publish`](http://reactivex.io/RxJava/javadoc/rx/Observable.html#publish(rx.functions.Func1)) selector + merge + takeUntil
111114

112-
**Update:**
115+
The 4th technique [courtesy Jedi JW](https://twitter.com/JakeWharton/status/786363146990649345) is probably what you want to use. But in order to understand why it's interested to go through the progression of techniques.
113116

114-
After a [conversation I had with @artem_zin](https://twitter.com/kaushikgopal/status/591271805211451392), we arrived at an alternative solution to the same problem. One that used the [`.merge`](http://reactivex.io/documentation/operators/merge.html) operator instead.
117+
`concat` is great. It retrieves information from the first Observable (disk cache in our case) and then the subsequent network Observable. Since the disk cache is presumably faster, all appears well and the disk cache is loaded up fast, and once the network call finishes we swap out the "fresh" results.
115118

116-
The `concat` (and the equivalent [`startWith`](http://reactivex.io/documentation/operators/startwith.html)) opeartor is strictly sequential, meaning all of the items emitted by the first Observable are emitted strictly before any of the items from the second Observable are emitted. So assuming the first observable (for some strange reason) takes really long to run through all its items, even if the first few items from the second observable have come down the wire it will forcibly be queued.
119+
The problem with `concat` is that the subsequent observable doesn't even start until the first Observable completes. That can be a problem. We want all observables to start simultaneously but produce the results in a way we expect. Thankfully RxJava introduced `concatEager` which does exactly that. It starts both observables but buffers the result from the latter one until the former Observable finishes. This is a completely viable option.
117120

118-
The `merge` operator on the other hand interleaves items as they are emitted. The problem here though is if for some strange reason an item is emitted by the cache or slower observable *after* the newer/fresher observable, it will overwrite the newer content. To account for this you have to monitor the "resultAge" somehow. This is demonstrated in the updated solution `PseudoCacheMergeFragment`.
121+
Sometimes though, you just want to start showing the results immediately. Assuming the first observable (for some strange reason) takes really long to run through all its items, even if the first few items from the second observable have come down the wire it will forcibly be queued. You don't necessarily want to "wait" on any Observable. In these situations, we could use the `merge` operator. It interleaves items as they are emitted. This works great and starts to spit out the results as soon as they're shown.
122+
123+
Similar to the `concat` operator, if your first Observable is always faster than the second Observable you won't run into any problems. However the problem with `merge` is: if for some strange reason an item is emitted by the cache or slower observable *after* the newer/fresher observable, it will overwrite the newer content. Click the "MERGE (SLOWER DISK)" button in the example to see this problem in action. @JakeWharton and @swankjesse contributions go to 0! In the real world this could be bad, as it would mean the fresh data would get overridden by stale disk data.
124+
125+
To solve this problem you can use merge in combination with the super nifty `publish` operator which takes in a "selector". I wrote about this usage in a [blog post](http://blog.kaush.co/2015/01/21/rxjava-tip-for-the-day-share-publish-refcount-and-all-that-jazz/) but I have [Jedi JW](https://twitter.com/JakeWharton/status/786363146990649345) to thank for reminding of this technique. We `publish` the network observable and provide it a selector which starts emitting from the disk cache, up until the point that the network observable starts emitting. Once the network observable starts emitting, it ignores all results from the disk observable. This is perfect and handles any problems we might have.
126+
127+
Previously, I was using the `merge` operator but overcoming the problem of results being overwritten by monitoring the "resultAge". See the old `PseudoCacheMergeFragment` example if you're curious to see this.
119128

120129
### Simple Timing demos using timer/interval/delay
121130

app/src/main/java/com/morihacky/android/rxjava/fragments/MainFragment.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,7 @@ void formValidation() {
7474

7575
@OnClick(R.id.btn_demo_pseudo_cache)
7676
void pseudoCacheDemo() {
77-
//clickedOn(new PseudoCacheConcatFragment());
78-
clickedOn(new PseudoCacheMergeFragment());
77+
clickedOn(new PseudoCacheFragment());
7978
}
8079

8180
@OnClick(R.id.btn_demo_timing)
Lines changed: 301 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,301 @@
1+
package com.morihacky.android.rxjava.fragments;
2+
3+
import android.os.Bundle;
4+
import android.os.Handler;
5+
import android.os.Looper;
6+
import android.support.annotation.Nullable;
7+
import android.view.LayoutInflater;
8+
import android.view.View;
9+
import android.view.ViewGroup;
10+
import android.widget.ArrayAdapter;
11+
import android.widget.ListView;
12+
import android.widget.TextView;
13+
import butterknife.Bind;
14+
import butterknife.ButterKnife;
15+
import butterknife.OnClick;
16+
import com.morihacky.android.rxjava.R;
17+
import com.morihacky.android.rxjava.retrofit.Contributor;
18+
import com.morihacky.android.rxjava.retrofit.GithubApi;
19+
import com.morihacky.android.rxjava.retrofit.GithubService;
20+
import java.util.ArrayList;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
import java.util.Map;
24+
import java.util.concurrent.TimeUnit;
25+
import rx.Observable;
26+
import rx.Subscriber;
27+
import rx.android.schedulers.AndroidSchedulers;
28+
import rx.schedulers.Schedulers;
29+
import timber.log.Timber;
30+
31+
public class PseudoCacheFragment
32+
extends BaseFragment {
33+
34+
@Bind(R.id.info_pseudoCache_demo) TextView infoText;
35+
@Bind(R.id.info_pseudoCache_listSubscription) ListView listSubscriptionInfo;
36+
@Bind(R.id.info_pseudoCache_listDtl) ListView listDetail;
37+
38+
private ArrayAdapter<String> adapterDetail, adapterSubscriptionInfo;
39+
private HashMap<String, Long> contributionMap = null;
40+
41+
@Override
42+
public View onCreateView(LayoutInflater inflater,
43+
@Nullable ViewGroup container,
44+
@Nullable Bundle savedInstanceState) {
45+
View layout = inflater.inflate(R.layout.fragment_pseudo_cache, container, false);
46+
ButterKnife.bind(this, layout);
47+
return layout;
48+
}
49+
50+
@Override
51+
public void onDestroyView() {
52+
super.onDestroyView();
53+
ButterKnife.unbind(this);
54+
}
55+
56+
@OnClick(R.id.btn_pseudoCache_concat)
57+
public void onConcatBtnClicked() {
58+
infoText.setText(R.string.msg_pseudoCache_demoInfo_concat);
59+
wireupDemo();
60+
61+
Observable.concat(getSlowCachedDiskData(), getFreshNetworkData())
62+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
63+
.observeOn(AndroidSchedulers.mainThread())
64+
.subscribe(new Subscriber<Contributor>() {
65+
@Override
66+
public void onCompleted() {
67+
Timber.d("done loading all data");
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
Timber.e(e, "arr something went wrong");
73+
}
74+
75+
@Override
76+
public void onNext(Contributor contributor) {
77+
contributionMap.put(contributor.login, contributor.contributions);
78+
adapterDetail.clear();
79+
adapterDetail.addAll(mapAsList(contributionMap));
80+
}
81+
});
82+
}
83+
84+
@OnClick(R.id.btn_pseudoCache_concatEager)
85+
public void onConcatEagerBtnClicked() {
86+
infoText.setText(R.string.msg_pseudoCache_demoInfo_concatEager);
87+
wireupDemo();
88+
89+
Observable.concatEager(getSlowCachedDiskData(), getFreshNetworkData())
90+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
91+
.observeOn(AndroidSchedulers.mainThread())
92+
.subscribe(new Subscriber<Contributor>() {
93+
@Override
94+
public void onCompleted() {
95+
Timber.d("done loading all data");
96+
}
97+
98+
@Override
99+
public void onError(Throwable e) {
100+
Timber.e(e, "arr something went wrong");
101+
}
102+
103+
@Override
104+
public void onNext(Contributor contributor) {
105+
contributionMap.put(contributor.login, contributor.contributions);
106+
adapterDetail.clear();
107+
adapterDetail.addAll(mapAsList(contributionMap));
108+
}
109+
});
110+
111+
}
112+
113+
@OnClick(R.id.btn_pseudoCache_merge)
114+
public void onMergeBtnClicked() {
115+
infoText.setText(R.string.msg_pseudoCache_demoInfo_merge);
116+
wireupDemo();
117+
118+
Observable.merge(getCachedDiskData(), getFreshNetworkData())
119+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
120+
.observeOn(AndroidSchedulers.mainThread())
121+
.subscribe(new Subscriber<Contributor>() {
122+
@Override
123+
public void onCompleted() {
124+
Timber.d("done loading all data");
125+
}
126+
127+
@Override
128+
public void onError(Throwable e) {
129+
Timber.e(e, "arr something went wrong");
130+
}
131+
132+
@Override
133+
public void onNext(Contributor contributor) {
134+
contributionMap.put(contributor.login, contributor.contributions);
135+
adapterDetail.clear();
136+
adapterDetail.addAll(mapAsList(contributionMap));
137+
}
138+
});
139+
}
140+
141+
@OnClick(R.id.btn_pseudoCache_mergeSlowDisk)
142+
public void onMergeSlowBtnClicked() {
143+
infoText.setText(R.string.msg_pseudoCache_demoInfo_mergeSlowDisk);
144+
wireupDemo();
145+
146+
Observable.merge(getSlowCachedDiskData(), getFreshNetworkData())
147+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
148+
.observeOn(AndroidSchedulers.mainThread())
149+
.subscribe(new Subscriber<Contributor>() {
150+
@Override
151+
public void onCompleted() {
152+
Timber.d("done loading all data");
153+
}
154+
155+
@Override
156+
public void onError(Throwable e) {
157+
Timber.e(e, "arr something went wrong");
158+
}
159+
160+
@Override
161+
public void onNext(Contributor contributor) {
162+
contributionMap.put(contributor.login, contributor.contributions);
163+
adapterDetail.clear();
164+
adapterDetail.addAll(mapAsList(contributionMap));
165+
}
166+
});
167+
}
168+
169+
@OnClick(R.id.btn_pseudoCache_mergeOptimized)
170+
public void onMergeOptimizedBtnClicked() {
171+
infoText.setText(R.string.msg_pseudoCache_demoInfo_mergeOptimized);
172+
wireupDemo();
173+
174+
getFreshNetworkData()//
175+
.publish(network ->//
176+
Observable.merge(network,//
177+
getCachedDiskData().takeUntil(network)))
178+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
179+
.observeOn(AndroidSchedulers.mainThread())
180+
.subscribe(new Subscriber<Contributor>() {
181+
@Override
182+
public void onCompleted() {
183+
Timber.d("done loading all data");
184+
}
185+
186+
@Override
187+
public void onError(Throwable e) {
188+
Timber.e(e, "arr something went wrong");
189+
}
190+
191+
@Override
192+
public void onNext(Contributor contributor) {
193+
contributionMap.put(contributor.login, contributor.contributions);
194+
adapterDetail.clear();
195+
adapterDetail.addAll(mapAsList(contributionMap));
196+
}
197+
});
198+
}
199+
200+
@OnClick(R.id.btn_pseudoCache_mergeOptimizedSlowDisk)
201+
public void onMergeOptimizedWithSlowDiskBtnClicked() {
202+
infoText.setText(R.string.msg_pseudoCache_demoInfo_mergeOptimizedSlowDisk);
203+
wireupDemo();
204+
205+
getFreshNetworkData()//
206+
.publish(network ->//
207+
Observable.merge(network,//
208+
getSlowCachedDiskData().takeUntil(network)))
209+
.subscribeOn(Schedulers.io()) // we want to add a list item at time of subscription
210+
.observeOn(AndroidSchedulers.mainThread())
211+
.subscribe(new Subscriber<Contributor>() {
212+
@Override
213+
public void onCompleted() {
214+
Timber.d("done loading all data");
215+
}
216+
217+
@Override
218+
public void onError(Throwable e) {
219+
Timber.e(e, "arr something went wrong");
220+
}
221+
222+
@Override
223+
public void onNext(Contributor contributor) {
224+
contributionMap.put(contributor.login, contributor.contributions);
225+
adapterDetail.clear();
226+
adapterDetail.addAll(mapAsList(contributionMap));
227+
}
228+
});
229+
}
230+
231+
// -----------------------------------------------------------------------------------
232+
// WIRING for example
233+
234+
private void wireupDemo() {
235+
contributionMap = new HashMap<>();
236+
237+
adapterDetail = new ArrayAdapter<>(getActivity(), R.layout.item_log_white, R.id.item_log, new ArrayList<>());
238+
listDetail.setAdapter(adapterDetail);
239+
240+
adapterSubscriptionInfo = new ArrayAdapter<>(getActivity(),
241+
R.layout.item_log_white,
242+
R.id.item_log,
243+
new ArrayList<>());
244+
listSubscriptionInfo.setAdapter(adapterSubscriptionInfo);
245+
}
246+
247+
private Observable<Contributor> getSlowCachedDiskData() {
248+
return Observable.timer(1, TimeUnit.SECONDS).flatMap(dummy -> getCachedDiskData());
249+
}
250+
251+
private Observable<Contributor> getCachedDiskData() {
252+
List<Contributor> list = new ArrayList<>();
253+
Map<String, Long> map = dummyDiskData();
254+
255+
for (String username : map.keySet()) {
256+
Contributor c = new Contributor();
257+
c.login = username;
258+
c.contributions = map.get(username);
259+
list.add(c);
260+
}
261+
262+
return Observable.from(list)//
263+
.doOnSubscribe(() -> new Handler(Looper.getMainLooper())//
264+
.post(() -> adapterSubscriptionInfo.add("(disk) cache subscribed")))//
265+
.doOnCompleted(() -> new Handler(Looper.getMainLooper())//
266+
.post(() -> adapterSubscriptionInfo.add("(disk) cache completed")));
267+
}
268+
269+
private Observable<Contributor> getFreshNetworkData() {
270+
String githubToken = getResources().getString(R.string.github_oauth_token);
271+
GithubApi githubService = GithubService.createGithubService(githubToken);
272+
273+
return githubService.contributors("square", "retrofit")
274+
.flatMap(Observable::from)
275+
.doOnSubscribe(() -> new Handler(Looper.getMainLooper())//
276+
.post(() -> adapterSubscriptionInfo.add("(network) subscribed")))//
277+
.doOnCompleted(() -> new Handler(Looper.getMainLooper())//
278+
.post(() -> adapterSubscriptionInfo.add("(network) completed")));
279+
}
280+
281+
private List<String> mapAsList(HashMap<String, Long> map) {
282+
List<String> list = new ArrayList<>();
283+
284+
for (String username : map.keySet()) {
285+
String rowLog = String.format("%s [%d]", username, contributionMap.get(username));
286+
list.add(rowLog);
287+
}
288+
289+
return list;
290+
}
291+
292+
private Map<String, Long> dummyDiskData() {
293+
Map<String, Long> map = new HashMap<>();
294+
map.put("JakeWharton", 0L);
295+
map.put("pforhan", 0L);
296+
map.put("edenman", 0L);
297+
map.put("swankjesse", 0L);
298+
map.put("bruceLee", 0L);
299+
return map;
300+
}
301+
}

0 commit comments

Comments
 (0)