Skip to content

Commit 23ca290

Browse files
committed
nio网关:通过okHttp转发请求到后端,添加过滤器逻辑,并实现一个添加header的过滤器
1 parent 01d9794 commit 23ca290

8 files changed

Lines changed: 210 additions & 8 deletions

File tree

02nio/nio02/pom.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@
5252
<artifactId>httpasyncclient</artifactId>
5353
<version>4.1.4</version>
5454
</dependency>
55+
56+
<dependency>
57+
<groupId>com.squareup.okhttp3</groupId>
58+
<artifactId>okhttp</artifactId>
59+
<version>4.9.0</version>
60+
</dependency>
61+
<dependency>
62+
<groupId>org.jetbrains.kotlin</groupId>
63+
<artifactId>kotlin-stdlib</artifactId>
64+
<version>1.3.70</version>
65+
</dependency>
5566

5667
<!--
5768
<dependency>
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.handler.codec.http.FullHttpRequest;
5+
6+
/**
7+
* 过滤器调用链接口
8+
*/
9+
public interface FilterChain {
10+
11+
void doFilter(FullHttpRequest request, ChannelHandlerContext ctx);
12+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,6 @@
55

66
public interface HttpRequestFilter {
77

8-
void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx);
8+
void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx,HttpRequestFilterChain chain);
99

