Skip to content

Commit d3d0d01

Browse files
committed
finalize rxjava module fix jooby-project#350
1 parent a7a5871 commit d3d0d01

File tree

7 files changed

+429
-45
lines changed

7 files changed

+429
-45
lines changed

jooby-rxjava/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>org.jooby</groupId>
77
<artifactId>jooby-project</artifactId>
8-
<version>1.0.0.CR2</version>
8+
<version>1.0.0.CR3</version>
99
</parent>
1010

1111
<modelVersion>4.0.0</modelVersion>
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.jooby.rx;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.Executor;
23+
24+
import com.google.common.collect.ImmutableMap;
25+
26+
import javaslang.Lazy;
27+
import rx.Scheduler;
28+
import rx.plugins.RxJavaSchedulersHook;
29+
import rx.schedulers.Schedulers;
30+
31+
class ExecSchedulerHook extends RxJavaSchedulersHook {
32+
33+
private Lazy<Map<String, Scheduler>> schedulers;
34+
35+
public ExecSchedulerHook(final Map<String, Executor> executors) {
36+
// we don't want eager initialization of Schedulers
37+
this.schedulers = Lazy.of(() -> {
38+
ImmutableMap.Builder<String, Scheduler> schedulers = ImmutableMap.builder();
39+
executors.forEach((k, e) -> schedulers.put(k, Schedulers.from(e)));
40+
return schedulers.build();
41+
});
42+
}
43+
44+
@Override
45+
public Scheduler getComputationScheduler() {
46+
return schedulers.get().get("computation");
47+
}
48+
49+
@Override
50+
public Scheduler getIOScheduler() {
51+
return schedulers.get().get("io");
52+
}
53+
54+
@Override
55+
public Scheduler getNewThreadScheduler() {
56+
return schedulers.get().get("newThread");
57+
}
58+
59+
}

jooby-rxjava/src/main/java/org/jooby/rx/Rx.java

Lines changed: 220 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,16 @@
3333
import org.jooby.Deferred;
3434
import org.jooby.Env;
3535
import org.jooby.Route;
36+
import org.jooby.Routes;
3637
import org.jooby.exec.Exec;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3740

3841
import com.google.inject.Binder;
3942
import com.typesafe.config.Config;
4043
import com.typesafe.config.ConfigFactory;
4144

45+
import javaslang.control.Try;
4246
import rx.Completable;
4347
import rx.Observable;
4448
import rx.Scheduler;
@@ -48,6 +52,197 @@
4852
import rx.plugins.RxJavaSchedulersHook;
4953
import rx.schedulers.Schedulers;
5054

