Netty简单案例
- 前言
- 环境准备
- 前置知识
- 网络传输的几种实现方式
- BIO——同步阻塞IO
- NIO——同步非阻塞IO
- AIO——异步非阻塞IO
- 适用范围
- Netty
- 简介
- 特点
- 核心组件
- 使用场景
- 运行简图
- 案例
- 简介
- 关键代码
- 客户端
- 服务器端
- 运行状况
- 总结
前言
最近学完了Netty,在这里关于Netty中实现NIO做一些小总结,并附上一个小案例,最好读者有一点Netty的基础。这里附上git的地址,看一下netty的各种案例运行一下。github
环境准备
Maven 3.X、JDK15
前置知识
网络传输的几种实现方式
建议看一下之前的一篇文章《Java网络编程之阻塞式IO与非阻塞IO》中关于阻塞和非阻塞IO在Java中的使用。
BIO——同步阻塞IO
阻塞式的IO,服务器以轮询的方式,不断查看是否有新的连接。当然其性能可以使用线程池得到略微改善。
NIO——同步非阻塞IO
通过Selector以及Channel的组合使用,实现了多路复用,虽然实现了服务器的异步处理,但是客户端必须要在服务器响应到达才能发起下一个请求,即客户端需要某线程持续监听是否有响应发送回来。大体流程如下:
AIO——异步非阻塞IO
该模式不仅服务器实现了异步、客户端也实现了异步,能够在请求没有到达之前,继续向服务器发送数据。这里之后补充。
适用范围
-
NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,JDK1.4开始支持。
-
AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如HTTP服务器等,充分调用OS参与并发操作,JDK7开始支持
Netty
简介
Netty是高性能的水平扩展的分布式架构
特点
健壮、安全、高可用、高性能、更新快、易用
核心组件
- channel:传入、传出的数据载体
- 回调:给请求的响应
- Feature:异步编程的的一个任务的开启
- 事件和ChannelHandler:很多框架,前端后端都包含这类"发布者订阅者"设计思想
使用场景
- 内部的RPC框架
低延迟高吞吐量 - 负载和性能测试
用于负载和性能测试框架,可以通过Netty和Redis结合,来以最小的负载测试端到端的消息吞吐量。 - 同步协议的异步客户端
Netty为同步的协议创建异步的客户端,如Kafka、Memcached。使得在同步和异步之间来回切换,不需要更改任何上有代码。
消息推送、实时流量监控都可以使用Netty,有待大家自己探索。
运行简图
简而言之,服务器监听端口,就是新建一个ServerChannel,客户端建立连接connect也是建立一个连接,之后ServerChannel收到之后通过EventLoopGroup把该channel分配到EventLoop,如果该channel对应的任务已经存在于EventLoop中就直接执行,否则就要放入EventLoop中,等待执行。
-
EventLoopGroup
EventLoopGroup好比线程池,EventLoop好比线程,Channel的pipeline了所有的Handler
-
EventLoop的Task注册
-
连接建立示意图
-
服务器
-
客户端
-
案例
简介
客户通过一个终端输入查询的编号,如果问题在库中存在,人工客服返回相应的回答;若不存在,就会返回默认回复——”致电人工客服“;
如果是非法的输入,就直接模拟服务器宕机,所有的连接都断开。
注意 :这里后续作为入门案例展示,如果这里看不太懂,建议看一下git的quickstart部分。这里只展示关键代码
关键代码
客户端
public void start() {
Bootstrap bootstrap = new Bootstrap ( );
// 引导类的配置,包括事件组,channel类型以及处理器
bootstrap.group (group)
.channel (NioSocketChannel.class)
.handler (createInitializer ( ));
ChannelFuture future = bootstrap.connect (new InetSocketAddress (8080));
future.syncUninterruptibly ( );
channel = future.channel ( );
}
// 注册所有的处理器
private ChannelInitializer<Channel> createInitializer() {
return new TerminalChatClientInitializer ( );
}
// 通过控制台不断地询问智能客服
static void chat() throws IOException {
while (true) {
String input = getInputMsg ( );
if (!input.equals (OVER)) {
channel.writeAndFlush (Unpooled.copiedBuffer (input, StandardCharsets.UTF_8));
} else {
break;
}
}
}
// 处理器代码
public class TerminalClientInHandler extends SimpleChannelInboundHandler<ByteBuf> {
// 把响应打印出来
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
out.println (LocalDateTime.now ( ).format (DateTimeFormatter.ofPattern ("yyyy/MM/dd HH:mm:ss")));
String resp = msg.toString (StandardCharsets.UTF_8);
out.println (resp);
}
// 提示连接成功
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
out.println ("connect to server successfully : " + ctx.channel ( ).remoteAddress ( ));
}
// 提示连接关闭
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
out.println ("已经关闭连接");
}
}
服务器端
public void start() {
// 初始化服务器引导
ServerBootstrap server = new ServerBootstrap ( );
server.group (mainGroup)
.channel (NioServerSocketChannel.class)
.childHandler (createInitializer (channelGroup));
// 监听8080端口
ChannelFuture future = server.bind (8080);
future.syncUninterruptibly ( );
channel = future.channel ( );
}
private ChannelInitializer<Channel> createInitializer(ChannelGroup channelGroup) {
return new TerminalChatServerInitializer (channelGroup);
}
public void destroy() {
if (channel != null) {
channel.close ( );
}
mainGroup.shutdownGracefully ( );
subGroup.shutdownGracefully ( );
}
// 处理器关键代码
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
// 打印收到的时间和消息
String input = msg.toString (StandardCharsets.UTF_8);
out.println (ctx.channel ().remoteAddress () + "\n" + input);
try {
// 解析查询的Id,并且响应相应的答案
int id = Integer.parseInt (input);
String res = resMap.getOrDefault (id, "请致电人工:10086");
ctx.writeAndFlush (Unpooled.copiedBuffer (res, StandardCharsets.UTF_8));
} catch (Exception e) {
// 非法输入,模拟服务器宕机,向所有的客户端发送 服务器关闭消息
group.writeAndFlush (Unpooled.copiedBuffer ("不明原因,服务器暂时关闭",StandardCharsets.UTF_8));
group.close ();
}
}
// 有新连接,打印客户机地址,并且加入到聊天群组
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
out.println ("client connected successfully : " + ctx.channel ( ).remoteAddress ( ));
group.add (ctx.channel ( ));
}
// 某客户端下线,打印客户机地址,这里不需要手动从群组移除,Netty已经帮我们实现了该功能
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
out.println ("客户端: " + ctx.channel ( ).remoteAddress ( ).toString ( ) + " 结束");
}
运行状况
总结
相对于Java原生Nio的消息读取以及消息处理来说,Netty的实现方式更加简单。