Skip to content

Commit 2cc8eca

Browse files
author
hiram
committed
netty实现http客户端访问
1 parent ac4748d commit 2cc8eca

9 files changed

Lines changed: 349 additions & 25 deletions

File tree

02nio/nio02/src/main/java/io/github/kimmking/gateway/NettyServerApplication.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44
import io.github.kimmking.gateway.inbound.HttpInboundServer;
55

66
public class NettyServerApplication {
7-
7+
88
public final static String GATEWAY_NAME = "NIOGateway";
99
public final static String GATEWAY_VERSION = "1.0.0";
10-
10+
1111
public static void main(String[] args) {
1212
String proxyServer = System.getProperty("proxyServer","http://localhost:8088");
13+
// String proxyServer = System.getProperty("proxyServer","http://www.baidu.com:80");
1314
String proxyPort = System.getProperty("proxyPort","8888");
14-
15-
// http://localhost:8888/api/hello ==> gateway API
16-
// http://localhost:8088/api/hello ==> backend service
17-
15+
16+
// http://localhost:8888/api/hello ==> gateway API
17+
// http://localhost:8088/api/hello ==> backend service
18+
1819
int port = Integer.parseInt(proxyPort);
1920
System.out.println(GATEWAY_NAME + " " + GATEWAY_VERSION +" starting...");
2021
HttpInboundServer server = new HttpInboundServer(port, proxyServer);

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
1515
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
1616
private final String proxyServer;
1717
private NettyHttpClient handler;
18-
18+
1919
public HttpInboundHandler(String proxyServer) {
2020
this.proxyServer = proxyServer;
2121
handler = new NettyHttpClient();
2222
}
23-
23+
2424
@Override
2525
public void channelReadComplete(ChannelHandlerContext ctx) {
2626
ctx.flush();
@@ -37,11 +37,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
3737
// if (uri.contains("/test")) {
3838
// handlerTest(fullRequest, ctx);
3939
// }
40-
40+
4141
//handler.handle(fullRequest, ctx);
42-
String host = proxyServer.replaceAll("/", "").split(":")[1];
43-
int port = Integer.parseInt(proxyServer.replaceAll("/", "").split(":")[2]);
44-
handler.connect(host, port, ctx);
42+
handler.connect(proxyServer, ctx);
4543
} catch(Exception e) {
4644
e.printStackTrace();
4745
} finally {

02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
import io.netty.channel.nio.NioEventLoopGroup;
66
import io.netty.channel.socket.SocketChannel;
77
import io.netty.channel.socket.nio.NioSocketChannel;
8-
import io.netty.handler.codec.http.*;
8+
import io.netty.handler.codec.http.HttpRequestEncoder;
9+
import io.netty.handler.codec.http.HttpResponseDecoder;
910

1011
public class NettyHttpClient {
11-
public void connect(String host, int port, ChannelHandlerContext ctx) throws Exception {
12+
public void connect(String proxyServer, ChannelHandlerContext ctx) throws Exception {
1213
EventLoopGroup workerGroup = new NioEventLoopGroup();
1314

1415
try {
@@ -24,7 +25,7 @@ public void initChannel(SocketChannel ch) throws Exception {
2425
ch.pipeline().addLast(new HttpResponseDecoder());
2526
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
2627
ch.pipeline().addLast(new HttpRequestEncoder());
27-
ch.pipeline().addLast(new NettyHttpOutboundHandler(ctx));
28+
ch.pipeline().addLast(new NettyHttpOutboundHandler(ctx, proxyServer));
2829
}
2930
});
3031

@@ -38,6 +39,8 @@ public void initChannel(SocketChannel ch) throws Exception {
3839
request.headers().set(HttpHeaderNames.CONTENT_LENGTH,
3940
request.content().readableBytes());*/
4041
// Start the client.
42+
String host = proxyServer.replaceAll("/", "").split(":")[1];
43+
int port = Integer.parseInt(proxyServer.replaceAll("/", "").split(":")[2]);
4144
ChannelFuture f = b.connect(host, port).sync();
4245
/*f.channel().write(request);
4346
f.channel().flush();*/

02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpOutboundHandler.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,11 @@
11
package io.github.kimmking.gateway.outbound.netty4;
22

33
import io.github.kimmking.gateway.util.ByteBufToBytes;
4-
import io.netty.bootstrap.Bootstrap;
54
import io.netty.buffer.ByteBuf;
65
import io.netty.buffer.Unpooled;
7-
import io.netty.channel.*;
8-
import io.netty.channel.nio.NioEventLoopGroup;
9-
import io.netty.channel.socket.SocketChannel;
10-
import io.netty.channel.socket.nio.NioSocketChannel;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
118
import io.netty.handler.codec.http.*;
12-
import org.apache.http.util.EntityUtils;
139

1410
import java.net.URI;
1511

@@ -21,8 +17,10 @@ public class NettyHttpOutboundHandler extends ChannelInboundHandlerAdapter {
2117
private ByteBufToBytes reader;
2218
private ChannelHandlerContext parentCtx;
2319
private int contentLength = 0;
24-
public NettyHttpOutboundHandler(ChannelHandlerContext ctx) {
20+
private String proxyServer = null;
21+
public NettyHttpOutboundHandler(ChannelHandlerContext ctx, String proxyServer) {
2522
this.parentCtx = ctx;
23+
this.proxyServer = proxyServer;
2624
}
2725
@Override
2826
public void channelActive(ChannelHandlerContext ctx)
@@ -33,10 +31,14 @@ public void channelActive(ChannelHandlerContext ctx)
3331
request.headers().add(HttpHeaderNames.CONNECTION,HttpHeaderValues.KEEP_ALIVE);
3432
request.headers().add(HttpHeaderNames.CONTENT_LENGTH,request.content().readableBytes());
3533
ctx.writeAndFlush(request);*/
34+
String host = proxyServer.replaceAll("/", "").split(":")[1];
3635
DefaultFullHttpRequest request = new DefaultFullHttpRequest(
3736
HttpVersion.HTTP_1_1, HttpMethod.GET, new URI("/api/hello").toASCIIString());
38-
// 构建http请求
39-
request.headers().set(HttpHeaderNames.HOST, "127.0.0.1");
37+
/*DefaultFullHttpRequest request = new DefaultFullHttpRequest(
38+
HttpVersion.HTTP_1_1, HttpMethod.GET, new URI("/").toASCIIString());*/
39+
40+
// 构建http请求
41+
request.headers().set(HttpHeaderNames.HOST, host);
4042
request.headers().set(HttpHeaderNames.CONNECTION,
4143
HttpHeaderNames.CONNECTION);
4244
request.headers().set(HttpHeaderNames.CONTENT_LENGTH,
@@ -69,7 +71,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
6971
FullHttpResponse response = null;
7072
try {
7173
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(bytes));
72-
response.headers().set("Content-Type", "text/plain;charset=UTF-8");
74+
response.headers().set("Content-Type", "text/html;charset=UTF-8");
7375
response.headers().setInt("Content-Length", contentLength);
7476
} catch (Exception e) {
7577
e.printStackTrace();
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package io.github.kimmking.gateway.outbound.okhttp.httpclient4;
2+
3+
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
8+
import io.netty.handler.codec.http.FullHttpRequest;
9+
import io.netty.handler.codec.http.FullHttpResponse;
10+
import io.netty.handler.codec.http.HttpUtil;
11+
import org.apache.http.HttpResponse;
12+
import org.apache.http.client.methods.HttpGet;
13+
import org.apache.http.concurrent.FutureCallback;
14+
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
15+
import org.apache.http.impl.nio.client.HttpAsyncClients;
16+
import org.apache.http.impl.nio.reactor.IOReactorConfig;
17+
import org.apache.http.protocol.HTTP;
18+
import org.apache.http.util.EntityUtils;
19+
20+
import java.util.concurrent.*;
21+
22+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
23+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
24+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
25+
26+
public class HttpOutboundHandler {
27+
28+
private CloseableHttpAsyncClient httpclient;
29+
private ExecutorService proxyService;
30+
private String backendUrl;
31+
32+
public HttpOutboundHandler(String backendUrl){
33+
this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl;
34+
int cores = Runtime.getRuntime().availableProcessors() * 2;
35+
long keepAliveTime = 1000;
36+
int queueSize = 2048;
37+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy();
38+
proxyService = new ThreadPoolExecutor(cores, cores,
39+
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
40+
new NamedThreadFactory("proxyService"), handler);
41+
42+
IOReactorConfig ioConfig = IOReactorConfig.custom()
43+
.setConnectTimeout(1000)
44+
.setSoTimeout(1000)
45+
.setIoThreadCount(cores)
46+
.setRcvBufSize(32 * 1024)
47+
.build();
48+
49+
httpclient = HttpAsyncClients.custom().setMaxConnTotal(40)
50+
.setMaxConnPerRoute(8)
51+
.setDefaultIOReactorConfig(ioConfig)
52+
.setKeepAliveStrategy((response,context) -> 6000)
53+
.build();
54+
httpclient.start();
55+
}
56+
57+
public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx) {
58+
final String url = this.backendUrl + fullRequest.uri();
59+
proxyService.submit(()->fetchGet(fullRequest, ctx, url));
60+
}
61+
62+
private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) {
63+
final HttpGet httpGet = new HttpGet(url);
64+
//httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
65+
httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_KEEP_ALIVE);
66+
httpclient.execute(httpGet, new FutureCallback<HttpResponse>() {
67+
@Override
68+
public void completed(final HttpResponse endpointResponse) {
69+
try {
70+
handleResponse(inbound, ctx, endpointResponse);
71+
} catch (Exception e) {
72+
e.printStackTrace();
73+
} finally {
74+
75+
}
76+
}
77+
78+
@Override
79+
public void failed(final Exception ex) {
80+
httpGet.abort();
81+
ex.printStackTrace();
82+
}
83+
84+
@Override
85+
public void cancelled() {
86+
httpGet.abort();
87+
}
88+
});
89+
}
90+
91+
private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final HttpResponse endpointResponse) throws Exception {
92+
FullHttpResponse response = null;
93+
try {
94+
// String value = "hello,kimmking";
95+
// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
96+
// response.headers().set("Content-Type", "application/json");
97+
// response.headers().setInt("Content-Length", response.content().readableBytes());
98+
99+
100+
byte[] body = EntityUtils.toByteArray(endpointResponse.getEntity());
101+
// System.out.println(new String(body));
102+
// System.out.println(body.length);
103+
104+
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body));
105+
response.headers().set("Content-Type", "application/json");
106+
response.headers().setInt("Content-Length", Integer.parseInt(endpointResponse.getFirstHeader("Content-Length").getValue()));
107+
108+
// for (Header e : endpointResponse.getAllHeaders()) {
109+
// //response.headers().set(e.getName(),e.getValue());
110+
// System.out.println(e.getName() + " => " + e.getValue());
111+
// }
112+
113+
} catch (Exception e) {
114+
e.printStackTrace();
115+
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
116+
exceptionCaught(ctx, e);
117+
} finally {
118+
if (fullRequest != null) {
119+
if (!HttpUtil.isKeepAlive(fullRequest)) {
120+
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
121+
} else {
122+
//response.headers().set(CONNECTION, KEEP_ALIVE);
123+
ctx.write(response);
124+
}
125+
}
126+
ctx.flush();
127+
//ctx.close();
128+
}
129+
130+
}
131+
132+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
133+
cause.printStackTrace();
134+
ctx.close();
135+
}
136+
137+
138+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.github.kimmking.gateway.outbound.okhttp.httpclient4;
2+
3+
import java.util.concurrent.ThreadFactory;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
public class NamedThreadFactory implements ThreadFactory {
7+
8+
private final ThreadGroup group;
9+
private final AtomicInteger threadNumber = new AtomicInteger(1);
10+
11+
private final String namePrefix;
12+
private final boolean daemon;
13+
14+
public NamedThreadFactory(String namePrefix, boolean daemon) {
15+
this.daemon = daemon;
16+
SecurityManager s = System.getSecurityManager();
17+
group = (s != null) ? s.getThreadGroup() :
18+
Thread.currentThread().getThreadGroup();
19+
this.namePrefix = namePrefix;
20+
}
21+
22+
public NamedThreadFactory(String namePrefix) {
23+
this(namePrefix, false);
24+
}
25+
26+
@Override
27+
public Thread newThread(Runnable r) {
28+
Thread t = new Thread(group, r, namePrefix + "-thread-" + threadNumber.getAndIncrement(), 0);
29+
t.setDaemon(daemon);
30+
return t;
31+
}
32+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.github.kimmking.gateway.outbound.okhttp.netty4;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.*;
5+
import io.netty.channel.nio.NioEventLoopGroup;
6+
import io.netty.channel.socket.SocketChannel;
7+
import io.netty.channel.socket.nio.NioSocketChannel;
8+
import io.netty.handler.codec.http.HttpRequestEncoder;
9+
import io.netty.handler.codec.http.HttpResponseDecoder;
10+
11+
public class NettyHttpClient {
12+
public void connect(String proxyServer, ChannelHandlerContext ctx) throws Exception {
13+
EventLoopGroup workerGroup = new NioEventLoopGroup();
14+
15+
try {
16+
Bootstrap b = new Bootstrap();
17+
b.group(workerGroup);
18+
b.channel(NioSocketChannel.class);
19+
b.option(ChannelOption.SO_KEEPALIVE, true)
20+
.option(ChannelOption.SO_RCVBUF, 32 * 1024);
21+
b.handler(new ChannelInitializer<SocketChannel>() {
22+
@Override
23+
public void initChannel(SocketChannel ch) throws Exception {
24+
// 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
25+
ch.pipeline().addLast(new HttpResponseDecoder());
26+
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
27+
ch.pipeline().addLast(new HttpRequestEncoder());
28+
ch.pipeline().addLast(new NettyHttpOutboundHandler(ctx, proxyServer));
29+
}
30+
});
31+
32+
33+
/*DefaultFullHttpRequest request = new DefaultFullHttpRequest(
34+
HttpVersion.HTTP_1_1, HttpMethod.GET, new URI("/api/hello").toASCIIString());
35+
// 构建http请求
36+
request.headers().set(HttpHeaderNames.HOST, host);
37+
request.headers().set(HttpHeaderNames.CONNECTION,
38+
HttpHeaderNames.CONNECTION);
39+
request.headers().set(HttpHeaderNames.CONTENT_LENGTH,
40+
request.content().readableBytes());*/
41+
// Start the client.
42+
String host = proxyServer.replaceAll("/", "").split(":")[1];
43+
int port = Integer.parseInt(proxyServer.replaceAll("/", "").split(":")[2]);
44+
ChannelFuture f = b.connect(host, port).sync();
45+
/*f.channel().write(request);
46+
f.channel().flush();*/
47+
f.channel().closeFuture().sync();
48+
} finally {
49+
workerGroup.shutdownGracefully();
50+
}
51+
52+
}
53+
54+
public static void main(String[] args) throws Exception {
55+
NettyHttpClient client = new NettyHttpClient();
56+
//client.connect("127.0.0.1", 8088);
57+
}
58+
}

0 commit comments

Comments
 (0)