Skip to content

Commit 30e6ff6

Browse files
committed
本次提交主要为 1.将自己写的HttpClient以组件的形式集成,2.自定filter过滤器
1 parent 01d9794 commit 30e6ff6

8 files changed

Lines changed: 272 additions & 96 deletions

File tree

02nio/nio02/pom.xml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,13 @@
5252
<artifactId>httpasyncclient</artifactId>
5353
<version>4.1.4</version>
5454
</dependency>
55-
56-
<!--
55+
<!--引入httpClient-->
56+
<dependency>
57+
<groupId>org.apache.httpcomponents</groupId>
58+
<artifactId>httpclient</artifactId>
59+
<version>4.5.13</version>
60+
</dependency>
61+
5762
<dependency>
5863
<groupId>org.springframework.boot</groupId>
5964
<artifactId>spring-boot-starter-web</artifactId>
@@ -64,7 +69,6 @@
6469
<artifactId>spring-boot-starter-test</artifactId>
6570
<scope>test</scope>
6671
</dependency>
67-
-->
6872

6973
</dependencies>
7074

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.github.kimmking.gateway;
2+
3+
import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;
4+
5+
import java.util.concurrent.ArrayBlockingQueue;
6+
import java.util.concurrent.RejectedExecutionHandler;
7+
import java.util.concurrent.ThreadPoolExecutor;
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class ThreadPool {
11+
12+
public static ThreadPoolExecutor getThreadPoolExecutor(){
13+
int cores = Runtime.getRuntime().availableProcessors() * 2;
14+
System.out.println(Runtime.getRuntime().availableProcessors());
15+
long keepAliveTime = 1000;
16+
int queueSize = 2048;
17+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();//.DiscardPolicy();
18+
ThreadPoolExecutor proxyService = new ThreadPoolExecutor(cores,
19+
cores,
20+
keepAliveTime,
21+
TimeUnit.MILLISECONDS,
22+
new ArrayBlockingQueue<>(queueSize),
23+
new NamedThreadFactory("proxyService"),
24+
handler);
25+
return proxyService;
26+
}
27+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,5 @@
66
public interface HttpRequestFilter {
77

88
void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx);
9-
9+
1010
}
Lines changed: 61 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,38 @@
11
package io.github.kimmking.gateway.inbound;
22

3+
import io.github.kimmking.gateway.filter.HttpRequestFilter;
34
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
5+
import io.github.kimmking.gateway.outbound.myselfhttpclient.MyHttpOutboundHandler;
6+
import io.netty.buffer.Unpooled;
7+
import io.netty.channel.ChannelFutureListener;
48
import io.netty.channel.ChannelHandlerContext;
59
import io.netty.channel.ChannelInboundHandlerAdapter;
6-
import io.netty.handler.codec.http.FullHttpRequest;
10+
import io.netty.handler.codec.http.*;
711
import io.netty.util.ReferenceCountUtil;
812
import org.slf4j.Logger;
913
import org.slf4j.LoggerFactory;
1014

11-
public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
15+
import static io.netty.handler.codec.http.HttpHeaderValues.KEEP_ALIVE;
16+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
17+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
18+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
19+
20+
public class HttpInboundHandler extends ChannelInboundHandlerAdapter implements HttpRequestFilter {
1221

1322
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
1423
private final String proxyServer;
1524
private HttpOutboundHandler handler;
16-
25+
// 自己写的Httpclient组件
26+
private MyHttpOutboundHandler myHandler;
27+
1728
public HttpInboundHandler(String proxyServer) {
1829
this.proxyServer = proxyServer;
30+
// 老师
1931
handler = new HttpOutboundHandler(this.proxyServer);
32+
// 自己写的Httpclient组件
33+
myHandler = new MyHttpOutboundHandler(this.proxyServer);
2034
}
21-
35+
2236
@Override
2337
public void channelReadComplete(ChannelHandlerContext ctx) {
2438
ctx.flush();
@@ -27,50 +41,55 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
2741
@Override
2842
public void channelRead(ChannelHandlerContext ctx, Object msg) {
2943
try {
30-
//logger.info("channelRead流量接口请求开始,时间为{}", startTime);
44+
long startTime = System.currentTimeMillis();
45+
logger.info("channelRead流量接口请求开始,时间为{}", startTime);
3146
FullHttpRequest fullRequest = (FullHttpRequest) msg;
32-
// String uri = fullRequest.uri();
33-
// //logger.info("接收到的请求url为{}", uri);
34-
// if (uri.contains("/test")) {
35-
// handlerTest(fullRequest, ctx);
36-
// }
37-
38-
handler.handle(fullRequest, ctx);
39-
40-
} catch(Exception e) {
47+
// 自定义过滤器
48+
filter(fullRequest,ctx);
49+
// 自己写的HttpClient
50+
myHandler.handler(fullRequest,ctx);
51+
// 老师写的
52+
// handler.handle(fullRequest, ctx);
53+
54+
} catch (Exception e) {
4155
e.printStackTrace();
4256
} finally {
4357
ReferenceCountUtil.release(msg);
4458
}
4559
}
4660

47-
// private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
48-
// FullHttpResponse response = null;
49-
// try {
50-
// String value = "hello,kimmking";
51-
// response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
52-
// response.headers().set("Content-Type", "application/json");
53-
// response.headers().setInt("Content-Length", response.content().readableBytes());
54-
//
55-
// } catch (Exception e) {
56-
// logger.error("处理测试接口出错", e);
57-
// response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
58-
// } finally {
59-
// if (fullRequest != null) {
60-
// if (!HttpUtil.isKeepAlive(fullRequest)) {
61-
// ctx.write(response).addListener(ChannelFutureListener.CLOSE);
62-
// } else {
63-
// response.headers().set(CONNECTION, KEEP_ALIVE);
64-
// ctx.write(response);
65-
// }
66-
// }
67-
// }
68-
// }
69-
//
70-
// @Override
71-
// public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
72-
// cause.printStackTrace();
73-
// ctx.close();
74-
// }
61+
@Override
62+
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
63+
HttpHeaders headers = fullRequest.headers();
64+
headers.set("nio","BAIFUKUAN");
65+
}
66+
private void handlerTest(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
67+
FullHttpResponse response = null;
68+
try {
69+
String value = "hello,kimmking";
70+
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(value.getBytes("UTF-8")));
71+
response.headers().set("Content-Type", "application/json");
72+
response.headers().setInt("Content-Length", response.content().readableBytes());
73+
74+
} catch (Exception e) {
75+
logger.error("处理测试接口出错", e);
76+
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
77+
} finally {
78+
if (fullRequest != null) {
79+
if (!HttpUtil.isKeepAlive(fullRequest)) {
80+
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
81+
} else {
82+
response.headers().set("CONNECTION", KEEP_ALIVE);
83+
ctx.write(response);
84+
}
85+
}
86+
}
87+
}
88+
89+
@Override
90+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
91+
cause.printStackTrace();
92+
ctx.close();
93+
}
7594