55+
/**
56+
* <h1>rxjava</h1>
57+
* <p>
58+
* Reactive programming via <a href="https://github.com/ReactiveX/RxJava">rxjava</a>.
59+
* </p>
60+
* <p>
61+
* RxJava is a Java VM implementation of <a href="http://reactivex.io">Reactive Extensions</a>: a
62+
* library for composing asynchronous and event-based programs by using observable sequences.
63+
* </p>
64+
*
65+
* <h2>exports</h2>
66+
* <ul>
67+
* <li>map route operator that converts {@link Observable} (and family) into {@link Deferred} API.
68+
* </li>
69+
* <li>
70+
* manage the lifecycle of {@link Scheduler schedulers} and make sure they go down on application
71+
* shutdown time.
72+
* </li>
73+
* <li>set a default server thread pool with the number of available processors.</li>
74+
* </ul>
75+
*
76+
* <h2>usage</h2>
77+
* <pre>{@code
78+
*
79+
* ...
80+
* import org.jooby.rx.Rx;
81+
* ...
82+
*
83+
* {
84+
* use(new Rx());
85+
*
86+
* get("/", req -> Observable.from("reactive programming in jooby!"))
87+
* .map(Rx.rx());
88+
* }
89+
* }</pre>
90+
*
91+
* <h2>how it works?</h2>
92+
* <p>
93+
* Previous example is translated to:
94+
* </p>
95+
* <pre>{@code
96+
* {
97+
* use(new Rx());
98+
*
99+
* get("/", req -> {
100+
*
101+
* return new Deferred(deferred -> {
102+
* Observable.from("reactive programming in jooby!")
103+
* .subscribe(deferred::resolve, deferred::reject);
104+
* });
105+
*
106+
* });
107+
* }
108+
* }</pre>
109+
*
110+
* <p>
111+
* Translation is done with the {@link Rx#rx()} route operator. If you are a
112+
* <a href="https://github.com/ReactiveX/RxJava">rxjava</a> programmer then you don't need to worry
113+
* for learning a new API and semantic. The {@link Rx#rx()} route operator deal and take cares of
114+
* the {@link Deferred} API.
115+
* </p>
116+
*
117+
* <h2>rx()</h2>
118+
* <p>
119+
* We just learn that we are not force to learn a new API, just write
120+
* <a href="https://github.com/ReactiveX/RxJava">rxjava</a> code. That's cool!
121+
* </p>
122+
*
123+
* <p>
124+
* But.. what if you have 10 routes? 50 routes?
125+
* </p>
126+
*
127+
* <pre>{@code
128+
*
129+
* ...
130+
* import org.jooby.rx.Rx;
131+
* ...
132+
*
133+
* {
134+
* use(new Rx());
135+
*
136+
* get("/1", req -> Observable...)
137+
* .map(Rx.rx());
138+
*
139+
* get("/2", req -> Observable...)
140+
* .map(Rx.rx());
141+
*
142+
* ....
143+
*
144+
* get("/N", req -> Observable...)
145+
* .map(Rx.rx());
146+
* }
147+
* }</pre>
148+
*
149+
* <p>
150+
* This is better than written N routes using the {@link Deferred} API route by route... but still
151+
* there is one more option to help you (and your fingers) to right less code:
152+
* </p>
153+
*
154+
* <pre>{@code
155+
* ...
156+
* import org.jooby.rx.Rx;
157+
* ...
158+
*
159+
* {
160+
* use(new Rx());
161+
*
162+
* with(() -> {
163+
* get("/1", req -> Observable...);
164+
*
165+
* get("/2", req -> Observable...);
166+
*
167+
* ....
168+
*
169+
* get("/N", req -> Observable...);
170+
*
171+
* }).map(Rx.rx());
172+
* }
173+
* }</pre>
174+
*
175+
* <p>
176+
* <strong>Beautiful, hugh?</strong>
177+
* </p>
178+
*
179+
* <p>
180+
* The {@link Routes#with(Runnable) with} operator let you group any number of routes and apply
181+
* common attributes and/or operator to all them!!!
182+
* </p>
183+
*
184+
* <h2>rx()+scheduler</h2>
185+
* <p>
186+
* You can provide a {@link Scheduler} to the {@link #rx()} operator:
187+
* </p>
188+
*
189+
* <pre>{@code
190+
* ...
191+
* import org.jooby.rx.Rx;
192+
* ...
193+
*
194+
* {
195+
* use(new Rx());
196+
*
197+
* with(() -> {
198+
* get("/1", req -> Observable...);
199+
*
200+
* get("/2", req -> Observable...);
201+
*
202+
* ....
203+
*
204+
* get("/N", req -> Observable...);
205+
*
206+
* }).map(Rx.rx(Schedulers::io));
207+
* }
208+
* }</pre>
209+
*
210+
* <p>
211+
* All the routes here will {@link Observable#subscribeOn(Scheduler) subscribe-on} the
212+
* provided {@link Scheduler}.
213+
* </p>
214+
*
215+
* <h2>schedulers</h2>
216+
* <p>
217+
* This module provides the default {@link Scheduler} from
218+
* <a href="https://github.com/ReactiveX/RxJava">rxjava</a>. But also let you define your own
219+
* {@link Scheduler scheduler} using the {@link Exec} module.
220+
* </p>
221+
*
222+
* <pre>
223+
* rx.schedulers.io = forkjoin
224+
* rx.schedulers.computation = fixed
225+
* rx.schedulers.newThread = "fixed = 10"
226+
* </pre>
227+
*
228+
* <p>
229+
* The previous example defines a:
230+
* </p>
231+
* <ul>
232+
* <li>forkjoin pool for {@link Schedulers#io()}</li>
233+
* <li>fixed thread pool equals to the number of available processors for
234+
* {@link Schedulers#computation()}</li>
235+
* <li>fixed thread pool with a max of 10 for {@link Schedulers#newThread()}</li>
236+
* </ul>
237+
*
238+
* <p>
239+
* Of course, you can define/override all, some or none of them. In any case the {@link Scheduler}
240+
* will be shutdown at application shutdown time.
241+
* </p>
242+
*
243+
* @author edgar
244+
* @since 1.0.0.CR3
245+
*/
51246
public class Rx extends Exec {
52247

53248
static class DeferredSubscriber extends Subscriber<Object> {
@@ -82,6 +277,9 @@ public void onNext(final Object value) {
82277
}
83278
}
84279

280+
/** The logging system. */
281+
private final Logger log = LoggerFactory.getLogger(getClass());
282+
85283
public Rx() {
86284
super("rx.schedulers");
87285
// daemon by default.
@@ -124,49 +322,40 @@ private static Route.Mapper<Object> rx(final Optional<Supplier<Scheduler>> subsc
124322
@Override
125323
public void configure(final Env env, final Config conf, final Binder binder) {
126324
// dump rx.* as system properties
127-
if (conf.hasPath("rx")) {
128-
conf.getConfig("rx")
129-
.withoutPath("schedulers").entrySet()
130-
.forEach(
131-
e -> System.setProperty("rx." + e.getKey(), e.getValue().unwrapped().toString()));
132-
}
325+
conf.getConfig("rx")
326+
.withoutPath("schedulers").entrySet()
327+
.forEach(
328+
e -> System.setProperty("rx." + e.getKey(), e.getValue().unwrapped().toString()));
133329
Map<String, Executor> executors = new HashMap<>();
134330
super.configure(env, conf, binder, executors::put);
135-
RxJavaPlugins plugins = RxJavaPlugins.getInstance();
136-
plugins.registerSchedulersHook(schedulerHook(executors));
331+
332+
/**
333+
* Side effects of global/evil static state. Hack to turn off some of this errors.
334+
*/
335+
trySchedulerHook(executors);
336+
137337
// shutdown schedulers: silent shutdown on tests we got a NoClassDefFoundError: Could not
138338
// initialize class rx.internal.util.RxRingBuffer
139-
env.onStop(Schedulers::shutdown);
339+
env.onStop(() -> Try.run(Schedulers::shutdown)
340+
.onFailure(x -> log.debug("Schedulers.shutdown resulted in error", x)));
140341
}
141342

142343
@Override
143344
public Config config() {
144345
return ConfigFactory.parseResources(getClass(), "rx.conf");
145346
}
146347

147-
static RxJavaSchedulersHook schedulerHook(final Map<String, Executor> executors) {
148-
return new RxJavaSchedulersHook() {
149-
@Override
150-
public Scheduler getComputationScheduler() {
151-
return Optional.ofNullable(executors.get("computation"))
152-
.map(Schedulers::from)
153-
.orElse(null);
154-
}
155-
156-
@Override
157-
public Scheduler getIOScheduler() {
158-
return Optional.ofNullable(executors.get("io"))
159-
.map(Schedulers::from)
160-
.orElse(null);
161-
}
162-
163-
@Override
164-
public Scheduler getNewThreadScheduler() {
165-
return Optional.ofNullable(executors.get("newThread"))
166-
.map(Schedulers::from)
167-
.orElse(null);
348+
private static void trySchedulerHook(final Map<String, Executor> executors) {
349+
RxJavaPlugins plugins = RxJavaPlugins.getInstance();
350+
try {
351+
plugins.registerSchedulersHook(new ExecSchedulerHook(executors));
352+
} catch (IllegalStateException ex) {
353+
// there is a scheduler hook already, check if ours and ignore the exception
354+
RxJavaSchedulersHook hook = plugins.getSchedulersHook();
355+
if (!(hook instanceof ExecSchedulerHook)) {
356+
throw ex;
168357
}
169-
};
358+
}
170359
}
171360

172361
}

jooby-rxjava/src/main/resources/org/jooby/rx/rx.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@
22

33
server.threads.Min = ${runtime.processors}
44
server.threads.Max = ${runtime.processors}
5+
6+
rx.scheduler.max-computation-threads = ${runtime.processors}

0 commit comments

Comments
 (0)