|
33 | 33 | import org.jooby.Deferred; |
34 | 34 | import org.jooby.Env; |
35 | 35 | import org.jooby.Route; |
| 36 | +import org.jooby.Routes; |
36 | 37 | import org.jooby.exec.Exec; |
| 38 | +import org.slf4j.Logger; |
| 39 | +import org.slf4j.LoggerFactory; |
37 | 40 |
|
38 | 41 | import com.google.inject.Binder; |
39 | 42 | import com.typesafe.config.Config; |
40 | 43 | import com.typesafe.config.ConfigFactory; |
41 | 44 |
|
| 45 | +import javaslang.control.Try; |
42 | 46 | import rx.Completable; |
43 | 47 | import rx.Observable; |
44 | 48 | import rx.Scheduler; |
|
48 | 52 | import rx.plugins.RxJavaSchedulersHook; |
49 | 53 | import rx.schedulers.Schedulers; |
50 | 54 |
|
| 55 | +/** |
| 56 | + * <h1>rxjava</h1> |
| 57 | + * <p> |
| 58 | + * Reactive programming via <a href="https://github.com/ReactiveX/RxJava">rxjava</a>. |
| 59 | + * </p> |
| 60 | + * <p> |
| 61 | + * RxJava is a Java VM implementation of <a href="http://reactivex.io">Reactive Extensions</a>: a |
| 62 | + * library for composing asynchronous and event-based programs by using observable sequences. |
| 63 | + * </p> |
| 64 | + * |
| 65 | + * <h2>exports</h2> |
| 66 | + * <ul> |
| 67 | + * <li>map route operator that converts {@link Observable} (and family) into {@link Deferred} API. |
| 68 | + * </li> |
| 69 | + * <li> |
| 70 | + * manage the lifecycle of {@link Scheduler schedulers} and make sure they go down on application |
| 71 | + * shutdown time. |
| 72 | + * </li> |
| 73 | + * <li>set a default server thread pool with the number of available processors.</li> |
| 74 | + * </ul> |
| 75 | + * |
| 76 | + * <h2>usage</h2> |
| 77 | + * <pre>{@code |
| 78 | + * |
| 79 | + * ... |
| 80 | + * import org.jooby.rx.Rx; |
| 81 | + * ... |
| 82 | + * |
| 83 | + * { |
| 84 | + * use(new Rx()); |
| 85 | + * |
| 86 | + * get("/", req -> Observable.from("reactive programming in jooby!")) |
| 87 | + * .map(Rx.rx()); |
| 88 | + * } |
| 89 | + * }</pre> |
| 90 | + * |
| 91 | + * <h2>how it works?</h2> |
| 92 | + * <p> |
| 93 | + * Previous example is translated to: |
| 94 | + * </p> |
| 95 | + * <pre>{@code |
| 96 | + * { |
| 97 | + * use(new Rx()); |
| 98 | + * |
| 99 | + * get("/", req -> { |
| 100 | + * |
| 101 | + * return new Deferred(deferred -> { |
| 102 | + * Observable.from("reactive programming in jooby!") |
| 103 | + * .subscribe(deferred::resolve, deferred::reject); |
| 104 | + * }); |
| 105 | + * |
| 106 | + * }); |
| 107 | + * } |
| 108 | + * }</pre> |
| 109 | + * |
| 110 | + * <p> |
| 111 | + * Translation is done with the {@link Rx#rx()} route operator. If you are a |
| 112 | + * <a href="https://github.com/ReactiveX/RxJava">rxjava</a> programmer then you don't need to worry |
| 113 | + * for learning a new API and semantic. The {@link Rx#rx()} route operator deal and take cares of |
| 114 | + * the {@link Deferred} API. |
| 115 | + * </p> |
| 116 | + * |
| 117 | + * <h2>rx()</h2> |
| 118 | + * <p> |
| 119 | + * We just learn that we are not force to learn a new API, just write |
| 120 | + * <a href="https://github.com/ReactiveX/RxJava">rxjava</a> code. That's cool! |
| 121 | + * </p> |
| 122 | + * |
| 123 | + * <p> |
| 124 | + * But.. what if you have 10 routes? 50 routes? |
| 125 | + * </p> |
| 126 | + * |
| 127 | + * <pre>{@code |
| 128 | + * |
| 129 | + * ... |
| 130 | + * import org.jooby.rx.Rx; |
| 131 | + * ... |
| 132 | + * |
| 133 | + * { |
| 134 | + * use(new Rx()); |
| 135 | + * |
| 136 | + * get("/1", req -> Observable...) |
| 137 | + * .map(Rx.rx()); |
| 138 | + * |
| 139 | + * get("/2", req -> Observable...) |
| 140 | + * .map(Rx.rx()); |
| 141 | + * |
| 142 | + * .... |
| 143 | + * |
| 144 | + * get("/N", req -> Observable...) |
| 145 | + * .map(Rx.rx()); |
| 146 | + * } |
| 147 | + * }</pre> |
| 148 | + * |
| 149 | + * <p> |
| 150 | + * This is better than written N routes using the {@link Deferred} API route by route... but still |
| 151 | + * there is one more option to help you (and your fingers) to right less code: |
| 152 | + * </p> |
| 153 | + * |
| 154 | + * <pre>{@code |
| 155 | + * ... |
| 156 | + * import org.jooby.rx.Rx; |
| 157 | + * ... |
| 158 | + * |
| 159 | + * { |
| 160 | + * use(new Rx()); |
| 161 | + * |
| 162 | + * with(() -> { |
| 163 | + * get("/1", req -> Observable...); |
| 164 | + * |
| 165 | + * get("/2", req -> Observable...); |
| 166 | + * |
| 167 | + * .... |
| 168 | + * |
| 169 | + * get("/N", req -> Observable...); |
| 170 | + * |
| 171 | + * }).map(Rx.rx()); |
| 172 | + * } |
| 173 | + * }</pre> |
| 174 | + * |
| 175 | + * <p> |
| 176 | + * <strong>Beautiful, hugh?</strong> |
| 177 | + * </p> |
| 178 | + * |
| 179 | + * <p> |
| 180 | + * The {@link Routes#with(Runnable) with} operator let you group any number of routes and apply |
| 181 | + * common attributes and/or operator to all them!!! |
| 182 | + * </p> |
| 183 | + * |
| 184 | + * <h2>rx()+scheduler</h2> |
| 185 | + * <p> |
| 186 | + * You can provide a {@link Scheduler} to the {@link #rx()} operator: |
| 187 | + * </p> |
| 188 | + * |
| 189 | + * <pre>{@code |
| 190 | + * ... |
| 191 | + * import org.jooby.rx.Rx; |
| 192 | + * ... |
| 193 | + * |
| 194 | + * { |
| 195 | + * use(new Rx()); |
| 196 | + * |
| 197 | + * with(() -> { |
| 198 | + * get("/1", req -> Observable...); |
| 199 | + * |
| 200 | + * get("/2", req -> Observable...); |
| 201 | + * |
| 202 | + * .... |
| 203 | + * |
| 204 | + * get("/N", req -> Observable...); |
| 205 | + * |
| 206 | + * }).map(Rx.rx(Schedulers::io)); |
| 207 | + * } |
| 208 | + * }</pre> |
| 209 | + * |
| 210 | + * <p> |
| 211 | + * All the routes here will {@link Observable#subscribeOn(Scheduler) subscribe-on} the |
| 212 | + * provided {@link Scheduler}. |
| 213 | + * </p> |
| 214 | + * |
| 215 | + * <h2>schedulers</h2> |
| 216 | + * <p> |
| 217 | + * This module provides the default {@link Scheduler} from |
| 218 | + * <a href="https://github.com/ReactiveX/RxJava">rxjava</a>. But also let you define your own |
| 219 | + * {@link Scheduler scheduler} using the {@link Exec} module. |
| 220 | + * </p> |
| 221 | + * |
| 222 | + * <pre> |
| 223 | + * rx.schedulers.io = forkjoin |
| 224 | + * rx.schedulers.computation = fixed |
| 225 | + * rx.schedulers.newThread = "fixed = 10" |
| 226 | + * </pre> |
| 227 | + * |
| 228 | + * <p> |
| 229 | + * The previous example defines a: |
| 230 | + * </p> |
| 231 | + * <ul> |
| 232 | + * <li>forkjoin pool for {@link Schedulers#io()}</li> |
| 233 | + * <li>fixed thread pool equals to the number of available processors for |
| 234 | + * {@link Schedulers#computation()}</li> |
| 235 | + * <li>fixed thread pool with a max of 10 for {@link Schedulers#newThread()}</li> |
| 236 | + * </ul> |
| 237 | + * |
| 238 | + * <p> |
| 239 | + * Of course, you can define/override all, some or none of them. In any case the {@link Scheduler} |
| 240 | + * will be shutdown at application shutdown time. |
| 241 | + * </p> |
| 242 | + * |
| 243 | + * @author edgar |
| 244 | + * @since 1.0.0.CR3 |
| 245 | + */ |
51 | 246 | public class Rx extends Exec { |
52 | 247 |
|
53 | 248 | static class DeferredSubscriber extends Subscriber<Object> { |
@@ -82,6 +277,9 @@ public void onNext(final Object value) { |
82 | 277 | } |
83 | 278 | } |
84 | 279 |
|
| 280 | + /** The logging system. */ |
| 281 | + private final Logger log = LoggerFactory.getLogger(getClass()); |
| 282 | + |
85 | 283 | public Rx() { |
86 | 284 | super("rx.schedulers"); |
87 | 285 | // daemon by default. |
@@ -124,49 +322,40 @@ private static Route.Mapper<Object> rx(final Optional<Supplier<Scheduler>> subsc |
124 | 322 | @Override |
125 | 323 | public void configure(final Env env, final Config conf, final Binder binder) { |
126 | 324 | // dump rx.* as system properties |
127 | | - if (conf.hasPath("rx")) { |
128 | | - conf.getConfig("rx") |
129 | | - .withoutPath("schedulers").entrySet() |
130 | | - .forEach( |
131 | | - e -> System.setProperty("rx." + e.getKey(), e.getValue().unwrapped().toString())); |
132 | | - } |
| 325 | + conf.getConfig("rx") |
| 326 | + .withoutPath("schedulers").entrySet() |
| 327 | + .forEach( |
| 328 | + e -> System.setProperty("rx." + e.getKey(), e.getValue().unwrapped().toString())); |
133 | 329 | Map<String, Executor> executors = new HashMap<>(); |
134 | 330 | super.configure(env, conf, binder, executors::put); |
135 | | - RxJavaPlugins plugins = RxJavaPlugins.getInstance(); |
136 | | - plugins.registerSchedulersHook(schedulerHook(executors)); |
| 331 | + |
| 332 | + /** |
| 333 | + * Side effects of global/evil static state. Hack to turn off some of this errors. |
| 334 | + */ |
| 335 | + trySchedulerHook(executors); |
| 336 | + |
137 | 337 | // shutdown schedulers: silent shutdown on tests we got a NoClassDefFoundError: Could not |
138 | 338 | // initialize class rx.internal.util.RxRingBuffer |
139 | | - env.onStop(Schedulers::shutdown); |
| 339 | + env.onStop(() -> Try.run(Schedulers::shutdown) |
| 340 | + .onFailure(x -> log.debug("Schedulers.shutdown resulted in error", x))); |
140 | 341 | } |
141 | 342 |
|
142 | 343 | @Override |
143 | 344 | public Config config() { |
144 | 345 | return ConfigFactory.parseResources(getClass(), "rx.conf"); |
145 | 346 | } |
146 | 347 |
|
147 | | - static RxJavaSchedulersHook schedulerHook(final Map<String, Executor> executors) { |
148 | | - return new RxJavaSchedulersHook() { |
149 | | - @Override |
150 | | - public Scheduler getComputationScheduler() { |
151 | | - return Optional.ofNullable(executors.get("computation")) |
152 | | - .map(Schedulers::from) |
153 | | - .orElse(null); |
154 | | - } |
155 | | - |
156 | | - @Override |
157 | | - public Scheduler getIOScheduler() { |
158 | | - return Optional.ofNullable(executors.get("io")) |
159 | | - .map(Schedulers::from) |
160 | | - .orElse(null); |
161 | | - } |
162 | | - |
163 | | - @Override |
164 | | - public Scheduler getNewThreadScheduler() { |
165 | | - return Optional.ofNullable(executors.get("newThread")) |
166 | | - .map(Schedulers::from) |
167 | | - .orElse(null); |
| 348 | + private static void trySchedulerHook(final Map<String, Executor> executors) { |
| 349 | + RxJavaPlugins plugins = RxJavaPlugins.getInstance(); |
| 350 | + try { |
| 351 | + plugins.registerSchedulersHook(new ExecSchedulerHook(executors)); |
| 352 | + } catch (IllegalStateException ex) { |
| 353 | + // there is a scheduler hook already, check if ours and ignore the exception |
| 354 | + RxJavaSchedulersHook hook = plugins.getSchedulersHook(); |
| 355 | + if (!(hook instanceof ExecSchedulerHook)) { |
| 356 | + throw ex; |
168 | 357 | } |
169 | | - }; |
| 358 | + } |
170 | 359 | } |
171 | 360 |
|
172 | 361 | } |
0 commit comments