Skip to content

Commit 7d1c6fb

Browse files
committed
add Netty HTTP demo
1 parent 017ff24 commit 7d1c6fb

File tree

8 files changed

+289
-63
lines changed

8 files changed

+289
-63
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.learnjava.concurent;
2+
3+
import java.util.concurrent.locks.Condition;
4+
import java.util.concurrent.locks.Lock;
5+
import java.util.concurrent.locks.ReentrantLock;
6+
7+
/**
8+
* @author LuoHaiYang
9+
*
10+
* Condition 通知/等待经典范式
11+
*
12+
*/
13+
public class BoundedQueue<T> {
14+
private Object[] items;
15+
// 添加下标,删除下标,数组当前数量
16+
private int addIndex, removeIndex, count;
17+
private Lock lock = new ReentrantLock();
18+
private Condition notEmpty = lock.newCondition();
19+
private Condition notFull = lock.newCondition();
20+
public BoundedQueue(int size) {
21+
items = new Object[size];
22+
}
23+
public void add(T t) throws InterruptedException{
24+
// 加锁
25+
lock.lock();
26+
try {
27+
while (count == items.length) {
28+
notFull.await();
29+
}
30+
items[addIndex] = t;
31+
if (++addIndex == items.length) {
32+
addIndex = 0;
33+
}
34+
++count;
35+
notEmpty.signal();
36+
} finally {
37+
lock.unlock();
38+
}
39+
}
40+
41+
public T remove() throws InterruptedException {
42+
lock.lock();
43+
try {
44+
while (count == 0) {
45+
notEmpty.await();
46+
}
47+
Object o = items[removeIndex];
48+
if (++removeIndex == items.length) {
49+
removeIndex = 0;
50+
}
51+
--count;
52+
notFull.signal();
53+
return (T) o;
54+
} finally {
55+
lock.unlock();
56+
}
57+
}
58+
}

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99