7695
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,18 @@ public class HttpInboundServer {
1818
private static Logger logger = LoggerFactory.getLogger(HttpInboundServer.class);
1919

2020
private int port;
21-
21+
2222
private String proxyServer;
2323

2424
public HttpInboundServer(int port, String proxyServer) {
25-
this.port=port;
25+
this.port = port;
2626
this.proxyServer = proxyServer;
2727
}
2828

2929
public void run() throws Exception {
3030

3131
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
32-
EventLoopGroup workerGroup = new NioEventLoopGroup(16);
32+
EventLoopGroup workerGroup = new NioEventLoopGroup(8);
3333

3434
try {
3535
ServerBootstrap b = new ServerBootstrap();
@@ -43,8 +43,10 @@ public void run() throws Exception {
4343
.childOption(ChannelOption.SO_KEEPALIVE, true)
4444
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
4545

46-
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
47-
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpInboundInitializer(this.proxyServer));
46+
b.group(bossGroup, workerGroup)
47+
.channel(NioServerSocketChannel.class)
48+
.handler(new LoggingHandler(LogLevel.INFO))
49+
.childHandler(new HttpInboundInitializer(this.proxyServer));
4850

4951
Channel ch = b.bind(port).sync().channel();
5052
logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');

02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/netty4/NettyHttpClient.java renamed to 02nio/nio02/src/main/java/io/github/kimmking/gateway/outbound/NettyHttpClient.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
//package io.github.kimmking.gateway.outbound;
2-
//
3-
//import io.netty.bootstrap.Bootstrap;
4-
//import io.netty.channel.ChannelFuture;
5-
//import io.netty.channel.ChannelInitializer;
6-
//import io.netty.channel.ChannelOption;
7-
//import io.netty.channel.EventLoopGroup;
8-
//import io.netty.channel.nio.NioEventLoopGroup;
9-
//import io.netty.channel.socket.SocketChannel;
10-
//import io.netty.channel.socket.nio.NioSocketChannel;
11-
//import io.netty.handler.codec.http.HttpRequestEncoder;
12-
//import io.netty.handler.codec.http.HttpResponseDecoder;
13-
//
14-
//public class NettyHttpClient {
1+
// package io.github.kimmking.gateway.outbound;
2+
//
3+
// import io.netty.bootstrap.Bootstrap;
4+
// import io.netty.channel.ChannelFuture;
5+
// import io.netty.channel.ChannelInitializer;
6+
// import io.netty.channel.ChannelOption;
7+
// import io.netty.channel.EventLoopGroup;
8+
// import io.netty.channel.nio.NioEventLoopGroup;
9+
// import io.netty.channel.socket.SocketChannel;
10+
// import io.netty.channel.socket.nio.NioSocketChannel;
11+
// import io.netty.handler.codec.http.HttpRequestEncoder;
12+
// import io.netty.handler.codec.http.HttpResponseDecoder;
13+
//
14+
// public class NettyHttpClient {
1515
// public void connect(String host, int port) throws Exception {
1616
// EventLoopGroup workerGroup = new NioEventLoopGroup();
1717
//
@@ -25,7 +25,7 @@
2525
// public void initChannel(SocketChannel ch) throws Exception {
2626
// // 客户端接收到的是httpResponse响应,所以要使用HttpResponseDecoder进行解码
2727
// ch.pipeline().addLast(new HttpResponseDecoder());
28-
// 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
28+
// // 客户端发送的是httprequest,所以要使用HttpRequestEncoder进行编码
2929
// ch.pipeline().addLast(new HttpRequestEncoder());
3030
// ch.pipeline().addLast(new HttpClientOutboundHandler());
3131
// }
@@ -34,7 +34,7 @@
3434
// // Start the client.
3535
// ChannelFuture f = b.connect(host, port).sync();
3636
//
37-
//
37+
//
3838
// f.channel().write(request);
3939
// f.channel().flush();
4040
// f.channel().closeFuture().sync();
@@ -48,4 +48,4 @@
4848
// NettyHttpClient client = new NettyHttpClient();
4949
// client.connect("127.0.0.1", 8844);
5050
// }
51-
//}
51+
// }

0 commit comments

Comments
 (0)