|
1 | 1 | package io.jooby.internal; |
2 | 2 |
|
3 | | -import io.jooby.Mode; |
| 3 | +import io.jooby.ExecutionMode; |
4 | 4 | import io.jooby.Reified; |
5 | 5 | import io.jooby.Route.Handler; |
6 | 6 | import io.jooby.internal.handler.CompletionStageHandler; |
7 | 7 | import io.jooby.internal.handler.DefaultHandler; |
8 | 8 | import io.jooby.internal.handler.DetachHandler; |
9 | | -import io.jooby.internal.handler.ExecutorHandler; |
| 9 | +import io.jooby.internal.handler.WorkerExecHandler; |
10 | 10 | import io.jooby.internal.handler.FlowPublisherHandler; |
11 | 11 | import io.jooby.internal.handler.FluxHandler; |
12 | 12 | import io.jooby.internal.handler.MaybeHandler; |
|
24 | 24 |
|
25 | 25 | public class Pipeline { |
26 | 26 |
|
27 | | - public static Handler compute(ClassLoader loader, RouteImpl route, Mode mode) { |
| 27 | + public static Handler compute(ClassLoader loader, RouteImpl route, ExecutionMode mode) { |
28 | 28 | return provider(loader, Reified.rawType(route.returnType())).apply(mode, route); |
29 | 29 | } |
30 | 30 |
|
31 | | - private static BiFunction<Mode, RouteImpl, Handler> provider(ClassLoader loader, Class type) { |
| 31 | + private static BiFunction<ExecutionMode, RouteImpl, Handler> provider(ClassLoader loader, Class type) { |
32 | 32 | if (CompletionStage.class.isAssignableFrom(type)) { |
33 | 33 | return Pipeline::completableFuture; |
34 | 34 | } |
@@ -86,48 +86,58 @@ private static BiFunction<Mode, RouteImpl, Handler> provider(ClassLoader loader, |
86 | 86 | if (Flow.Publisher.class.isAssignableFrom(type)) { |
87 | 87 | return Pipeline::flowPublisher; |
88 | 88 | } |
89 | | - return (mode, route) -> next(mode, route.executor(), new DefaultHandler(route.pipeline())); |
| 89 | + return (mode, route) -> next(mode, route.executor(), new DefaultHandler(route.pipeline()), |
| 90 | + true); |
90 | 91 | } |
91 | 92 |
|
92 | | - private static Handler completableFuture(Mode mode, RouteImpl next) { |
| 93 | + private static Handler completableFuture(ExecutionMode mode, RouteImpl next) { |
93 | 94 | return next(mode, next.executor(), |
94 | | - new DetachHandler(new CompletionStageHandler(next.pipeline()))); |
| 95 | + new DetachHandler(new CompletionStageHandler(next.pipeline())), false); |
95 | 96 | } |
96 | 97 |
|
97 | | - private static Handler publisher(Mode mode, RouteImpl next) { |
98 | | - return next(mode, next.executor(), new DetachHandler(new PublisherHandler(next.pipeline()))); |
| 98 | + private static Handler publisher(ExecutionMode mode, RouteImpl next) { |
| 99 | + return next(mode, next.executor(), new DetachHandler(new PublisherHandler(next.pipeline())), |
| 100 | + false); |
99 | 101 | } |
100 | 102 |
|
101 | | - private static Handler observable(Mode mode, RouteImpl next) { |
102 | | - return next(mode, next.executor(), new DetachHandler(new ObservableHandler(next.pipeline()))); |
| 103 | + private static Handler observable(ExecutionMode mode, RouteImpl next) { |
| 104 | + return next(mode, next.executor(), new DetachHandler(new ObservableHandler(next.pipeline())), |
| 105 | + false); |
103 | 106 | } |
104 | 107 |
|
105 | | - private static Handler flux(Mode mode, RouteImpl next) { |
106 | | - return next(mode, next.executor(), new DetachHandler(new FluxHandler(next.pipeline()))); |
| 108 | + private static Handler flux(ExecutionMode mode, RouteImpl next) { |
| 109 | + return next(mode, next.executor(), new DetachHandler(new FluxHandler(next.pipeline())), false); |
107 | 110 | } |
108 | 111 |
|
109 | | - private static Handler mono(Mode mode, RouteImpl next) { |
110 | | - return next(mode, next.executor(), new DetachHandler(new MonoHandler(next.pipeline()))); |
| 112 | + private static Handler mono(ExecutionMode mode, RouteImpl next) { |
| 113 | + return next(mode, next.executor(), new DetachHandler(new MonoHandler(next.pipeline())), false); |
111 | 114 | } |
112 | 115 |
|
113 | | - private static Handler flowPublisher(Mode mode, RouteImpl next) { |
| 116 | + private static Handler flowPublisher(ExecutionMode mode, RouteImpl next) { |
114 | 117 | return next(mode, next.executor(), |
115 | | - new DetachHandler(new FlowPublisherHandler(next.pipeline()))); |
| 118 | + new DetachHandler(new FlowPublisherHandler(next.pipeline())), false); |
116 | 119 | } |
117 | 120 |
|
118 | | - private static Handler single(Mode mode, RouteImpl next) { |
119 | | - return next(mode, next.executor(), new DetachHandler(new SingleHandler(next.pipeline()))); |
| 121 | + private static Handler single(ExecutionMode mode, RouteImpl next) { |
| 122 | + return next(mode, next.executor(), new DetachHandler(new SingleHandler(next.pipeline())), |
| 123 | + false); |
120 | 124 | } |
121 | 125 |
|
122 | | - private static Handler maybe(Mode mode, RouteImpl next) { |
123 | | - return next(mode, next.executor(), new DetachHandler(new MaybeHandler(next.pipeline()))); |
| 126 | + private static Handler maybe(ExecutionMode mode, RouteImpl next) { |
| 127 | + return next(mode, next.executor(), new DetachHandler(new MaybeHandler(next.pipeline())), false); |
124 | 128 | } |
125 | 129 |
|
126 | | - private static Handler next(Mode mode, Executor executor, Handler handler) { |
| 130 | + private static Handler next(ExecutionMode mode, Executor executor, Handler handler, boolean blocking) { |
127 | 131 | if (executor == null) { |
128 | | - return mode == Mode.WORKER ? new WorkerHandler(handler) : handler; |
| 132 | + if (mode == ExecutionMode.WORKER) { |
| 133 | + return new WorkerHandler(handler); |
| 134 | + } |
| 135 | + if (mode == ExecutionMode.DEFAULT && blocking) { |
| 136 | + return new WorkerHandler(handler); |
| 137 | + } |
| 138 | + return handler; |
129 | 139 | } |
130 | | - return new ExecutorHandler(handler, executor); |
| 140 | + return new WorkerExecHandler(handler, executor); |
131 | 141 | } |
132 | 142 |
|
133 | 143 | private static Optional<Class> loadClass(ClassLoader loader, String name) { |
|
0 commit comments