Skip to content

Commit a38e881

Browse files
committed
Refactor: Decouple gRPC module using server-agnostic SPI
This commit removes the gRPC module's tight coupling to Jooby's core request lifecycle (`Context` and `Sender`) and introduces a clean, zero-dependency Service Provider Interface (SPI). This prevents gRPC specifics (like HTTP/2 trailers and framing) from polluting the standard HTTP/1.1 pipeline. Core changes: - Add `GrpcExchange` and `GrpcProcessor` SPI to `jooby-core`. - Refactor `UnifiedGrpcBridge` to act as a pure protocol bridge using Java `Flow` and `ByteBuffer`, entirely isolated from web core classes. - Fix circular dependencies in the reactive `GrpcRequestBridge`. Server Implementations: - Jetty: Implement `JettyGrpcExchange` and `JettyGrpcInputBridge`. Fixed early commit issues by eagerly registering `TrailersSupplier`. - Netty: Implement `NettyGrpcExchange` and `NettyGrpcInputBridge`. Map trailing headers natively via `LastHttpContent` and properly intercept HTTP/2 pipeline streams. - Undertow: Implement `UndertowGrpcExchange`, `UndertowGrpcInputBridge`, and remove hardcoded gRPC checks from standard `UndertowHandler`. Fixed XNIO non-blocking event loop stalls using `wakeupReads()` and implemented manual, synchronous `StreamSinkChannel` flushing to prevent bidirectional deadlocks. All three servers now handle reactive backpressure natively and achieve 100% compliance with strict HTTP/2 clients like `grpcurl`.
1 parent ae1a940 commit a38e881

File tree

31 files changed

+1101
-350
lines changed

31 files changed

+1101
-350
lines changed