1010
- JDK源码学习
1111
- JDK版本:1.8.0_77
12+
- [深入学习String源码与底层(一)](https://blog.csdn.net/CoderBruis/article/details/94884673)
13+
- [深入学习String源码与底层(二)](https://blog.csdn.net/CoderBruis/article/details/95620935)
1214
- [深入解读CompletableFuture源码与原理](https://blog.csdn.net/CoderBruis/article/details/103181520)
1315

1416
- Spring源码学习

Spring-Boot/src/main/java/com/bruis/learnsb/LearnsbApplication.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import org.springframework.boot.SpringApplication;
44
import org.springframework.boot.autoconfigure.SpringBootApplication;
5-
import org.springframework.context.annotation.PropertySource;
65

76
import java.util.Properties;
87

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.bruis.learnnetty.netty.demo01;
2+
3+
import io.netty.bootstrap.Bootstrap;
4+
import io.netty.channel.ChannelFuture;
5+
import io.netty.channel.ChannelInitializer;
6+
import io.netty.channel.EventLoopGroup;
7+
import io.netty.channel.nio.NioEventLoopGroup;
8+
import io.netty.channel.socket.SocketChannel;
9+
import io.netty.channel.socket.nio.NioSocketChannel;
10+
11+
/**
12+
* @author LuoHaiYang
13+
*/
14+
public class NettyClient {
15+
16+
public static void main(String[] args) throws Exception {
17+
// 客户端需要一个事件循环组
18+
EventLoopGroup group = new NioEventLoopGroup();;
19+
20+
try {
21+
// 创建客户端启动对象
22+
// 注意客户端使用的不是ServerBootStrap,而是BootStrap
23+
Bootstrap bootstrap = new Bootstrap();
24+
25+
// 设置相关参数, 设置线程组
26+
bootstrap.group(group)
27+
// 设置客户端通道的实现类(反射)
28+
.channel(NioSocketChannel.class)
29+
.handler(new ChannelInitializer<SocketChannel>() {
30+
@Override
31+
protected void initChannel(SocketChannel ch) throws Exception {
32+
// 加入自己的处理器
33+
ch.pipeline().addLast(new NettyClientHandler());
34+
}
35+
});
36+
System.out.println("客户端 ok...");
37+
38+
// 启动客户端去连接服务器端
39+
// ChannelFuture用到了netty的异步模型
40+
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
41+
// 关闭通道监听
42+
channelFuture.channel().closeFuture().sync();
43+
} finally {
44+
group.shutdownGracefully();
45+
}
46+
}
47+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package com.bruis.learnnetty.netty.demo01;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import io.netty.channel.ChannelInboundHandlerAdapter;
7+
import io.netty.util.CharsetUtil;
8+
9+
/**
10+
* @author LuoHaiYang
11+
*/
12+
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
13+
14+
public NettyClientHandler() {
15+
super();
16+
System.out.println("NettyClientHandler构造方法");
17+
}
18+
19+
@Override
20+
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
21+
System.out.println("channelRegistered...注册通道");
22+
}
23+
24+
/**
25+
* 什么时候调用?
26+
* @param ctx
27+
* @throws Exception
28+
*/
29+
@Override
30+
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
31+
System.out.println("channelUnregistered...解除注册通道");
32+
}
33+
34+
@Override
35+
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
36+
System.out.println("channelInactive...");
37+
}
38+
39+
@Override
40+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
41+
System.out.println("channelReadComplete...读取通道完成之后的回调方法");
42+
}
43+
44+
@Override
45+
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
46+
System.out.println("userEventTriggered...用户事件触发器");
47+
}
48+
49+
@Override
50+
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
51+
System.out.println("channelWritabilityChanged...");
52+
}
53+
54+
/**
55+
* 当通道就绪就会触发该方法
56+
* @param ctx 上下文
57+
* @throws Exception
58+
*/
59+
@Override
60+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
61+
System.out.println("client " + ctx);
62+
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server", CharsetUtil.UTF_8));
63+
}
64+
65+
/**
66+
* 当通道有读取事件时,就会触发
67+
* @param ctx
68+
* @param msg
69+
* @throws Exception
70+
*/
71+
@Override
72+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
73+
ByteBuf buf = (ByteBuf) msg;
74+
System.out.println("服务端回复的消息:" + buf.toString(CharsetUtil.UTF_8));
75+
System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
76+
}
77+
78+
@Override
79+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
80+
cause.printStackTrace();
81+
ctx.close();
82+
}
83+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package com.bruis.learnnetty.netty.demo01;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.netty.channel.Channel;
6+
import io.netty.channel.ChannelHandlerContext;
7+
import io.netty.channel.ChannelInboundHandlerAdapter;
8+
import io.netty.channel.ChannelPipeline;
9+
import io.netty.util.CharsetUtil;
10+
11+
/**
12+
* @author LuoHaiYang
13+
*/
14+
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
15+
@Override
16+
public void channelActive(ChannelHandlerContext ctx) {
17+
System.out.println("channelActive");
18+
}
19+
20+
@Override
21+
public void channelRegistered(ChannelHandlerContext ctx) {
22+
System.out.println("channelRegistered...");
23+
}
24+
25+
@Override
26+
public void handlerAdded(ChannelHandlerContext ctx) {
27+
System.out.println("handlerAdded");
28+
}
29+
30+
/**
31+
* 读取实际数据,这里就是我们读取客户端发送的消息
32+
* @param ctx 上下文对象,包含了管道pipeline,通道channel,地址
33+
* @param msg 就是客户端发送的数据 默认Object
34+
* @throws Exception
35+
*/
36+
@Override
37+
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
38+
System.out.println("服务器读取线程 " + Thread.currentThread().getName());
39+
System.out.println("server ctx = " + ctx);
40+
System.out.println("看看Channel和pipeline的关系");
41+
Channel channel = ctx.channel();
42+
// 本质是一个双向连接,出站入站
43+
ChannelPipeline pipeline = ctx.pipeline();
44+
// 将msg转成一个ByteBuf
45+
// ByteBuf是Netty提供的,不是NIO的ByteBuffer
46+
ByteBuf byteBuf = (ByteBuf)msg;
47+
System.out.println("客户端发送消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
48+
System.out.println("客户端地址:" + channel.remoteAddress());
49+
}
50+
51+
/**
52+
* 数据读取完毕
53+
* @param ctx
54+
* @throws Exception
55+
*/
56+
@Override
57+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
58+
// writeAndFlush 是write + flush
59+
// 将数据写入到缓存,并刷新
60+
// 我们对这个发送的数据会进行编码
61+
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, client...", CharsetUtil.UTF_8));
62+
}
63+
64+
/**
65+
* 处理异常,一般是需要关闭通道
66+
* @param ctx 上下文
67+
* @param cause 异常
68+
* @throws Exception
69+
*/
70+
@Override
71+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
72+
ctx.close();
73+
}
74+
}

