Skip to content

Commit 38874b1

Browse files
committed
reactive: reactive was moved out of core
- rxjava, reactor has his own module - we need a filter/decorator for adding reactive support (like in 1.x) - pipeline was simplified - added mutyni reactive library #1921 - wip for #2031
1 parent eb2aa9a commit 38874b1

43 files changed

Lines changed: 542 additions & 774 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

jooby/pom.xml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -199,20 +199,6 @@
199199
<optional>true</optional>
200200
</dependency>
201201

202-
<!-- rxjava -->
203-
<dependency>
204-
<groupId>io.reactivex.rxjava2</groupId>
205-
<artifactId>rxjava</artifactId>
206-
<optional>true</optional>
207-
</dependency>
208-
209-
<!-- reactor -->
210-
<dependency>
211-
<groupId>io.projectreactor</groupId>
212-
<artifactId>reactor-core</artifactId>
213-
<optional>true</optional>
214-
</dependency>
215-
216202
<!-- bucket4j -->
217203
<dependency>
218204
<groupId>com.github.vladimir-bukhtoyarov</groupId>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby;
7+
8+
import java.util.Optional;
9+
import java.util.concurrent.CompletionException;
10+
import java.util.concurrent.CompletionStage;
11+
import java.util.concurrent.Flow;
12+
13+
import edu.umd.cs.findbugs.annotations.NonNull;
14+
import io.jooby.internal.handler.ChunkedSubscriber;
15+
16+
/**
17+
* Utility function for handling {@link CompletionStage} and {@link Flow.Publisher}.
18+
*
19+
* @author edgar
20+
* @since 3.0.0
21+
*/
22+
public class ReactiveSupport {
23+
24+
/**
25+
* Creates a subscriber from web context.
26+
*
27+
* @param ctx Web Context.
28+
* @return New subscriber.
29+
* @param <T> Flow type.
30+
*/
31+
public static <T> Flow.Subscriber<T> newSubscriber(Context ctx) {
32+
return new ChunkedSubscriber(ctx);
33+
}
34+
35+
/**
36+
* Flow publisher filter. Handle flow responses.
37+
*
38+
* @return Filter.
39+
*/
40+
public static Route.Filter flow() {
41+
return new Route.Filter() {
42+
@NonNull @Override
43+
public Route.Handler apply(@NonNull Route.Handler next) {
44+
return ctx -> {
45+
Object result = next.apply(ctx);
46+
if (result instanceof Flow.Publisher publisher) {
47+
publisher.subscribe(newSubscriber(ctx));
48+
}
49+
return result;
50+
};
51+
}
52+
53+
@Override
54+
public void setRoute(Route route) {
55+
route.setReactive(true);
56+
}
57+
};
58+
}
59+
60+
/**
61+
* Completable future filter. Handle completable future responses.
62+
*
63+
* @return Filter.
64+
*/
65+
public static Route.Filter completableFuture() {
66+
return new Route.Filter() {
67+
@NonNull @Override
68+
public Route.Handler apply(@NonNull Route.Handler next) {
69+
return ctx -> {
70+
Object result = next.apply(ctx);
71+
if (result instanceof CompletionStage future) {
72+
return future.whenComplete(
73+
(value, x) -> {
74+
try {
75+
if (x != null) {
76+
Throwable exception = (Throwable) x;
77+
if (exception instanceof CompletionException) {
78+
exception = Optional.ofNullable(exception.getCause()).orElse(exception);
79+
}
80+
ctx.sendError(exception);
81+
} else {
82+
ctx.render(value);
83+
}
84+
} catch (Throwable cause) {
85+
ctx.sendError(cause);
86+
}
87+
});
88+
}
89+
return result;
90+
};
91+
}
92+
93+
@Override
94+
public void setRoute(Route route) {
95+
route.setReactive(true);
96+
}
97+
};
98+
}
99+
}

