Skip to content
This repository was archived by the owner on Mar 3, 2026. It is now read-only.

Commit 4828f3f

Browse files
committed
Deferred Executor
* deferred API: global executor fix jooby-project#488 * deferred API: local executor fix jooby-project#489
1 parent 34150b0 commit 4828f3f

File tree

20 files changed

+715
-122
lines changed

20 files changed

+715
-122
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertEquals;
4+
5+
import org.jooby.Deferred;
6+
import org.jooby.test.ServerFeature;
7+
import org.junit.Test;
8+
9+
public class Issue484 extends ServerFeature {
10+
11+
{
12+
get("/484", req -> {
13+
String t1 = Thread.currentThread().getName();
14+
return new Deferred(deferred -> {
15+
deferred.resolve(t1 + ":" + Thread.currentThread().getName());
16+
});
17+
});
18+
19+
get("/484/promise", promise(deferred -> {
20+
String t1 = Thread.currentThread().getName();
21+
deferred.resolve(t1 + ":" + Thread.currentThread().getName());
22+
}));
23+
}
24+
25+
@Test
26+
public void deferredOnDefaultExecutor() throws Exception {
27+
request()
28+
.get("/484")
29+
.expect(rsp -> {
30+
String[] threads = rsp.split(":");
31+
assertEquals(threads[0], threads[1]);
32+
});
33+
34+
request()
35+
.get("/484/promise")
36+
.expect(rsp -> {
37+
String[] threads = rsp.split(":");
38+
assertEquals(threads[0], threads[1]);
39+
});
40+
}
41+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertNotEquals;
4+
5+
import java.util.concurrent.ForkJoinPool;
6+
7+
import org.jooby.Deferred;
8+
import org.jooby.test.ServerFeature;
9+
import org.junit.Test;
10+
11+
public class Issue484b extends ServerFeature {
12+
13+
{
14+
executor(new ForkJoinPool());
15+
16+
get("/484", req -> {
17+
return new Deferred(deferred -> {
18+
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
19+
});
20+
});
21+
22+
get("/484/promise", promise(deferred -> {
23+
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
24+
}));
25+
}
26+
27+
@Test
28+
public void deferredWithExecutorInstance() throws Exception {
29+
request()
30+
.get("/484")
31+
.expect(rsp -> {
32+
System.out.println(rsp);
33+
String[] threads = rsp.split(":");
34+
assertNotEquals(threads[0], threads[1]);
35+
});
36+
37+
request()
38+
.get("/484/promise")
39+
.expect(rsp -> {
40+
System.out.println(rsp);
41+
String[] threads = rsp.split(":");
42+
assertNotEquals(threads[0], threads[1]);
43+
});
44+
}
45+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertNotEquals;
4+
5+
import java.util.concurrent.Executor;
6+
import java.util.concurrent.Executors;
7+
8+
import org.jooby.Deferred;
9+
import org.jooby.test.ServerFeature;
10+
import org.junit.Test;
11+
12+
import com.google.inject.Key;
13+
import com.google.inject.name.Names;
14+
15+
public class Issue484c extends ServerFeature {
16+
17+
{
18+
executor("ste");
19+
20+
use((env, conf, binder) -> {
21+
binder.bind(Key.get(Executor.class, Names.named("ste")))
22+
.toInstance(Executors.newSingleThreadExecutor());
23+
});
24+
25+
get("/484", req -> {
26+
return new Deferred(deferred -> {
27+
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
28+
});
29+
});
30+
31+
get("/484/promise", promise((req, deferred) -> {
32+
deferred.resolve(deferred.callerThread() + ":" + Thread.currentThread().getName());
33+
}));
34+
}
35+
36+
@Test
37+
public void deferredWithExecutorReference() throws Exception {
38+
request()
39+
.get("/484")
40+
.expect(rsp -> {
41+
System.out.println(rsp);
42+
String[] threads = rsp.split(":");
43+
assertNotEquals(threads[0], threads[1]);
44+
});
45+
46+
request()
47+
.get("/484/promise")
48+
.expect(rsp -> {
49+
System.out.println(rsp);
50+
String[] threads = rsp.split(":");
51+
assertNotEquals(threads[0], threads[1]);
52+
});
53+
}
54+
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertTrue;
4+
5+
import org.jooby.Deferred;
6+
import org.jooby.exec.Exec;
7+
import org.jooby.test.ServerFeature;
8+
import org.junit.Test;
9+
10+
import com.typesafe.config.ConfigFactory;
11+
import com.typesafe.config.ConfigValueFactory;
12+
13+
public class Issue484d extends ServerFeature {
14+
15+
{
16+
use(ConfigFactory.empty()
17+
.withValue("executors.fj", ConfigValueFactory.fromAnyRef("forkjoin = 2"))
18+
.withValue("executors.cached", ConfigValueFactory.fromAnyRef("cached")));
19+
20+
executor("fj");
21+
22+
use(new Exec());
23+
24+
get("/484", req -> new Deferred(deferred -> {
25+
deferred.resolve(Thread.currentThread().getName());
26+
}));
27+
28+
get("/484/cached", req -> new Deferred("cached", deferred -> {
29+
deferred.resolve(Thread.currentThread().getName());
30+
}));
31+
32+
get("/484/fj", promise(deferred -> {
33+
deferred.resolve(Thread.currentThread().getName());
34+
}));
35+
36+
get("/484/local/cached", promise("cached", (req, deferred) -> {
37+
deferred.resolve(Thread.currentThread().getName());
38+
}));
39+
40+
get("/484/local/fj", promise("fj", deferred -> {
41+
deferred.resolve(Thread.currentThread().getName());
42+
}));
43+
}
44+
45+
@Test
46+
public void deferredOnGloablOrLocalExecutor() throws Exception {
47+
request()
48+
.get("/484")
49+
.expect(rsp -> {
50+
assertTrue(rsp.startsWith("forkjoin"));
51+
});
52+
53+
request()
54+
.get("/484/cached")
55+
.expect(rsp -> {
56+
assertTrue(rsp.startsWith("cached"));
57+
});
58+
59+
request()
60+
.get("/484/fj")
61+
.expect(rsp -> {
62+
assertTrue(rsp.startsWith("forkjoin"));
63+
});
64+
65+
request()
66+
.get("/484/local/cached")
67+
.expect(rsp -> {
68+
assertTrue(rsp.startsWith("cached"));
69+
});
70+
71+
request()
72+
.get("/484/local/fj")
73+
.expect(rsp -> {
74+
assertTrue(rsp.startsWith("forkjoin"));
75+
});
76+
}
77+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.jooby.issues;
2+
3+
import static org.junit.Assert.assertTrue;
4+
5+
import java.util.concurrent.Executors;
6+
import java.util.concurrent.ForkJoinPool;
7+
8+
import org.jooby.test.ServerFeature;
9+
import org.junit.Test;
10+
11+
public class Issue485 extends ServerFeature {
12+
13+
{
14+
executor(new ForkJoinPool());
15+
executor("cached", Executors.newCachedThreadPool());
16+
17+
get("/485/fj", promise(deferred -> {
18+
deferred.resolve(Thread.currentThread().getName());
19+
}));
20+
21+
get("/485/cached", promise("cached", deferred -> {
22+
deferred.resolve(Thread.currentThread().getName());
23+
}));
24+
25+
}
26+
27+
@Test
28+
public void globalOrLocalExecutor() throws Exception {
29+
request()
30+
.get("/485/fj")
31+
.expect(rsp -> {
32+
assertTrue(rsp.toLowerCase().startsWith("forkjoinpool"));
33+
});
34+
35+
request()
36+
.get("/485/cached")
37+
.expect(rsp -> {
38+
assertTrue(rsp.toLowerCase().startsWith("pool"));
39+
});
40+
}
41+
}

jooby-jetty/src/main/java/org/jooby/internal/jetty/JettyHandler.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -78,28 +78,7 @@ public void handle(final String target, final Request baseRequest,
7878
}
7979

8080
ServletServletRequest nreq = new ServletServletRequest(request, tmpdir, multipart)
81-
.with(new ServletUpgrade() {
82-
83-
@SuppressWarnings("unchecked")
84-
@Override
85-
public <T> T upgrade(final Class<T> type) throws Exception {
86-
if (type == NativeWebSocket.class
87-
&& webSocketServerFactory.isUpgradeRequest(request, response)
88-
&& webSocketServerFactory.acceptWebSocket(request, response)) {
89-
String key = JettyWebSocket.class.getName();
90-
NativeWebSocket ws = (NativeWebSocket) request.getAttribute(key);
91-
if (ws != null) {
92-
request.removeAttribute(key);
93-
return (T) ws;
94-
}
95-
} else if (type == Sse.class) {
96-
return (T) new JettySse(baseRequest, (Response) response);
97-
} else if (type == NativePushPromise.class) {
98-
return (T) new JettyPush(baseRequest);
99-
}
100-
throw new UnsupportedOperationException("Not Supported: " + type);
101-
}
102-
});
81+
.with(upgrade(baseRequest, request, response, webSocketServerFactory));
10382
dispatcher.handle(nreq, new JettyResponse(nreq, response));
10483
} catch (IOException | ServletException | RuntimeException ex) {
10584
baseRequest.setHandled(false);
@@ -112,4 +91,29 @@ public <T> T upgrade(final Class<T> type) throws Exception {
11291
}
11392
}
11493

94+
private static ServletUpgrade upgrade(final Request baseRequest, final HttpServletRequest request,
95+
final HttpServletResponse response, final WebSocketServerFactory webSocketServerFactory) {
96+
return new ServletUpgrade() {
97+
@SuppressWarnings("unchecked")
98+
@Override
99+
public <T> T upgrade(final Class<T> type) throws Exception {
100+
if (type == NativeWebSocket.class
101+
&& webSocketServerFactory.isUpgradeRequest(request, response)
102+
&& webSocketServerFactory.acceptWebSocket(request, response)) {
103+
String key = JettyWebSocket.class.getName();
104+
NativeWebSocket ws = (NativeWebSocket) request.getAttribute(key);
105+
if (ws != null) {
106+
request.removeAttribute(key);
107+
return (T) ws;
108+
}
109+
} else if (type == Sse.class) {
110+
return (T) new JettySse(baseRequest, (Response) response);
111+
} else if (type == NativePushPromise.class) {
112+
return (T) new JettyPush(baseRequest);
113+
}
114+
throw new UnsupportedOperationException("Not Supported: " + type);
115+
}
116+
};
117+
}
118+
115119
}

jooby-jetty/src/main/java/org/jooby/internal/jetty/JettyResponse.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.nio.channels.Channels;
2424
import java.nio.channels.FileChannel;
2525

26+
import javax.servlet.http.HttpServletRequest;
2627
import javax.servlet.http.HttpServletResponse;
2728

2829
import org.eclipse.jetty.server.HttpOutput;
@@ -60,7 +61,7 @@ public void send(final ByteBuffer buffer) throws Exception {
6061
@Override
6162
public void send(final InputStream stream) throws Exception {
6263
endRequest = false;
63-
nreq.startAsync();
64+
startAsyncIfNeedIt();
6465
sender().sendContent(Channels.newChannel(stream), this);
6566
}
6667

@@ -72,7 +73,7 @@ public void send(final FileChannel channel) throws Exception {
7273
sender().sendContent(channel);
7374
} else {
7475
endRequest = false;
75-
nreq.startAsync();
76+
startAsyncIfNeedIt();
7677
sender().sendContent(channel, this);
7778
}
7879
}
@@ -107,4 +108,10 @@ private HttpOutput sender() {
107108
return ((Response) rsp).getHttpOutput();
108109
}
109110

111+
private void startAsyncIfNeedIt() {
112+
HttpServletRequest req = nreq.servletRequest();
113+
if (!req.isAsyncStarted()) {
114+
req.startAsync();
115+
}
116+
}
110117
}

jooby-jetty/src/main/java/org/jooby/internal/jetty/JettyServer.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.eclipse.jetty.server.Server;
4040
import org.eclipse.jetty.server.ServerConnector;
4141
import org.eclipse.jetty.server.SslConnectionFactory;
42+
import org.eclipse.jetty.server.handler.ContextHandler;
4243
import org.eclipse.jetty.util.ssl.SslContextFactory;
4344
import org.eclipse.jetty.util.thread.QueuedThreadPool;
4445
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
@@ -116,9 +117,14 @@ private Server server(final HttpHandler handler, final Config conf,
116117
return ws;
117118
});
118119

119-
server.setHandler(new JettyHandler(handler, webSocketServerFactory, conf
120+
ContextHandler sch = new ContextHandler();
121+
// always '/' context path is internally handle by jooby
122+
sch.setContextPath("/");
123+
sch.setHandler(new JettyHandler(handler, webSocketServerFactory, conf
120124
.getString("application.tmpdir"), conf.getBytes("jetty.FileSizeThreshold").intValue()));
121125

126+
server.setHandler(sch);
127+
122128
return server;
123129
}
124130

0 commit comments

Comments
 (0)