1010
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.handler.codec.http.FullHttpRequest;
5+
6+
/**
7+
* 过滤器调用链实现类,链式调用各过滤器doFilter接口
8+
*/
9+
public final class HttpRequestFilterChain implements FilterChain {
10+
/**
11+
* 当前执行到的filter
12+
*/
13+
private int pos = 0;
14+
/**
15+
* filter个数
16+
*/
17+
private int n;
18+
19+
private HttpRequestFilter[] filters = new HttpRequestFilter[]{};
20+
/**
21+
* 每次扩容的大小
22+
*/
23+
private final static int INCREMENT = 10;
24+
25+
26+
27+
28+
@Override
29+
public void doFilter(FullHttpRequest request, ChannelHandlerContext ctx) {
30+
if(pos < n) {
31+
HttpRequestFilter filter = filters[pos++];
32+
filter.filter(request, ctx,this);
33+
}
34+
35+
}
36+
37+
public void addFilter(HttpRequestFilter filter) {
38+
if(filter == null) {
39+
return;
40+
}
41+
for(HttpRequestFilter requestFilter : filters) {
42+
if(requestFilter == filter) {
43+
return;
44+
}
45+
}
46+
if(filters.length == n) {
47+
HttpRequestFilter[] newFilters = new HttpRequestFilter[n + INCREMENT];
48+
System.arraycopy(filters,0,newFilters,0,n);
49+
filters = newFilters;
50+
}
51+
filters[n++] = filter;
52+
53+
}
54+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.handler.codec.http.FullHttpRequest;
5+
import io.netty.handler.codec.http.HttpHeaders;
6+
7+
/**
8+
* http request filter ,在filter中拿到请求头,添加key 是 nio value是chengfpvoid
9+
*/
10+
public class NioKeyRequestFilter implements HttpRequestFilter {
11+
@Override
12+
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx, HttpRequestFilterChain chain) {
13+
HttpHeaders httpHeaders = fullRequest.headers();
14+
httpHeaders.add("nio","chengfpvoid");
15+
chain.doFilter(fullRequest,ctx);
16+
}
17+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package io.github.kimmking.gateway.inbound;
22

3-
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
3+
import io.github.kimmking.gateway.filter.HttpRequestFilterChain;
4+
import io.github.kimmking.gateway.filter.NioKeyRequestFilter;
5+
import io.github.kimmking.gateway.outbound.okhttp.OkhttpOutboundHandler;
46
import io.netty.channel.ChannelHandlerContext;
57
import io.netty.channel.ChannelInboundHandlerAdapter;
68
import io.netty.handler.codec.http.FullHttpRequest;
@@ -12,11 +14,12 @@ public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
1214

1315
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
1416
private final String proxyServer;
15-
private HttpOutboundHandler handler;
16-
17+
//private HttpOutboundHandler handler;
18+
private OkhttpOutboundHandler okHandler;
1719
public HttpInboundHandler(String proxyServer) {
1820
this.proxyServer = proxyServer;
19-
handler = new HttpOutboundHandler(this.proxyServer);
21+
//handler = new HttpOutboundHandler(this.proxyServer);
22+
okHandler = new OkhttpOutboundHandler(this.proxyServer);
2023
}
2124

2225
@Override
@@ -34,8 +37,12 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
3437
// if (uri.contains("/test")) {
3538
// handlerTest(fullRequest, ctx);
3639
// }
37-
38-
handler.handle(fullRequest, ctx);
40+
//链式模式添加过滤器
41+
HttpRequestFilterChain chain = new HttpRequestFilterChain();
42+
chain.addFilter(new NioKeyRequestFilter());
43+
chain.doFilter(fullRequest,ctx);
44+
//handler.handle(fullRequest, ctx);
45+
okHandler.handle(fullRequest,ctx);
3946

4047
} catch(Exception e) {
4148
e.printStackTrace();

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,8 @@ public void run() throws Exception {
4747
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HttpInboundInitializer(this.proxyServer));
4848

4949
Channel ch = b.bind(port).sync().channel();
50-
logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');
50+
//logger.info("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + '/');
51+
System.out.println("开启netty http服务器,监听地址和端口为 http://127.0.0.1:" + port + "/");
5152
ch.closeFuture().sync();
5253
} finally {
5354
bossGroup.shutdownGracefully();
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,104 @@
11
package io.github.kimmking.gateway.outbound.okhttp;
22

3+
import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelFutureListener;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.handler.codec.http.*;
8+
import okhttp3.*;
9+
10+
import java.io.IOException;
11+
import java.util.concurrent.ArrayBlockingQueue;
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.ThreadPoolExecutor;
14+
import java.util.concurrent.TimeUnit;
15+
16+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
17+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
18+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
19+
320
public class OkhttpOutboundHandler {
21+
22+
private ExecutorService proxyService;
23+
24+
private String backendUrl;
25+
26+
private OkHttpClient okHttpClient;
27+
28+
public OkhttpOutboundHandler(String backendUrl) {
29+
this.backendUrl = backendUrl.endsWith("/")?backendUrl.substring(0,backendUrl.length()-1):backendUrl;
30+
int coreSize = Runtime.getRuntime().availableProcessors() * 2;
31+
proxyService = new ThreadPoolExecutor(coreSize, coreSize,
32+
1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1024),
33+
new NamedThreadFactory("proxyService"));
34+
okHttpClient = new OkHttpClient();
35+
36+
37+
}
38+
39+
public void handle(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
40+
final String url = backendUrl + fullRequest.uri();
41+
proxyService.submit(() -> doGet(fullRequest,ctx,url));
42+
//doGet(fullRequest, ctx,url);
43+
44+
}
45+
46+
private void doGet(final FullHttpRequest fullRequest, ChannelHandlerContext ctx, String url) {
47+
HttpHeaders httpHeaders = fullRequest.headers();
48+
Request.Builder builder = new Request.Builder()
49+
.url(url).addHeader("Content-Type", "application/json");
50+
httpHeaders.forEach(entry-> builder.addHeader(entry.getKey(),entry.getValue()));
51+
Request request = builder.build();
52+
okHttpClient.newCall(request).enqueue(new Callback() {
53+
@Override
54+
public void onFailure(Call call, IOException e) {
55+
e.printStackTrace();
56+
}
57+
58+
@Override
59+
public void onResponse(Call call, Response response) {
60+
try {
61+
handleResponse(fullRequest,ctx,response);
62+
} catch (IOException e) {
63+
e.printStackTrace();
64+
}
65+
}
66+
});
67+
}
68+
69+
private void handleResponse(final FullHttpRequest fullRequest,final ChannelHandlerContext ctx,Response response) throws IOException {
70+
FullHttpResponse httpResponse = null;
71+
try (ResponseBody responseBody = response.body()) {
72+
if (!response.isSuccessful()) {
73+
throw new IOException("Unexpected code " + response);
74+
}
75+
76+
byte[] body = responseBody.bytes();
77+
httpResponse = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(body));
78+
httpResponse.headers().set("Content-Type", "application/json");
79+
httpResponse.headers().setInt("Content-Length", (int) responseBody.contentLength());
80+
Headers responseHeaders = response.headers();
81+
System.out.println("======response-headers============");
82+
for (int i = 0, size = responseHeaders.size(); i < size; i++) {
83+
System.out.println(responseHeaders.name(i) + ": " + responseHeaders.value(i));
84+
}
85+
// System.out.println(responseBody);
86+
87+
88+
}catch (Exception e){
89+
e.printStackTrace();
90+
httpResponse = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
91+
ctx.close();
92+
93+
}finally {
94+
if (fullRequest != null) {
95+
if (!HttpUtil.isKeepAlive(fullRequest)) {
96+
ctx.write(httpResponse).addListener(ChannelFutureListener.CLOSE);
97+
} else {
98+
ctx.write(httpResponse);
99+
}
100+
}
101+
ctx.flush();
102+
}
103+
}
4104
}

0 commit comments

Comments
 (0)