1313import java .nio .file .Path ;
1414import java .util .List ;
1515import java .util .Optional ;
16- import java .util .concurrent .CompletionStage ;
1716import java .util .concurrent .Executor ;
1817
1918import io .jooby .Context ;
2322import io .jooby .ResponseHandler ;
2423import io .jooby .Route ;
2524import io .jooby .Route .Handler ;
26- import io .jooby .internal .handler .CompletionStageHandler ;
2725import io .jooby .internal .handler .DefaultHandler ;
2826import io .jooby .internal .handler .DetachHandler ;
2927import io .jooby .internal .handler .DispatchHandler ;
3735import io .jooby .internal .handler .SendFileChannel ;
3836import io .jooby .internal .handler .SendStream ;
3937import io .jooby .internal .handler .WorkerHandler ;
40- import io .jooby .internal .handler .reactive .ObservableHandler ;
41- import io .jooby .internal .handler .reactive .ReactivePublisherHandler ;
42- import io .jooby .internal .handler .reactive .ReactorFluxHandler ;
43- import io .jooby .internal .handler .reactive .ReactorMonoHandler ;
44- import io .jooby .internal .handler .reactive .RxFlowableHandler ;
45- import io .jooby .internal .handler .reactive .RxMaybeHandler ;
46- import io .jooby .internal .handler .reactive .RxSingleHandler ;
4738
4839public class Pipeline {
4940
@@ -54,62 +45,11 @@ public static Handler compute(
5445 Executor executor ,
5546 ContextInitializer initializer ,
5647 List <ResponseHandler > responseHandler ) {
48+ if (route .isReactive ()) {
49+ return reactive (mode , route , executor , initializer );
50+ }
5751 Type returnType = route .getReturnType ();
5852 Class <?> type = Reified .rawType (returnType );
59- if (CompletionStage .class .isAssignableFrom (type )) {
60- return completableFuture (mode , route , executor , initializer );
61- }
62- /** Rx 2: */
63- // Single:
64- Optional <Class > single = loadClass (loader , "io.reactivex.Single" );
65- if (single .isPresent ()) {
66- if (single .get ().isAssignableFrom (type )) {
67- return single (mode , route , executor , initializer );
68- }
69- }
70- // Maybe:
71- Optional <Class > maybe = loadClass (loader , "io.reactivex.Maybe" );
72- if (maybe .isPresent ()) {
73- if (maybe .get ().isAssignableFrom (type )) {
74- return rxMaybe (mode , route , executor , initializer );
75- }
76- }
77- // Flowable:
78- Optional <Class > flowable = loadClass (loader , "io.reactivex.Flowable" );
79- if (flowable .isPresent ()) {
80- if (flowable .get ().isAssignableFrom (type )) {
81- return rxFlowable (mode , route , executor , initializer );
82- }
83- }
84- // Observable:
85- Optional <Class > observable = loadClass (loader , "io.reactivex.Observable" );
86- if (observable .isPresent ()) {
87- if (observable .get ().isAssignableFrom (type )) {
88- return rxObservable (mode , route , executor , initializer );
89- }
90- }
91- // Disposable
92- Optional <Class > disposable = loadClass (loader , "io.reactivex.disposables.Disposable" );
93- if (disposable .isPresent ()) {
94- if (disposable .get ().isAssignableFrom (type )) {
95- return rxDisposable (mode , route , executor , initializer );
96- }
97- }
98- /** Reactor: */
99- // Flux:
100- Optional <Class > flux = loadClass (loader , "reactor.core.publisher.Flux" );
101- if (flux .isPresent ()) {
102- if (flux .get ().isAssignableFrom (type )) {
103- return reactorFlux (mode , route , executor , initializer );
104- }
105- }
106- // Mono:
107- Optional <Class > mono = loadClass (loader , "reactor.core.publisher.Mono" );
108- if (mono .isPresent ()) {
109- if (mono .get ().isAssignableFrom (type )) {
110- return reactorMono (mode , route , executor , initializer );
111- }
112- }
11353 /** Kotlin: */
11454 Optional <Class > deferred = loadClass (loader , "kotlinx.coroutines.Deferred" );
11555 if (deferred .isPresent ()) {
@@ -130,13 +70,6 @@ public static Handler compute(
13070 }
13171 }
13272
133- /** ReactiveStream: */
134- Optional <Class > publisher = loadClass (loader , "org.reactivestreams.Publisher" );
135- if (publisher .isPresent ()) {
136- if (publisher .get ().isAssignableFrom (type )) {
137- return reactivePublisher (mode , route , executor , initializer );
138- }
139- }
14073 /** Context: */
14174 if (Context .class .isAssignableFrom (type )) {
14275 if (executor == null && mode == ExecutionMode .EVENT_LOOP ) {
@@ -206,67 +139,10 @@ private static Handler decorate(ContextInitializer initializer, Handler handler)
206139 return new PostDispatchInitializerHandler (initializer , pipeline );
207140 }
208141
209- private static Handler completableFuture (
210- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
211- return next (
212- mode ,
213- executor ,
214- new DetachHandler (decorate (initializer , new CompletionStageHandler (next .getPipeline ()))),
215- false );
216- }
217-
218- private static Handler rxFlowable (
219- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
220- return next (
221- mode ,
222- executor ,
223- new DetachHandler (decorate (initializer , new RxFlowableHandler (next .getPipeline ()))),
224- false );
225- }
226-
227- private static Handler reactivePublisher (
228- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
229- return next (
230- mode ,
231- executor ,
232- new DetachHandler (decorate (initializer , new ReactivePublisherHandler (next .getPipeline ()))),
233- false );
234- }
235-
236- private static Handler rxDisposable (
142+ private static Handler reactive (
237143 ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
238144 return next (
239- mode ,
240- executor ,
241- new DetachHandler (decorate (initializer , new SendDirect (next .getPipeline ()))),
242- false );
243- }
244-
245- private static Handler rxObservable (
246- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
247- return next (
248- mode ,
249- executor ,
250- new DetachHandler (decorate (initializer , new ObservableHandler (next .getPipeline ()))),
251- false );
252- }
253-
254- private static Handler reactorFlux (
255- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
256- return next (
257- mode ,
258- executor ,
259- new DetachHandler (decorate (initializer , new ReactorFluxHandler (next .getPipeline ()))),
260- false );
261- }
262-
263- private static Handler reactorMono (
264- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
265- return next (
266- mode ,
267- executor ,
268- new DetachHandler (decorate (initializer , new ReactorMonoHandler (next .getPipeline ()))),
269- false );
145+ mode , executor , new DetachHandler (decorate (initializer , next .getPipeline ())), false );
270146 }
271147
272148 private static Handler kotlinJob (
@@ -284,24 +160,6 @@ private static Handler kotlinContinuation(
284160 mode , executor , new DetachHandler (decorate (initializer , next .getPipeline ())), false );
285161 }
286162
287- private static Handler single (
288- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
289- return next (
290- mode ,
291- executor ,
292- new DetachHandler (decorate (initializer , new RxSingleHandler (next .getPipeline ()))),
293- false );
294- }
295-
296- private static Handler rxMaybe (
297- ExecutionMode mode , Route next , Executor executor , ContextInitializer initializer ) {
298- return next (
299- mode ,
300- executor ,
301- new DetachHandler (decorate (initializer , new RxMaybeHandler (next .getPipeline ()))),
302- false );
303- }
304-
305163 private static Handler next (
306164 ExecutionMode mode , Executor executor , Handler handler , boolean blocking ) {
307165 if (executor == null ) {
0 commit comments