初始Netty-HelloWorld
Netty在网络通信中的地位就如同Spring框架在JavaEE开发中的地位。
基于Netty网络通信开发简易的服务端、客户端,以实现客户端向服务端发送hello world,服务端仅接收不返回数据。
服务端代码:
@Slf4j
public class HelloServer {
public static void main(String[] args) {
new ServerBootstrap() // 服务器端的启动器,负责组装netty组件,启动服务器
// Group组,BossEventLoop,WorkerEventLoop(selector, thread)
.group(new NioEventLoopGroup()) // 1
.channel(NioServerSocketChannel.class) // 2
// boss 负责处理连接,worker(child)负责处理读写,决定了worker(child)能执行哪些操作(handler)
.childHandler(
// channel代表和客户端进行数据读写的通道 Initializer初始化,负责添加别的handler
new ChannelInitializer<NioSocketChannel>() { // 3
protected void initChannel(NioSocketChannel ch) {
// 添加具体的handler, StringDecoder字符串解码,将传输的ByteBuf转换为字符串
ch.pipeline().addLast(new StringDecoder()); // 5
ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { // 6,自定义handler
@Override // 读事件
protected void channelRead0(ChannelHandlerContext ctx, String msg) {
// 打印上一步转换的字符串
System.out.println(msg);
}
});
}
})
.bind(8080); // 4,绑定监听端口
}
}
客户端代码:
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap() // 创建启动器
// 添加EventLoop
.group(new NioEventLoopGroup()) // 1
// 选择客户端的channel
.channel(NioSocketChannel.class) // 2
// 添加处理器
.handler(new ChannelInitializer<Channel>() { // 3
@Override // 在连接建立后被调用
protected void initChannel(Channel ch) {
// 对发送的数据进行编码
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
// 连接到服务器
.connect("127.0.0.1", 8080) // 4
//
.sync() // 5
.channel() // 6
.writeAndFlush(new Date() + ": hello world!"); // 7 向服务器发送数据
}
}
主要执行流程:
可以理解如下:
- 把 channel 理解为数据的通道
- 把 msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline (流水线)的加工,会变成其它类型对象,最后输出又变成 ByteBuf
- 把 handler 理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline,pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- handler 分 Inbound 和 Outbound 两类
- 把 eventLoop 理解为处理数据的工人
- 工人可以管理多个 channel 的 io 操作,并且一旦工人负责了某个 channel,就要负责到底(绑定)
- 工人既可以执行 io 操作,也可以进行任务处理,每位工人有任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- 工人按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每道工序指定不同的工人
EventLoop详解
EventLoop基础知识
事件循环对象
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。
它的继承关系比较复杂
- 一条线是继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另一条线是继承自 netty 自己的 OrderedEventExecutor,
- 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 parent 方法来看看自己属于哪个 EventLoopGroup
事件循环组
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
EventLoop使用示例
// 创建事件循环组
// NioEventLoopGroup可以处理io事件、普通任务、定时任务
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(2);
// DefaultEventLoopGroup可以处理普通任务、定时任务
EventLoopGroup eventLoopGroup = new DefaultEventLoopGroup();
创建事件循环组时,若不指定线程个数,则创建个数为机器可用CPU * 2,若指定个数,则按照指定个数进行创建。
// 处理普通任务
eventLoopGroup.next().submit(()->{ // 或者使用.execute()
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("ok");
});
// 执行定时任务
eventLoopGroup.next().scheduleAtFixedRate(()->{
log.info("test schedule task ...");
}, 0, 1, TimeUnit.SECONDS);
使用NioEventLoopGroup处理IO事件
采用多个NioEventLoopGroup来分别处理不同的事件,如boss专门处理accept事件,worker专门处理read事件。并且对于耗时较长的handler交给专门的EventLoopGroup来处理,从而不阻塞原有的NioEventLoopGroup监听事件的正常运行。
优化一:
上述图片中第23行为初始代码,一个NioEventLoopGroup处理所有的事件,包括accept、read
等,显然不符合各司其职的功能,将其优化为第24行所示,明确NioEventLoopGroup负责处理的
事件类别。
1.boss只负责NioServerSocketChannel上的accept事件。
2.worker负责NioSocketChannel上的read事件。
优化二:
对于比较耗时的handler,可以将其将给其他EventLoopGroup创建handler来执行。
服务端代码:
@Slf4j
public class EventLoopServer {
public static void main(String[] args) {
// 2.细分:如果某个handler执行事件比较长,可以独立出一个eventloopgroup进行处理
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
// 1.进行优化,明确eventloop负责处理的事件类别,boss和worker
// boss只负责NioServerSocketChannel上的accept事件,worker负责NioSocketChannel上的read事件
// 是否需要指定第一个负责accept的NioEventLoopGroup的适量,不需要,只会有一个NioServerSocketChannel
// .group(new NioEventLoopGroup())
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 转为toString的时候需要指定字符集,此处选择使用默认字符集
// 若在网络通信中,需要客户端和服务器协商字符集的使用,使用同一个标准进行处理
log.info(buf.toString(Charset.defaultCharset()));
ctx.fireChannelRead(msg); // 将消息传递给下一个handler
}
}).addLast(group, "handler2", new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.info(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}
客户端代码:
@Slf4j
public class EventLoopClient {
public static void main(String[] args) throws InterruptedException {
Channel channel = new Bootstrap() // 创建启动器
// 添加EventLoop
.group(new NioEventLoopGroup()) // 1
// 选择客户端的channel
.channel(NioSocketChannel.class) // 2
// 添加处理器
.handler(new ChannelInitializer<Channel>() { // 3
@Override // 在连接建立后被调用
protected void initChannel(Channel ch) {
// 对发送的数据进行编码
ch.pipeline().addLast(new StringEncoder()); // 8
}
})
// 连接到服务器
.connect("127.0.0.1", 8080) // 4
//
.sync() // 5
.channel();// 6
// for (int i = 0; i < 5; i++) {
// channel.writeAndFlush("Hello eventloop_" + i + " ");
// }
log.info("{}",channel);
System.out.println("");
}
}
EventLoop、Channel和Handler之间的关系
建立连接之后,channel和一个EventLoop进行绑定,并且一个线程可以管理多个Channel。
同一个Channel绑定多个不同的EventLoop(也就不同EventLoopGroup中对应的EventLoop),此时如何进行不同handler之间的切换。
- 如果两个 handler 绑定的是同一个线程,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的线程来调用
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
EventExecutor executor = next.executor();
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}}