jooby/src/main/java/io/jooby/Context.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,15 +1101,6 @@ default Value lookup(String name) {
11011101
*/
11021102
Context setResponseHeader(@NonNull String name, @NonNull String value);
11031103

1104-
/**
1105-
* Set response trailer header.
1106-
*
1107-
* @param name Header name.
1108-
* @param value Header value.
1109-
* @return This context.
1110-
*/
1111-
Context setResponseTrailer(@NonNull String name, @NonNull String value);
1112-
11131104
/**
11141105
* Remove a response header.
11151106
*
@@ -1260,13 +1251,6 @@ Context responseStream(
12601251
*
12611252
* @return HTTP channel as chunker. Usually for chunked response.
12621253
*/
1263-
Sender responseSender(boolean startResponse);
1264-
1265-
/**
1266-
* HTTP response channel as chunker. Mark the response as started.
1267-
*
1268-
* @return HTTP channel as chunker. Usually for chunked response.
1269-
*/
12701254
Sender responseSender();
12711255

12721256
/**

jooby/src/main/java/io/jooby/DefaultContext.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -559,11 +559,6 @@ default Context render(@NonNull Object value) {
559559
}
560560
}
561561

562-
@Override
563-
default Sender responseSender() {
564-
return responseSender(true);
565-
}
566-
567562
@Override
568563
default OutputStream responseStream(@NonNull MediaType contentType) {
569564
setResponseType(contentType);

jooby/src/main/java/io/jooby/ForwardingContext.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,12 +1096,6 @@ public Context setResponseHeader(@NonNull String name, @NonNull Date value) {
10961096
return this;
10971097
}
10981098

1099-
@Override
1100-
public Context setResponseTrailer(@NonNull String name, @NonNull String value) {
1101-
ctx.setResponseHeader(name, value);
1102-
return this;
1103-
}
1104-
11051099
@Override
11061100
public Context setResponseHeader(@NonNull String name, @NonNull Instant value) {
11071101
ctx.setResponseHeader(name, value);
@@ -1228,11 +1222,6 @@ public Sender responseSender() {
12281222
return ctx.responseSender();
12291223
}
12301224

1231-
@Override
1232-
public Sender responseSender(boolean startResponse) {
1233-
return ctx.responseSender(startResponse);
1234-
}
1235-
12361225
@Override
12371226
public PrintWriter responseWriter() {
12381227
return ctx.responseWriter();
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby;
7+
8+
import java.nio.ByteBuffer;
9+
import java.util.Map;
10+
import java.util.function.Consumer;
11+
12+
/** Server-agnostic abstraction for HTTP/2 trailing-header exchanges. */
13+
public interface GrpcExchange {
14+
15+
String getRequestPath();
16+
17+
String getHeader(String name);
18+
19+
Map<String, String> getHeaders();
20+
21+
/** Write framed bytes to the underlying non-blocking socket. */
22+
void send(ByteBuffer payload, Consumer<Throwable> onFailure);
23+
24+
/**
25+
* Closes the HTTP/2 stream with trailing headers.
26+
*
27+
* @param statusCode The gRPC status code (e.g., 0 for OK, 12 for UNIMPLEMENTED).
28+
* @param description Optional status message.
29+
*/
30+
void close(int statusCode, String description);
31+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Jooby https://jooby.io
3+
* Apache License Version 2.0 https://jooby.io/LICENSE.txt
4+
* Copyright 2014 Edgar Espina
5+
*/
6+
package io.jooby;
7+
8+
import java.nio.ByteBuffer;
9+
import java.util.concurrent.Flow;
10+
11+
import edu.umd.cs.findbugs.annotations.NonNull;
12+
13+
/** Intercepts and processes gRPC exchanges. */
14+
public interface GrpcProcessor {
15+
/**
16+
* @return A subscriber that the server will feed ByteBuffer chunks into, or null if the exchange
17+
* was rejected/unimplemented.
18+
*/
19+
Flow.Subscriber<ByteBuffer> process(@NonNull GrpcExchange exchange);
20+
}

jooby/src/main/java/io/jooby/Sender.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,6 @@ default Sender write(@NonNull String data, @NonNull Callback callback) {
7373
return write(data, StandardCharsets.UTF_8, callback);
7474
}
7575

76-
/**
77-
* Set response trailer header.
78-
*
79-
* @param name Header name.
80-
* @param value Header value.
81-
* @return This context.
82-
*/
83-
Sender setTrailer(@NonNull String name, @NonNull String value);
84-
8576
/**
8677
* Write a string chunk. Chunk is flushed immediately.
8778
*

jooby/src/main/java/io/jooby/internal/HeadContext.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,6 @@ public Sender write(@NonNull Output output, @NonNull Callback callback) {
190190
return this;
191191
}
192192

193-
@Override
194-
public Sender setTrailer(@NonNull String name, @NonNull String value) {
195-
return this;
196-
}
197-
198193
@Override
199194
public void close() {}
200195
}

modules/jooby-grpc/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@
1818
<version>${jooby.version}</version>
1919
</dependency>
2020

21+
<dependency>
22+
<groupId>org.slf4j</groupId>
23+
<artifactId>jul-to-slf4j</artifactId>
24+
<version>${slf4j.version}</version> </dependency>
25+
2126
<dependency>
2227
<groupId>io.grpc</groupId>
2328
<artifactId>grpc-protobuf</artifactId>

modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcModule.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,29 @@
77

88
import java.util.List;
99

10+
import org.slf4j.bridge.SLF4JBridgeHandler;
11+
1012
import edu.umd.cs.findbugs.annotations.NonNull;
1113
import io.grpc.BindableService;
1214
import io.grpc.Server;
1315
import io.grpc.inprocess.InProcessChannelBuilder;
1416
import io.grpc.inprocess.InProcessServerBuilder;
1517
import io.grpc.protobuf.services.ProtoReflectionServiceV1;
16-
import io.jooby.Extension;
17-
import io.jooby.Jooby;
18+
import io.jooby.*;
1819

1920
public class GrpcModule implements Extension {
2021
private final List<BindableService> services;
2122
private final GrpcMethodRegistry methodRegistry = new GrpcMethodRegistry();
2223
private final String serverName = "jooby-internal-" + System.nanoTime();
2324
private Server grpcServer;
2425

26+
static {
27+
// Optionally remove existing handlers attached to the j.u.l root logger
28+
SLF4JBridgeHandler.removeHandlersForRootLogger();
29+
// Install the SLF4J bridge
30+
SLF4JBridgeHandler.install();
31+
}
32+
2533
public GrpcModule(BindableService... services) {
2634
this.services = List.of(services);
2735
}
@@ -36,7 +44,6 @@ public void install(@NonNull Jooby app) throws Exception {
3644
methodRegistry.registerService(service);
3745
}
3846

39-
// 2. Register stable gRPC Server Reflection (v1)
4047
BindableService reflectionService = ProtoReflectionServiceV1.newInstance();
4148
builder.addService(reflectionService);
4249
methodRegistry.registerService(reflectionService);
@@ -45,10 +52,14 @@ public void install(@NonNull Jooby app) throws Exception {
4552

4653
var channel = InProcessChannelBuilder.forName(serverName).directExecutor().build();
4754

48-
UnifiedGrpcBridge bridge = new UnifiedGrpcBridge(channel, methodRegistry);
55+
var bridge = new UnifiedGrpcBridge(channel, methodRegistry);
56+
// Register it in the Service Registry so the server layer can find it
57+
app.getServices().put(UnifiedGrpcBridge.class, bridge);
58+
59+
app.getServices().put(GrpcProcessor.class, bridge);
4960

5061
// Mount the bridge.
51-
app.post("/*", bridge);
62+
// app.post("/*", bridge);
5263

5364
app.onStop(
5465
() -> {

modules/jooby-grpc/src/main/java/io/jooby/grpc/GrpcRequestBridge.java

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,51 +13,77 @@
1313
import org.slf4j.Logger;
1414
import org.slf4j.LoggerFactory;
1515

16+
import io.grpc.ClientCall;
17+
import io.grpc.MethodDescriptor;
1618
import io.grpc.stub.ClientCallStreamObserver;
17-
import io.grpc.stub.StreamObserver;
19+
import io.grpc.stub.ClientCalls;
20+
import io.grpc.stub.ClientResponseObserver;
1821

1922
public class GrpcRequestBridge implements Subscriber<ByteBuffer> {
2023

2124
private final Logger log = LoggerFactory.getLogger(getClass());
22-
private final ClientCallStreamObserver<byte[]> internalObserver;
23-
private final GrpcDeframer deframer;
24-
private Subscription subscription;
25+
private final GrpcDeframer deframer = new GrpcDeframer();
2526
private final AtomicBoolean completed = new AtomicBoolean(false);
2627

27-
public GrpcRequestBridge(StreamObserver<byte[]> internalObserver) {
28-
this.deframer = new GrpcDeframer();
29-
this.internalObserver = (ClientCallStreamObserver<byte[]>) internalObserver;
28+
private final ClientCall<byte[], byte[]> call;
29+
private final MethodDescriptor.MethodType methodType;
30+
private final boolean isUnaryOrServerStreaming;
31+
32+
private ClientResponseObserver<byte[], byte[]> responseObserver;
33+
private ClientCallStreamObserver<byte[]> requestObserver;
34+
private Subscription subscription;
35+
private byte[] singlePayload;
36+
37+
public GrpcRequestBridge(
38+
ClientCall<byte[], byte[]> call, MethodDescriptor.MethodType methodType) {
39+
this.call = call;
40+
this.methodType = methodType;
41+
this.isUnaryOrServerStreaming =
42+
methodType == MethodDescriptor.MethodType.UNARY
43+
|| methodType == MethodDescriptor.MethodType.SERVER_STREAMING;
44+
}
45+
46+
public void setResponseObserver(ClientResponseObserver<byte[], byte[]> responseObserver) {
47+
this.responseObserver = responseObserver;
48+
}
49+
50+
public void setRequestObserver(ClientCallStreamObserver<byte[]> requestObserver) {
51+
this.requestObserver = requestObserver;
52+
}
53+
54+
public void onGrpcReady() {
55+
if (subscription != null
56+
&& requestObserver != null
57+
&& requestObserver.isReady()
58+
&& !completed.get()) {
59+
subscription.request(1);
60+
}
3061
}
3162

3263
@Override
3364
public void onSubscribe(Subscription subscription) {
3465
this.subscription = subscription;
35-
36-
// Wire gRPC readiness to Jooby's Flow.Subscription
37-
internalObserver.setOnReadyHandler(
38-
() -> {
39-
if (internalObserver.isReady() && !completed.get()) {
40-
subscription.request(1);
41-
}
42-
});
43-
44-
// Initial demand
66+
// Initial demand to kick off the network body reader
4567
subscription.request(1);
4668
}
4769

4870
@Override
4971
public void onNext(ByteBuffer item) {
5072
try {
51-
// Pass the zero-copy buffer straight to the deframer
5273
deframer.process(
5374
item,
5475
msg -> {
55-
internalObserver.onNext(msg);
76+
if (isUnaryOrServerStreaming) {
77+
singlePayload = msg;
78+
} else {
79+
requestObserver.onNext(msg);
80+
}
5681
});
5782

58-
// Only request more from the server if gRPC is ready
59-
if (internalObserver.isReady()) {
60-
subscription.request(1);
83+
if (isUnaryOrServerStreaming) {
84+
subscription.request(1); // Keep reading until EOF for unary/server-streaming
85+
} else if (requestObserver != null && requestObserver.isReady()) {
86+
subscription.request(1); // Ask for more if the streaming gRPC buffer is ready
6187
}
6288
} catch (Throwable t) {
6389
subscription.cancel();
@@ -69,14 +95,27 @@ public void onNext(ByteBuffer item) {
6995
public void onError(Throwable throwable) {
7096
if (completed.compareAndSet(false, true)) {
7197
log.error("Error in gRPC request stream", throwable);
72-
internalObserver.onError(throwable);
98+
if (requestObserver != null) {
99+
requestObserver.onError(throwable);
100+
} else if (responseObserver != null) {
101+
responseObserver.onError(throwable);
102+
}
73103
}
74104
}
75105

76106
@Override
77107
public void onComplete() {
78108
if (completed.compareAndSet(false, true)) {
79-
internalObserver.onCompleted();
109+
if (isUnaryOrServerStreaming) {
110+
byte[] payload = singlePayload == null ? new byte[0] : singlePayload;
111+
if (methodType == MethodDescriptor.MethodType.UNARY) {
112+
ClientCalls.asyncUnaryCall(call, payload, responseObserver);
113+
} else {
114+
ClientCalls.asyncServerStreamingCall(call, payload, responseObserver);
115+
}
116+
} else if (requestObserver != null) {
117+
requestObserver.onCompleted();
118+
}
80119
}
81120
}
82121
}

0 commit comments

Comments
 (0)