Spring-Netty/src/main/java/com/bruis/learnnetty/netty/demo01/Server.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,37 +8,51 @@
88
import io.netty.channel.nio.NioEventLoopGroup;
99
import io.netty.channel.socket.SocketChannel;
1010
import io.netty.channel.socket.nio.NioServerSocketChannel;
11-
import io.netty.util.AttributeKey;
1211

1312
/**
1413
* @author LuoHaiYang
1514
*/
1615
public class Server {
1716
public static void main(String[] args) throws Exception {
1817

19-
// bossGroup对应着socket编程中的服务端的Thread,用于监听是否有client连接
18+
/**
19+
* 1. 创建两个线程组:bossGroup和workerGroup
20+
* 2. bossGroup只是处理连接请求,真正的和客户端业务处理的话,会交给workerGroup
21+
* 3. bossGroup和workerGroup含有的子线程(NioEventLoop)的个数,默认实际为CPU核数 * 2
22+
*
23+
* bossGroup对应着socket编程中的服务端的Thread,用于监听是否有client连接
24+
* workGroup对应着socket编程中的数据读取,读取server端读到的数据
25+
*
26+
*/
2027
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
21-
// workGroup对应着socket编程中的数据读取,读取server端读到的数据
2228
EventLoopGroup workGroup = new NioEventLoopGroup();
2329

2430
try {
31+
// 创建服务器端的启动对象,配置参数
2532
ServerBootstrap bootstrap = new ServerBootstrap();
33+
// 使用链式编程来进行设置, group设置两个线程组
2634
bootstrap.group(bossGroup, workGroup)
27-
// 给服务端channel设置SocketChannel类型
35+
// 使用NioServerSocketChannel来作为服务器的通道实现
2836
.channel(NioServerSocketChannel.class)
29-
// 给每个客户端连接设置TCP基本属性
30-
.childOption(ChannelOption.TCP_NODELAY, true)
31-
// 绑定属性
32-
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
33-
// handler表示客户端的一个处理逻辑
34-
.handler(new ServerHandler())
37+
// 设置线程队列得到的连接数
38+
.option(ChannelOption.SO_BACKLOG, 128)
39+
// 设置保持活动链接状态
40+
.childOption(ChannelOption.SO_KEEPALIVE, true)
41+
// 创建一个通道测试对象
42+
// 给workerGroup的EventLoop对于的管道设置处理器
3543
.childHandler(new ChannelInitializer<SocketChannel>() {
44+
// 给pipeline设置处理器
3645
@Override
3746
public void initChannel(SocketChannel channel) {
38-
//channel.pipeline().addLast(new AuthHandler());
47+
channel.pipeline().addLast(new NettyServerHandler());
3948
}
4049
});
50+
51+
System.out.println("...服务器 is ready...");
52+
53+
// 绑定一个端口并且同步,生成一个ChannelFuture对象
4154
ChannelFuture channelFuture = bootstrap.bind(8888).sync();
55+
// 对关闭通道进行监听
4256
channelFuture.channel().closeFuture().sync();
4357
} finally {
4458
bossGroup.shutdownGracefully();

Spring-Netty/src/main/java/com/bruis/learnnetty/netty/demo01/ServerHandler.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

0 commit comments

Comments
 (0)