7272 *
7373 * <h2>usage</h2>
7474 * <pre>{@code
75- *
7675 * ...
7776 * import org.jooby.rx.Rx;
7877 * ...
121120 * ...
122121 *
123122 * {
124- * use(new Rx(o -> o.observeOn(Scheduler.io())));
123+ * use(new Rx()
124+ * .withObservable(observable -> observable.observeOn(Scheduler.io()),
125+ * .withSingle(single -> single.observeOn(Scheduler.io()),
126+ * .withCompletable(completable -> completable.observeOn(Scheduler.io()));
125127 *
126- * get("/1 ", req -> Observable...);
128+ * get("/observable ", req -> Observable...);
127129 *
128- * get("/2 ", req -> Observable ...);
130+ * get("/single ", req -> Single ...);
129131 *
130132 * ....
131133 *
132- * get("/N ", req -> Observable ...);
134+ * get("/completable ", req -> Completable ...);
133135 *
134136 * }
135137 * }</pre>
136138 *
137139 * <p>
138- * All the routes here will {@link Observable#observeOn(Scheduler) observeOn} the provided
139- * {@link Scheduler} .
140+ * Here every Observable/Single/Completable from a route handler will observe on the <code>io</code>
141+ * scheduler .
140142 * </p>
141143 *
142144 * <h2>schedulers</h2>
@@ -208,31 +210,47 @@ public void onNext(final Object value) {
208210 /** The logging system. */
209211 private final Logger log = LoggerFactory .getLogger (getClass ());
210212
211- private final Function <Observable , Observable > adapter = Function .identity ();
213+ private Function <Observable , Observable > observable = Function .identity ();
214+
215+ private Function <Single , Single > single = Function .identity ();
216+
217+ private Function <Completable , Completable > completable = Function .identity ();
212218
213219 /**
214- * Creates a new {@link Rx} module and allow further customization of the {@link Observable}
215- * returned by routes, like:
216- *
217- * <pre>{@code
218- * {
219- * use(new Rx(o -> o.observeOn(Schedulers.io())));
220- * }
221- * }</pre>
222- *
223- * @param adapter Observable adapter.
220+ * Creates a new {@link Rx} module.
224221 */
225- public Rx (final Function <Observable , Observable > adapter ) {
226- super ("rx.schedulers" );
222+ public Rx () {
227223 // daemon by default.
228224 daemon (true );
229225 }
230226
231227 /**
232- * Creates a new {@link Rx} module.
228+ * Map a rx object like {@link Observable}, {@link Single} or {@link Completable} into a
229+ * {@link Deferred} object.
230+ *
231+ * <pre>{@code
232+ * ...
233+ * import org.jooby.rx.Rx;
234+ * ...
235+ *
236+ * {
237+ * with(() -> {
238+ * get("/1", req -> Observable...);
239+ *
240+ * get("/2", req -> Single...);
241+ *
242+ * ....
243+ *
244+ * get("/N", req -> Completable...);
245+ *
246+ * }).map(Rx.rx());
247+ * }
248+ * }</pre>
249+ *
250+ * @return A new mapper.
233251 */
234- public Rx () {
235- this ( Function .identity ());
252+ public static Route . Mapper < Object > rx () {
253+ return rx ( Function . identity (), Function .identity ());
236254 }
237255
238256 /**
@@ -256,14 +274,20 @@ public Rx() {
256274 *
257275 * get("/N", req -> Observable...);
258276 *
259- * }).map(Rx.rx());
277+ * }).map(Rx.rx(
278+ * observable -> observable.observeOn(Scheduler.io()),
279+ * single -> single.observeOn(Scheduler.io()),
280+ * completable -> completable.observeOn(Scheduler.io())));
260281 * }
261282 * }</pre>
262283 *
284+ * @param observable Observable adapter.
285+ * @param single Single adapter.
263286 * @return A new mapper.
264287 */
265- public static Route .Mapper <Object > rx () {
266- return rx (Function .identity ());
288+ public static Route .Mapper <Object > rx (final Function <Observable , Observable > observable ,
289+ final Function <Single , Single > single ) {
290+ return rx (observable , single , Function .identity ());
267291 }
268292
269293 /**
@@ -287,34 +311,103 @@ public static Route.Mapper<Object> rx() {
287311 *
288312 * get("/N", req -> Observable...);
289313 *
290- * }).map(Rx.rx(o -> o.observeOn(Scheduler.io())));
314+ * }).map(Rx.rx(
315+ * observable -> observable.observeOn(Scheduler.io()),
316+ * single -> single.observeOn(Scheduler.io()),
317+ * completable -> completable.observeOn(Scheduler.io())));
291318 * }
292319 * }</pre>
293320 *
294- * @param adapter Observable adapter.
321+ * @param observable Observable adapter.
322+ * @param single Single adapter.
323+ * @param completable Completable adapter.
295324 * @return A new mapper.
296325 */
297326 @ SuppressWarnings ("unchecked" )
298- public static Route .Mapper <Object > rx (final Function <Observable , Observable > adapter ) {
299- requireNonNull (adapter , "Observable adapter is required." );
327+ public static Route .Mapper <Object > rx (final Function <Observable , Observable > observable ,
328+ final Function <Single , Single > single , final Function <Completable , Completable > completable ) {
329+ requireNonNull (observable , "Observable's adapter is required." );
330+ requireNonNull (single , "Single's adapter is required." );
331+ requireNonNull (completable , "Completable's adapter is required." );
300332
301- return Route .Mapper .create ("rx" , v -> Match (observable ( v ) ).of (
333+ return Route .Mapper .create ("rx" , v -> Match (v ).of (
302334 /** Observable : */
303335 Case (instanceOf (Observable .class ),
304- it -> new Deferred (deferred -> adapter .apply (it )
336+ it -> new Deferred (deferred -> observable .apply (it )
337+ .subscribe (new DeferredSubscriber (deferred )))),
338+ /** Single : */
339+ Case (instanceOf (Single .class ),
340+ it -> new Deferred (deferred -> single .apply (it )
341+ .subscribe (new DeferredSubscriber (deferred )))),
342+ /** Completable : */
343+ Case (instanceOf (Completable .class ),
344+ it -> new Deferred (deferred -> completable .apply (it )
305345 .subscribe (new DeferredSubscriber (deferred )))),
306346 /** Ignore */
307347 Case ($ (), v )));
308348 }
309349
310- private static Object observable (final Object value ) {
311- return Match (value ).of (
312- /** Single : */
313- Case (instanceOf (Single .class ), s -> s .toObservable ()),
314- /** Completable : */
315- Case (instanceOf (Completable .class ), c -> c .toObservable ()),
316- /** Ignore */
317- Case ($ (), value ));
350+ /**
351+ * Apply the given function adapter to observables returned by routes:
352+ *
353+ * <pre>{@code
354+ * {
355+ * use(new Rx().withObservable(observable -> observable.observeOn(Schedulers.io())));
356+ *
357+ * get("observable", -> {
358+ * return Observable...
359+ * });
360+ * }
361+ * }</pre>
362+ *
363+ * @param adapter Observable adapter.
364+ * @return This module.
365+ */
366+ public Rx withObservable (final Function <Observable , Observable > adapter ) {
367+ this .observable = requireNonNull (adapter , "Observable's adapter is required." );
368+ return this ;
369+ }
370+
371+ /**
372+ * Apply the given function adapter to single returned by routes:
373+ *
374+ * <pre>{@code
375+ * {
376+ * use(new Rx().withSingle(observable -> observable.observeOn(Schedulers.io())));
377+ *
378+ * get("single", -> {
379+ * return Single...
380+ * });
381+ * }
382+ * }</pre>
383+ *
384+ * @param adapter Single adapter.
385+ * @return This module.
386+ */
387+ public Rx withSingle (final Function <Single , Single > adapter ) {
388+ this .single = requireNonNull (adapter , "Single's adapter is required." );
389+ return this ;
390+ }
391+
392+ /**
393+ * Apply the given function adapter to completable returned by routes:
394+ *
395+ * <pre>{@code
396+ * {
397+ * use(new Rx().withObservable(observable -> observable.observeOn(Schedulers.io())));
398+ *
399+ * get("completable", -> {
400+ * return Completable...
401+ * });
402+ * }
403+ * }</pre>
404+ *
405+ * @param adapter Completable adapter.
406+ * @return This module.
407+ */
408+ public Rx withCompletable (final Function <Completable , Completable > adapter ) {
409+ this .completable = requireNonNull (adapter , "Completable's adapter is required." );
410+ return this ;
318411 }
319412
320413 @ Override
@@ -328,7 +421,7 @@ public void configure(final Env env, final Config conf, final Binder binder) {
328421 super .configure (env , conf , binder , executors ::put );
329422
330423 env .routes ()
331- .map (rx (adapter ));
424+ .map (rx (observable , single , completable ));
332425
333426 /**
334427 * Side effects of global/evil static state. Hack to turn off some of this errors.
0 commit comments