Skip to content

Commit 7f55e81

Browse files
lryanejona86
authored andcommitted
Allow use of a LocalChannel with Netty & HTTP2
Remove old in-process handling Update tests and benchmarks ------------- Created by MOE: http://code.google.com/p/moe-java MOE_MIGRATED_REVID=81381986
1 parent 9dd0c94 commit 7f55e81

File tree

8 files changed

+79
-267
lines changed

8 files changed

+79
-267
lines changed

netty/src/main/java/com/google/net/stubby/transport/netty/Http2Negotiator.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package com.google.net.stubby.transport.netty;
22

33
import com.google.common.base.Preconditions;
4-
import com.google.common.collect.ImmutableList;
54
import com.google.common.util.concurrent.ListenableFuture;
65
import com.google.common.util.concurrent.SettableFuture;
76

@@ -10,7 +9,6 @@
109
import io.netty.channel.ChannelHandlerAdapter;
1110
import io.netty.channel.ChannelHandlerContext;
1211
import io.netty.channel.ChannelInitializer;
13-
import io.netty.channel.socket.SocketChannel;
1412
import io.netty.handler.codec.http.DefaultHttpRequest;
1513
import io.netty.handler.codec.http.HttpClientCodec;
1614
import io.netty.handler.codec.http.HttpClientUpgradeHandler;
@@ -58,7 +56,7 @@ public interface Negotiation {
5856
/**
5957
* Gets the {@link ChannelInitializer} for negotiating the protocol.
6058
*/
61-
ChannelInitializer<SocketChannel> initializer();
59+
ChannelInitializer<Channel> initializer();
6260

6361
void onConnected(Channel channel);
6462

@@ -90,9 +88,9 @@ public static Negotiation tls(final ChannelHandler handler, final SSLEngine sslE
9088
if (!installJettyTLSProtocolSelection(sslEngine, completeFuture, false)) {
9189
throw new IllegalStateException("NPN/ALPN extensions not installed");
9290
}
93-
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
91+
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
9492
@Override
95-
public void initChannel(final SocketChannel ch) throws Exception {
93+
public void initChannel(final Channel ch) throws Exception {
9694
SslHandler sslHandler = new SslHandler(sslEngine, false);
9795
sslHandler.handshakeFuture().addListener(
9896
new GenericFutureListener<Future<? super Channel>>() {
@@ -112,7 +110,7 @@ public void operationComplete(Future<? super Channel> future) throws Exception {
112110

113111
return new Negotiation() {
114112
@Override
115-
public ChannelInitializer<SocketChannel> initializer() {
113+
public ChannelInitializer<Channel> initializer() {
116114
return initializer;
117115
}
118116

@@ -138,17 +136,17 @@ public static Negotiation plaintextUpgrade(final Http2ConnectionHandler handler)
138136
final HttpClientUpgradeHandler upgrader =
139137
new HttpClientUpgradeHandler(httpClientCodec, upgradeCodec, 1000);
140138
final UpgradeCompletionHandler completionHandler = new UpgradeCompletionHandler();
141-
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
139+
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
142140
@Override
143-
public void initChannel(SocketChannel ch) throws Exception {
141+
public void initChannel(Channel ch) throws Exception {
144142
ch.pipeline().addLast(upgrader);
145143
ch.pipeline().addLast(completionHandler);
146144
}
147145
};
148146

149147
return new Negotiation() {
150148
@Override
151-
public ChannelInitializer<SocketChannel> initializer() {
149+
public ChannelInitializer<Channel> initializer() {
152150
return initializer;
153151
}
154152

@@ -172,16 +170,16 @@ public void onConnected(Channel channel) {
172170
* Create a "no-op" negotiation that simply assumes the protocol to already be negotiated.
173171
*/
174172
public static Negotiation plaintext(final ChannelHandler handler) {
175-
final ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
173+
final ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() {
176174
@Override
177-
public void initChannel(SocketChannel ch) throws Exception {
175+
public void initChannel(Channel ch) throws Exception {
178176
ch.pipeline().addLast(handler);
179177
}
180178
};
181179
return new Negotiation() {
182180
private final SettableFuture<Void> completeFuture = SettableFuture.create();
183181
@Override
184-
public ChannelInitializer<SocketChannel> initializer() {
182+
public ChannelInitializer<Channel> initializer() {
185183
return initializer;
186184
}
187185

netty/src/main/java/com/google/net/stubby/transport/netty/NettyChannelBuilder.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@
99
import io.netty.handler.ssl.SslContext;
1010

1111
import java.net.InetSocketAddress;
12+
import java.net.SocketAddress;
1213

1314
/**
1415
* Convenient class for building channels with the netty transport.
1516
*/
1617
public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChannelBuilder> {
1718

18-
private final InetSocketAddress serverAddress;
19+
private final SocketAddress serverAddress;
1920

2021
private NegotiationType negotiationType = NegotiationType.TLS;
2122
private EventLoopGroup userEventLoopGroup;
@@ -24,7 +25,7 @@ public final class NettyChannelBuilder extends AbstractChannelBuilder<NettyChann
2425
/**
2526
* Creates a new builder with the given server address.
2627
*/
27-
public static NettyChannelBuilder forAddress(InetSocketAddress serverAddress) {
28+
public static NettyChannelBuilder forAddress(SocketAddress serverAddress) {
2829
return new NettyChannelBuilder(serverAddress);
2930
}
3031

@@ -35,7 +36,7 @@ public static NettyChannelBuilder forAddress(String host, int port) {
3536
return forAddress(new InetSocketAddress(host, port));
3637
}
3738

38-
private NettyChannelBuilder(InetSocketAddress serverAddress) {
39+
private NettyChannelBuilder(SocketAddress serverAddress) {
3940
this.serverAddress = serverAddress;
4041
}
4142

netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransport.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import io.netty.channel.ChannelFuture;
1919
import io.netty.channel.ChannelFutureListener;
2020
import io.netty.channel.EventLoopGroup;
21+
import io.netty.channel.local.LocalAddress;
22+
import io.netty.channel.local.LocalChannel;
2123
import io.netty.channel.socket.nio.NioSocketChannel;
2224
import io.netty.handler.codec.AsciiString;
2325
import io.netty.handler.codec.http2.DefaultHttp2Connection;
@@ -38,6 +40,7 @@
3840
import io.netty.util.internal.logging.InternalLogLevel;
3941

4042
import java.net.InetSocketAddress;
43+
import java.net.SocketAddress;
4144
import java.util.concurrent.ExecutionException;
4245

4346
import javax.net.ssl.SSLEngine;
@@ -49,21 +52,31 @@
4952
*/
5053
class NettyClientTransport extends AbstractClientTransport {
5154

52-
private final InetSocketAddress address;
55+
private final SocketAddress address;
5356
private final EventLoopGroup eventGroup;
5457
private final Http2Negotiator.Negotiation negotiation;
5558
private final NettyClientHandler handler;
5659
private final boolean ssl;
5760
private final AsciiString authority;
5861
private Channel channel;
5962

60-
NettyClientTransport(InetSocketAddress address, NegotiationType negotiationType,
63+
NettyClientTransport(SocketAddress address, NegotiationType negotiationType,
6164
EventLoopGroup eventGroup, SslContext sslContext) {
6265
Preconditions.checkNotNull(negotiationType, "negotiationType");
6366
this.address = Preconditions.checkNotNull(address, "address");
6467
this.eventGroup = Preconditions.checkNotNull(eventGroup, "eventGroup");
6568

66-
authority = new AsciiString(address.getHostString() + ":" + address.getPort());
69+
InetSocketAddress inetAddress = null;
70+
if (address instanceof InetSocketAddress) {
71+
inetAddress = (InetSocketAddress) address;
72+
authority = new AsciiString(inetAddress.getHostString() + ":" + inetAddress.getPort());
73+
} else if (address instanceof LocalAddress) {
74+
authority = new AsciiString(address.toString());
75+
Preconditions.checkArgument(negotiationType != NegotiationType.TLS,
76+
"TLS not supported for in-process transport");
77+
} else {
78+
throw new IllegalStateException("Unknown socket address type " + address.toString());
79+
}
6780

6881
handler = newHandler();
6982
switch (negotiationType) {
@@ -85,7 +98,7 @@ class NettyClientTransport extends AbstractClientTransport {
8598
}
8699
// TODO(user): specify allocator. The method currently ignores it though.
87100
SSLEngine sslEngine
88-
= sslContext.newEngine(null, address.getHostString(), address.getPort());
101+
= sslContext.newEngine(null, inetAddress.getHostString(), inetAddress.getPort());
89102
SSLParameters sslParams = new SSLParameters();
90103
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
91104
sslEngine.setSSLParameters(sslParams);
@@ -127,8 +140,12 @@ protected ClientStream newStreamInternal(MethodDescriptor<?, ?> method, Metadata
127140
protected void doStart() {
128141
Bootstrap b = new Bootstrap();
129142
b.group(eventGroup);
130-
b.channel(NioSocketChannel.class);
131-
b.option(SO_KEEPALIVE, true);
143+
if (address instanceof LocalAddress) {
144+
b.channel(LocalChannel.class);
145+
} else {
146+
b.channel(NioSocketChannel.class);
147+
b.option(SO_KEEPALIVE, true);
148+
}
132149
b.handler(negotiation.initializer());
133150

134151
// Start the connection operation to the server.

netty/src/main/java/com/google/net/stubby/transport/netty/NettyClientTransportFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,19 @@
66
import io.netty.channel.EventLoopGroup;
77
import io.netty.handler.ssl.SslContext;
88

9-
import java.net.InetSocketAddress;
9+
import java.net.SocketAddress;
1010

1111
/**
1212
* Factory that manufactures instances of {@link NettyClientTransport}.
1313
*/
1414
class NettyClientTransportFactory implements ClientTransportFactory {
1515

16-
private final InetSocketAddress address;
16+
private final SocketAddress address;
1717
private final NegotiationType negotiationType;
1818
private final EventLoopGroup group;
1919
private final SslContext sslContext;
2020

21-
public NettyClientTransportFactory(InetSocketAddress address, NegotiationType negotiationType,
21+
public NettyClientTransportFactory(SocketAddress address, NegotiationType negotiationType,
2222
EventLoopGroup group, SslContext sslContext) {
2323
this.address = Preconditions.checkNotNull(address, "address");
2424
this.group = Preconditions.checkNotNull(group, "group");

netty/src/main/java/com/google/net/stubby/transport/netty/NettyServer.java

Lines changed: 21 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,16 @@
77
import com.google.common.util.concurrent.AbstractService;
88
import com.google.net.stubby.transport.ServerListener;
99

10+
import java.net.SocketAddress;
11+
1012
import io.netty.bootstrap.ServerBootstrap;
1113
import io.netty.channel.Channel;
1214
import io.netty.channel.ChannelFuture;
1315
import io.netty.channel.ChannelFutureListener;
1416
import io.netty.channel.ChannelInitializer;
1517
import io.netty.channel.EventLoopGroup;
16-
import io.netty.channel.socket.SocketChannel;
18+
import io.netty.channel.local.LocalAddress;
19+
import io.netty.channel.local.LocalServerChannel;
1720
import io.netty.channel.socket.nio.NioServerSocketChannel;
1821
import io.netty.handler.ssl.SslContext;
1922

@@ -24,26 +27,26 @@
2427
* Netty-based server.
2528
*/
2629
public class NettyServer extends AbstractService {
27-
private final int port;
28-
private final ChannelInitializer<SocketChannel> channelInitializer;
30+
private final SocketAddress address;
31+
private final ChannelInitializer<Channel> channelInitializer;
2932
private final EventLoopGroup bossGroup;
3033
private final EventLoopGroup workerGroup;
3134
private Channel channel;
3235

33-
public NettyServer(ServerListener serverListener, int port, EventLoopGroup bossGroup,
36+
public NettyServer(ServerListener serverListener, SocketAddress address, EventLoopGroup bossGroup,
3437
EventLoopGroup workerGroup) {
35-
this(serverListener, port, bossGroup, workerGroup, null);
38+
this(serverListener, address, bossGroup, workerGroup, null);
3639
}
3740

38-
public NettyServer(final ServerListener serverListener, int port, EventLoopGroup bossGroup,
41+
public NettyServer(final ServerListener serverListener, SocketAddress address,
42+
EventLoopGroup bossGroup,
3943
EventLoopGroup workerGroup, @Nullable final SslContext sslContext) {
4044
Preconditions.checkNotNull(bossGroup, "bossGroup");
4145
Preconditions.checkNotNull(workerGroup, "workerGroup");
42-
Preconditions.checkArgument(port >= 0, "port must be positive");
43-
this.port = port;
44-
this.channelInitializer = new ChannelInitializer<SocketChannel>() {
46+
this.address = address;
47+
this.channelInitializer = new ChannelInitializer<Channel>() {
4548
@Override
46-
public void initChannel(SocketChannel ch) throws Exception {
49+
public void initChannel(Channel ch) throws Exception {
4750
NettyServerTransport transport = new NettyServerTransport(ch, serverListener, sslContext);
4851
transport.startAsync();
4952
// TODO(user): Should we wait for transport shutdown before shutting down server?
@@ -57,13 +60,17 @@ public void initChannel(SocketChannel ch) throws Exception {
5760
protected void doStart() {
5861
ServerBootstrap b = new ServerBootstrap();
5962
b.group(bossGroup, workerGroup);
60-
b.channel(NioServerSocketChannel.class);
61-
b.option(SO_BACKLOG, 128);
62-
b.childOption(SO_KEEPALIVE, true);
63+
if (address instanceof LocalAddress) {
64+
b.channel(LocalServerChannel.class);
65+
} else {
66+
b.channel(NioServerSocketChannel.class);
67+
b.option(SO_BACKLOG, 128);
68+
b.childOption(SO_KEEPALIVE, true);
69+
}
6370
b.childHandler(channelInitializer);
6471

6572
// Bind and start to accept incoming connections.
66-
b.bind(port).addListener(new ChannelFutureListener() {
73+
b.bind(address).addListener(new ChannelFutureListener() {
6774
@Override
6875
public void operationComplete(ChannelFuture future) throws Exception {
6976
if (future.isSuccess()) {

netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
import com.google.net.stubby.SharedResourceHolder;
88
import com.google.net.stubby.transport.ServerListener;
99

10+
import java.net.InetSocketAddress;
11+
import java.net.SocketAddress;
12+
1013
import io.netty.channel.EventLoopGroup;
1114
import io.netty.handler.ssl.SslContext;
1215

@@ -15,7 +18,7 @@
1518
*/
1619
public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerBuilder> {
1720

18-
private final int port;
21+
private final SocketAddress address;
1922

2023
private EventLoopGroup userBossEventLoopGroup;
2124
private EventLoopGroup userWorkerEventLoopGroup;
@@ -29,13 +32,21 @@ public static NettyServerBuilder forRegistryAndPort(HandlerRegistry registry, in
2932
return new NettyServerBuilder(registry, port);
3033
}
3134

35+
public static NettyServerBuilder forAddress(SocketAddress address) {
36+
return new NettyServerBuilder(address);
37+
}
38+
3239
private NettyServerBuilder(int port) {
33-
this.port = port;
40+
this.address = new InetSocketAddress(port);
3441
}
3542

3643
private NettyServerBuilder(HandlerRegistry registry, int port) {
3744
super(registry);
38-
this.port = port;
45+
this.address = new InetSocketAddress(port);
46+
}
47+
48+
private NettyServerBuilder(SocketAddress address) {
49+
this.address = address;
3950
}
4051

4152
/**
@@ -81,7 +92,7 @@ protected Service buildTransportServer(ServerListener serverListener) {
8192
final EventLoopGroup workerEventLoopGroup = (userWorkerEventLoopGroup == null)
8293
? SharedResourceHolder.get(Utils.DEFAULT_WORKER_EVENT_LOOP_GROUP)
8394
: userWorkerEventLoopGroup;
84-
NettyServer server = new NettyServer(serverListener, port, bossEventLoopGroup,
95+
NettyServer server = new NettyServer(serverListener, address, bossEventLoopGroup,
8596
workerEventLoopGroup, sslContext);
8697
if (userBossEventLoopGroup == null) {
8798
server.addListener(new ClosureHook() {

netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerTransport.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55
import com.google.net.stubby.transport.ServerListener;
66
import com.google.net.stubby.transport.ServerTransportListener;
77

8+
import io.netty.channel.Channel;
89
import io.netty.channel.ChannelFuture;
910
import io.netty.channel.ChannelFutureListener;
10-
import io.netty.channel.socket.SocketChannel;
1111
import io.netty.handler.codec.http2.DefaultHttp2Connection;
1212
import io.netty.handler.codec.http2.DefaultHttp2FrameReader;
1313
import io.netty.handler.codec.http2.DefaultHttp2FrameWriter;
@@ -31,12 +31,12 @@
3131
*/
3232
class NettyServerTransport extends AbstractService {
3333
private static final Http2FrameLogger frameLogger = new Http2FrameLogger(InternalLogLevel.DEBUG);
34-
private final SocketChannel channel;
34+
private final Channel channel;
3535
private final ServerListener serverListener;
3636
private final SslContext sslContext;
3737
private NettyServerHandler handler;
3838

39-
NettyServerTransport(SocketChannel channel, ServerListener serverListener,
39+
NettyServerTransport(Channel channel, ServerListener serverListener,
4040
@Nullable SslContext sslContext) {
4141
this.channel = Preconditions.checkNotNull(channel, "channel");
4242
this.serverListener = Preconditions.checkNotNull(serverListener, "serverListener");

0 commit comments

Comments
 (0)