Skip to content

Commit eb6f668

Browse files
committed
simplify/make more flexible to adapt rx object via function adapters fix jooby-project#386
1 parent 5a7e108 commit eb6f668

4 files changed

Lines changed: 306 additions & 55 deletions

File tree

jooby-rxjava/src/main/java/org/jooby/rx/Rx.java

Lines changed: 134 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
*
7373
* <h2>usage</h2>
7474
* <pre>{@code
75-
*
7675
* ...
7776
* import org.jooby.rx.Rx;
7877
* ...
@@ -121,22 +120,25 @@
121120
* ...
122121
*
123122
* {
124-
* use(new Rx(o -> o.observeOn(Scheduler.io())));
123+
* use(new Rx()
124+
* .withObservable(observable -> observable.observeOn(Scheduler.io()),
125+
* .withSingle(single -> single.observeOn(Scheduler.io()),
126+
* .withCompletable(completable -> completable.observeOn(Scheduler.io()));
125127
*
126-
* get("/1", req -> Observable...);
128+
* get("/observable", req -> Observable...);
127129
*
128-
* get("/2", req -> Observable...);
130+
* get("/single", req -> Single...);
129131
*
130132
* ....
131133
*
132-
* get("/N", req -> Observable...);
134+
* get("/completable", req -> Completable...);
133135
*
134136
* }
135137
* }</pre>
136138
*
137139
* <p>
138-
* All the routes here will {@link Observable#observeOn(Scheduler) observeOn} the provided
139-
* {@link Scheduler}.
140+
* Here every Observable/Single/Completable from a route handler will observe on the <code>io</code>
141+
* scheduler.
140142
* </p>
141143
*
142144
* <h2>schedulers</h2>
@@ -208,31 +210,47 @@ public void onNext(final Object value) {
208210
/** The logging system. */
209211
private final Logger log = LoggerFactory.getLogger(getClass());
210212

211-
private final Function<Observable, Observable> adapter = Function.identity();
213+
private Function<Observable, Observable> observable = Function.identity();
214+
215+
private Function<Single, Single> single = Function.identity();
216+
217+
private Function<Completable, Completable> completable = Function.identity();
212218

213219
/**
214-
* Creates a new {@link Rx} module and allow further customization of the {@link Observable}
215-
* returned by routes, like:
216-
*
217-
* <pre>{@code
218-
* {
219-
* use(new Rx(o -> o.observeOn(Schedulers.io())));
220-
* }
221-
* }</pre>
222-
*
223-
* @param adapter Observable adapter.
220+
* Creates a new {@link Rx} module.
224221
*/
225-
public Rx(final Function<Observable, Observable> adapter) {
226-
super("rx.schedulers");
222+
public Rx() {
227223
// daemon by default.
228224
daemon(true);
229225
}
230226

231227
/**
232-
* Creates a new {@link Rx} module.
228+
* Map a rx object like {@link Observable}, {@link Single} or {@link Completable} into a
229+
* {@link Deferred} object.
230+
*
231+
* <pre>{@code
232+
* ...
233+
* import org.jooby.rx.Rx;
234+
* ...
235+
*
236+
* {
237+
* with(() -> {
238+
* get("/1", req -> Observable...);
239+
*
240+
* get("/2", req -> Single...);
241+
*
242+
* ....
243+
*
244+
* get("/N", req -> Completable...);
245+
*
246+
* }).map(Rx.rx());
247+
* }
248+
* }</pre>
249+
*
250+
* @return A new mapper.
233251
*/
234-
public Rx() {
235-
this(Function.identity());
252+
public static Route.Mapper<Object> rx() {
253+
return rx(Function.identity(), Function.identity());
236254
}
237255

238256
/**
@@ -256,14 +274,20 @@ public Rx() {
256274
*
257275
* get("/N", req -> Observable...);
258276
*
259-
* }).map(Rx.rx());
277+
* }).map(Rx.rx(
278+
* observable -> observable.observeOn(Scheduler.io()),
279+
* single -> single.observeOn(Scheduler.io()),
280+
* completable -> completable.observeOn(Scheduler.io())));
260281
* }
261282
* }</pre>
262283
*
284+
* @param observable Observable adapter.
285+
* @param single Single adapter.
263286
* @return A new mapper.
264287
*/
265-
public static Route.Mapper<Object> rx() {
266-
return rx(Function.identity());
288+
public static Route.Mapper<Object> rx(final Function<Observable, Observable> observable,
289+
final Function<Single, Single> single) {
290+
return rx(observable, single, Function.identity());
267291
}
268292

269293
/**
@@ -287,34 +311,103 @@ public static Route.Mapper<Object> rx() {
287311
*
288312
* get("/N", req -> Observable...);
289313
*
290-
* }).map(Rx.rx(o -> o.observeOn(Scheduler.io())));
314+
* }).map(Rx.rx(
315+
* observable -> observable.observeOn(Scheduler.io()),
316+
* single -> single.observeOn(Scheduler.io()),
317+
* completable -> completable.observeOn(Scheduler.io())));
291318
* }
292319
* }</pre>
293320
*
294-
* @param adapter Observable adapter.
321+
* @param observable Observable adapter.
322+
* @param single Single adapter.
323+
* @param completable Completable adapter.
295324
* @return A new mapper.
296325
*/
297326
@SuppressWarnings("unchecked")
298-
public static Route.Mapper<Object> rx(final Function<Observable, Observable> adapter) {
299-
requireNonNull(adapter, "Observable adapter is required.");
327+
public static Route.Mapper<Object> rx(final Function<Observable, Observable> observable,
328+
final Function<Single, Single> single, final Function<Completable, Completable> completable) {
329+
requireNonNull(observable, "Observable's adapter is required.");
330+
requireNonNull(single, "Single's adapter is required.");
331+
requireNonNull(completable, "Completable's adapter is required.");
300332

301-
return Route.Mapper.create("rx", v -> Match(observable(v)).of(
333+
return Route.Mapper.create("rx", v -> Match(v).of(
302334
/** Observable : */
303335
Case(instanceOf(Observable.class),
304-
it -> new Deferred(deferred -> adapter.apply(it)
336+
it -> new Deferred(deferred -> observable.apply(it)
337+
.subscribe(new DeferredSubscriber(deferred)))),
338+
/** Single : */
339+
Case(instanceOf(Single.class),
340+
it -> new Deferred(deferred -> single.apply(it)
341+
.subscribe(new DeferredSubscriber(deferred)))),
342+
/** Completable : */
343+
Case(instanceOf(Completable.class),
344+
it -> new Deferred(deferred -> completable.apply(it)
305345
.subscribe(new DeferredSubscriber(deferred)))),
306346
/** Ignore */
307347
Case($(), v)));
308348
}
309349

310-
private static Object observable(final Object value) {
311-
return Match(value).of(
312-
/** Single : */
313-
Case(instanceOf(Single.class), s -> s.toObservable()),
314-
/** Completable : */
315-
Case(instanceOf(Completable.class), c -> c.toObservable()),
316-
/** Ignore */
317-
Case($(), value));
350+
/**
351+
* Apply the given function adapter to observables returned by routes:
352+
*
353+
* <pre>{@code
354+
* {
355+
* use(new Rx().withObservable(observable -> observable.observeOn(Schedulers.io())));
356+
*
357+
* get("observable", -> {
358+
* return Observable...
359+
* });
360+
* }
361+
* }</pre>
362+
*
363+
* @param adapter Observable adapter.
364+
* @return This module.
365+
*/
366+
public Rx withObservable(final Function<Observable, Observable> adapter) {
367+
this.observable = requireNonNull(adapter, "Observable's adapter is required.");
368+
return this;
369+
}
370+
371+
/**
372+
* Apply the given function adapter to single returned by routes:
373+
*
374+
* <pre>{@code
375+
* {
376+
* use(new Rx().withSingle(observable -> observable.observeOn(Schedulers.io())));
377+
*
378+
* get("single", -> {
379+
* return Single...
380+
* });
381+
* }
382+
* }</pre>
383+
*
384+
* @param adapter Single adapter.
385+
* @return This module.
386+
*/
387+
public Rx withSingle(final Function<Single, Single> adapter) {
388+
this.single = requireNonNull(adapter, "Single's adapter is required.");
389+
return this;
390+
}
391+
392+
/**
393+
* Apply the given function adapter to completable returned by routes:
394+
*
395+
* <pre>{@code
396+
* {
397+
* use(new Rx().withObservable(observable -> observable.observeOn(Schedulers.io())));
398+
*
399+
* get("completable", -> {
400+
* return Completable...
401+
* });
402+
* }
403+
* }</pre>
404+
*
405+
* @param adapter Completable adapter.
406+
* @return This module.
407+
*/
408+
public Rx withCompletable(final Function<Completable, Completable> adapter) {
409+
this.completable = requireNonNull(adapter, "Completable's adapter is required.");
410+
return this;
318411
}
319412

320413
@Override
@@ -328,7 +421,7 @@ public void configure(final Env env, final Config conf, final Binder binder) {
328421
super.configure(env, conf, binder, executors::put);
329422

330423
env.routes()
331-
.map(rx(adapter));
424+
.map(rx(observable, single, completable));
332425

333426
/**
334427
* Side effects of global/evil static state. Hack to turn off some of this errors.

jooby-rxjava/src/test/java/org/jooby/rx/RxMapperTest.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import static org.easymock.EasyMock.isA;
55
import static org.junit.Assert.assertEquals;
66

7+
import java.util.function.Function;
8+
79
import org.jooby.Deferred;
810
import org.jooby.rx.Rx.DeferredSubscriber;
911
import org.jooby.test.MockUnit;
@@ -32,15 +34,12 @@ public class RxMapperTest {
3234

3335
private Block sSubscribeInit = unit -> {
3436
Single single = unit.powerMock(Single.class);
35-
expect(single.toObservable()).andReturn(unit.get(Observable.class));
3637

3738
unit.registerMock(Single.class, single);
3839
};
3940

4041
private Block cSubscribeInit = unit -> {
4142
Completable single = unit.powerMock(Completable.class);
42-
43-
expect(single.toObservable()).andReturn(unit.get(Observable.class));
4443
unit.registerMock(Completable.class, single);
4544
};
4645

@@ -59,6 +58,22 @@ public class RxMapperTest {
5958
expect(value.subscribe(subscriber)).andReturn(null);
6059
};
6160

61+
private Block sSubscribe = unit -> {
62+
Single value = unit.get(Single.class);
63+
64+
DeferredSubscriber subscriber = unit.get(DeferredSubscriber.class);
65+
66+
expect(value.subscribe(subscriber)).andReturn(null);
67+
};
68+
69+
private Block cSubscribe = unit -> {
70+
Completable value = unit.get(Completable.class);
71+
72+
DeferredSubscriber subscriber = unit.get(DeferredSubscriber.class);
73+
74+
value.subscribe(subscriber);
75+
};
76+
6277
private Block scheduler = unit -> {
6378
Scheduler scheduler = unit.mock(Scheduler.class);
6479
unit.registerMock(Scheduler.class, scheduler);
@@ -90,7 +105,8 @@ public void rxObservableWithScheduler() throws Exception {
90105
expect(single.observeOn(unit.get(Scheduler.class))).andReturn(single);
91106
})
92107
.run(unit -> {
93-
Deferred deferred = (Deferred) Rx.rx(o -> o.observeOn(unit.get(Scheduler.class)))
108+
Deferred deferred = (Deferred) Rx
109+
.rx(o -> o.observeOn(unit.get(Scheduler.class)), Function.identity())
94110
.map(unit.get(Observable.class));
95111
deferred.handler((r, x) -> {
96112
});
@@ -100,10 +116,9 @@ public void rxObservableWithScheduler() throws Exception {
100116
@Test
101117
public void rxSingle() throws Exception {
102118
new MockUnit()
103-
.expect(obsSubscribeInit)
104119
.expect(sSubscribeInit)
105120
.expect(deferredSubscriber)
106-
.expect(obsSubscribe)
121+
.expect(sSubscribe)
107122
.run(unit -> {
108123
Deferred deferred = (Deferred) Rx.rx().map(unit.get(Single.class));
109124
deferred.handler((r, x) -> {
@@ -114,10 +129,9 @@ public void rxSingle() throws Exception {
114129
@Test
115130
public void rxComplete() throws Exception {
116131
new MockUnit()
117-
.expect(obsSubscribeInit)
118132
.expect(cSubscribeInit)
119133
.expect(deferredSubscriber)
120-
.expect(obsSubscribe)
134+
.expect(cSubscribe)
121135
.run(unit -> {
122136
Deferred deferred = (Deferred) Rx.rx().map(unit.get(Completable.class));
123137
deferred.handler((r, x) -> {

0 commit comments

Comments
 (0)