jooby/src/main/java/io/jooby/Route.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,8 @@ public interface Handler extends Serializable, Aware {
398398

399399
private String description;
400400

401+
private boolean reactive;
402+
401403
/**
402404
* Creates a new route.
403405
*
@@ -606,6 +608,14 @@ public Route(@NonNull String method, @NonNull String pattern, @NonNull Handler h
606608
return this;
607609
}
608610

611+
public boolean isReactive() {
612+
return reactive;
613+
}
614+
615+
public void setReactive(boolean reactive) {
616+
this.reactive = reactive;
617+
}
618+
609619
/**
610620
* Return return type.
611621
*

jooby/src/main/java/io/jooby/internal/Pipeline.java

Lines changed: 5 additions & 147 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.nio.file.Path;
1414
import java.util.List;
1515
import java.util.Optional;
16-
import java.util.concurrent.CompletionStage;
1716
import java.util.concurrent.Executor;
1817

1918
import io.jooby.Context;
@@ -23,7 +22,6 @@
2322
import io.jooby.ResponseHandler;
2423
import io.jooby.Route;
2524
import io.jooby.Route.Handler;
26-
import io.jooby.internal.handler.CompletionStageHandler;
2725
import io.jooby.internal.handler.DefaultHandler;
2826
import io.jooby.internal.handler.DetachHandler;
2927
import io.jooby.internal.handler.DispatchHandler;
@@ -37,13 +35,6 @@
3735
import io.jooby.internal.handler.SendFileChannel;
3836
import io.jooby.internal.handler.SendStream;
3937
import io.jooby.internal.handler.WorkerHandler;
40-
import io.jooby.internal.handler.reactive.ObservableHandler;
41-
import io.jooby.internal.handler.reactive.ReactivePublisherHandler;
42-
import io.jooby.internal.handler.reactive.ReactorFluxHandler;
43-
import io.jooby.internal.handler.reactive.ReactorMonoHandler;
44-
import io.jooby.internal.handler.reactive.RxFlowableHandler;
45-
import io.jooby.internal.handler.reactive.RxMaybeHandler;
46-
import io.jooby.internal.handler.reactive.RxSingleHandler;
4738

4839
public class Pipeline {
4940

@@ -54,62 +45,11 @@ public static Handler compute(
5445
Executor executor,
5546
ContextInitializer initializer,
5647
List<ResponseHandler> responseHandler) {
48+
if (route.isReactive()) {
49+
return reactive(mode, route, executor, initializer);
50+
}
5751
Type returnType = route.getReturnType();
5852
Class<?> type = Reified.rawType(returnType);
59-
if (CompletionStage.class.isAssignableFrom(type)) {
60-
return completableFuture(mode, route, executor, initializer);
61-
}
62-
/** Rx 2: */
63-
// Single:
64-
Optional<Class> single = loadClass(loader, "io.reactivex.Single");
65-
if (single.isPresent()) {
66-
if (single.get().isAssignableFrom(type)) {
67-
return single(mode, route, executor, initializer);
68-
}
69-
}
70-
// Maybe:
71-
Optional<Class> maybe = loadClass(loader, "io.reactivex.Maybe");
72-
if (maybe.isPresent()) {
73-
if (maybe.get().isAssignableFrom(type)) {
74-
return rxMaybe(mode, route, executor, initializer);
75-
}
76-
}
77-
// Flowable:
78-
Optional<Class> flowable = loadClass(loader, "io.reactivex.Flowable");
79-
if (flowable.isPresent()) {
80-
if (flowable.get().isAssignableFrom(type)) {
81-
return rxFlowable(mode, route, executor, initializer);
82-
}
83-
}
84-
// Observable:
85-
Optional<Class> observable = loadClass(loader, "io.reactivex.Observable");
86-
if (observable.isPresent()) {
87-
if (observable.get().isAssignableFrom(type)) {
88-
return rxObservable(mode, route, executor, initializer);
89-
}
90-
}
91-
// Disposable
92-
Optional<Class> disposable = loadClass(loader, "io.reactivex.disposables.Disposable");
93-
if (disposable.isPresent()) {
94-
if (disposable.get().isAssignableFrom(type)) {
95-
return rxDisposable(mode, route, executor, initializer);
96-
}
97-
}
98-
/** Reactor: */
99-
// Flux:
100-
Optional<Class> flux = loadClass(loader, "reactor.core.publisher.Flux");
101-
if (flux.isPresent()) {
102-
if (flux.get().isAssignableFrom(type)) {
103-
return reactorFlux(mode, route, executor, initializer);
104-
}
105-
}
106-
// Mono:
107-
Optional<Class> mono = loadClass(loader, "reactor.core.publisher.Mono");
108-
if (mono.isPresent()) {
109-
if (mono.get().isAssignableFrom(type)) {
110-
return reactorMono(mode, route, executor, initializer);
111-
}
112-
}
11353
/** Kotlin: */
11454
Optional<Class> deferred = loadClass(loader, "kotlinx.coroutines.Deferred");
11555
if (deferred.isPresent()) {
@@ -130,13 +70,6 @@ public static Handler compute(
13070
}
13171
}
13272

133-
/** ReactiveStream: */
134-
Optional<Class> publisher = loadClass(loader, "org.reactivestreams.Publisher");
135-
if (publisher.isPresent()) {
136-
if (publisher.get().isAssignableFrom(type)) {
137-
return reactivePublisher(mode, route, executor, initializer);
138-
}
139-
}
14073
/** Context: */
14174
if (Context.class.isAssignableFrom(type)) {
14275
if (executor == null && mode == ExecutionMode.EVENT_LOOP) {
@@ -206,67 +139,10 @@ private static Handler decorate(ContextInitializer initializer, Handler handler)
206139
return new PostDispatchInitializerHandler(initializer, pipeline);
207140
}
208141

209-
private static Handler completableFuture(
210-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
211-
return next(
212-
mode,
213-
executor,
214-
new DetachHandler(decorate(initializer, new CompletionStageHandler(next.getPipeline()))),
215-
false);
216-
}
217-
218-
private static Handler rxFlowable(
219-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
220-
return next(
221-
mode,
222-
executor,
223-
new DetachHandler(decorate(initializer, new RxFlowableHandler(next.getPipeline()))),
224-
false);
225-
}
226-
227-
private static Handler reactivePublisher(
228-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
229-
return next(
230-
mode,
231-
executor,
232-
new DetachHandler(decorate(initializer, new ReactivePublisherHandler(next.getPipeline()))),
233-
false);
234-
}
235-
236-
private static Handler rxDisposable(
142+
private static Handler reactive(
237143
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
238144
return next(
239-
mode,
240-
executor,
241-
new DetachHandler(decorate(initializer, new SendDirect(next.getPipeline()))),
242-
false);
243-
}
244-
245-
private static Handler rxObservable(
246-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
247-
return next(
248-
mode,
249-
executor,
250-
new DetachHandler(decorate(initializer, new ObservableHandler(next.getPipeline()))),
251-
false);
252-
}
253-
254-
private static Handler reactorFlux(
255-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
256-
return next(
257-
mode,
258-
executor,
259-
new DetachHandler(decorate(initializer, new ReactorFluxHandler(next.getPipeline()))),
260-
false);
261-
}
262-
263-
private static Handler reactorMono(
264-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
265-
return next(
266-
mode,
267-
executor,
268-
new DetachHandler(decorate(initializer, new ReactorMonoHandler(next.getPipeline()))),
269-
false);
145+
mode, executor, new DetachHandler(decorate(initializer, next.getPipeline())), false);
270146
}
271147

272148
private static Handler kotlinJob(
@@ -284,24 +160,6 @@ private static Handler kotlinContinuation(
284160
mode, executor, new DetachHandler(decorate(initializer, next.getPipeline())), false);
285161
}
286162

287-
private static Handler single(
288-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
289-
return next(
290-
mode,
291-
executor,
292-
new DetachHandler(decorate(initializer, new RxSingleHandler(next.getPipeline()))),
293-
false);
294-
}
295-
296-
private static Handler rxMaybe(
297-
ExecutionMode mode, Route next, Executor executor, ContextInitializer initializer) {
298-
return next(
299-
mode,
300-
executor,
301-
new DetachHandler(decorate(initializer, new RxMaybeHandler(next.getPipeline()))),
302-
false);
303-
}
304-
305163
private static Handler next(
306164
ExecutionMode mode, Executor executor, Handler handler, boolean blocking) {
307165
if (executor == null) {

jooby/src/main/java/io/jooby/internal/handler/reactive/ChunkedSubscriber.java renamed to jooby/src/main/java/io/jooby/internal/handler/ChunkedSubscriber.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
44
* Copyright 2014 Edgar Espina
55
*/
6-
package io.jooby.internal.handler.reactive;
6+
package io.jooby.internal.handler;
7+
8+
import java.util.concurrent.Flow;
79

810
import org.slf4j.Logger;
911

@@ -14,12 +16,12 @@
1416
import io.jooby.Sender;
1517
import io.jooby.Server;
1618

17-
public class ChunkedSubscriber {
19+
public class ChunkedSubscriber implements Flow.Subscriber {
1820

1921
private static final byte JSON_LBRACKET = '[';
2022
private static final byte JSON_SEP = ',';
2123
private static final byte[] JSON_RBRACKET = {']'};
22-
private ChunkedSubscription subscription;
24+
private Flow.Subscription subscription;
2325
private final Context ctx;
2426
private final Sender sender;
2527
private MediaType responseType;
@@ -29,7 +31,8 @@ public ChunkedSubscriber(Context ctx) {
2931
this.sender = ctx.responseSender();
3032
}
3133

32-
public void onSubscribe(ChunkedSubscription subscription) {
34+
@Override
35+
public void onSubscribe(Flow.Subscription subscription) {
3336
this.subscription = subscription;
3437
this.subscription.request(1);
3538
}

0 commit comments

Comments
 (0)