文章目录
- 1 摘要
- 2 核心 Maven 依赖
- 3 核心代码
- 3.1 服务端事务处理器 (DemoNettyServerHandler)
- 3.2 服务端连接类(InitNettyServer)
- 3.3 客户端事务处理器(DemoNettyClientHandler)
- 3.4 客户端连接类(DemoNettyClient)
- 4 测试
- 4.1 测试流程
- 4.2 测试结果
- 4.3 测试结论
- 5 推荐参考资料
- 6 Github 源码
1 摘要
Netty 作为一款 NIO 底层通讯框架,在高并发场景有着广泛的应用,众多消息中间件内部也是基于 Netty 进行开发。本文将介绍基于 SpringBoot 2.7 集成 Netty 来模拟服务端与客户端进行通讯。
2 核心 Maven 依赖
demo-netty-server/pom.xml
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
netty 版本:
<netty.version>4.1.96.Final</netty.version>
3 核心代码
3.1 服务端事务处理器 (DemoNettyServerHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @Description: Netty 服务处理器
* @Author: junqiang.lu
* @Date: 2023/8/18
*/
@Slf4j
@Component
// 标记该类实例可以被多个 channel 共享
@ChannelHandler.Sharable
public class DemoNettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 每个传入的消息都会调用该方法
*
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
byte[] body = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(body);
log.info("服务端接收到数据:{}", new String(body));
byte[] responseBytes = "hi,客户端,我是服务端".getBytes();
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes));
}
/**
* 在读取期间,有异常抛出时会调用
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 打印异常栈跟踪
log.error("netty server error",cause);
//关闭该channel
ctx.close();
}
}
3.2 服务端连接类(InitNettyServer)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/InitNettyServer.java
package com.ljq.demo.springboot.netty.server.init;
import com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* @Description: 初始化 netty 服务端
* @Author: junqiang.lu
* @Date: 2023/8/18
*/
@Slf4j
@Component
public class InitNettyServer implements ApplicationRunner {
@Value("${netty.port:9120}")
private Integer nettyPort;
@Resource
private DemoNettyServerHandler nettyServerHandler;
@Override
public void run(ApplicationArguments args) throws Exception {
this.start();
}
/**
* 启动服务
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
// 连接管理线程池
EventLoopGroup mainGroup = new NioEventLoopGroup(1);
// 工作线程池
EventLoopGroup workGroup = new NioEventLoopGroup(4);
ServerBootstrap bootstrap = new ServerBootstrap();
try {
bootstrap.group(mainGroup, workGroup)
// 指定 nio 通道
.channel(NioServerSocketChannel.class)
// 指定 socket 地址和端口
.localAddress(new InetSocketAddress(nettyPort))
// 添加子通道 handler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(nettyServerHandler);
}
});
// 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
ChannelFuture channelFuture = bootstrap.bind().sync();
log.info("---------- [init] netty server start ----------");
// 获取Channel的CloseFuture,并且阻塞当前线程直到它完成
channelFuture.channel().closeFuture().sync();
} finally {
// 关闭 EventLoopGroup,释放资源
mainGroup.shutdownGracefully().sync();
}
}
}
注意事项: 为什么要定义两个 EventLoopGroup
? 实际上这个 EventLoopGroup
相当于一个线程组,如果只定义一个,则这个线程组既负责服务端与客户端的连接管理,也负责服务端的事务处理,这就大大降低了服务端的吞吐量,定义两个 EventLoopGroup
可以极大地提高系统的并发性能。
3.3 客户端事务处理器(DemoNettyClientHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoNettyClientHandler.java
package com.ljq.demo.springboot.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @Description: netty 客户端处理器
* @Author: junqiang.lu
* @Date: 2023/8/18
*/
@Slf4j
@Component
// 标记该类实例可以被多个 channel 共享
@ChannelHandler.Sharable
public class DemoNettyClientHandler extends ChannelInboundHandlerAdapter {
/**
* 接收消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf byteBuf= (ByteBuf)msg;
byte[] body=new byte[byteBuf.readableBytes()];
byteBuf.readBytes(body);
log.info("接收到来自服务端的消息:{}",new String(body));
// 释放消息
ReferenceCountUtil.release(msg);
}
/**
* 和服务器建立连接时触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接到服务器!!!");
// 向服务端发送上线消息
byte[] bytes = "hi,服务端,我是客户端!".getBytes();
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
ctx.writeAndFlush(byteBuf);
}
/**
* 有异常时触发
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端异常", cause);
super.exceptionCaught(ctx, cause);
}
}
3.4 客户端连接类(DemoNettyClient)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/client/DemoNettyClient.java
package com.ljq.demo.springboot.netty.server.client;
import com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
/**
* @Description: Netty 客户端
* @Author: junqiang.lu
* @Date: 2023/8/22
*/
@Slf4j
public class DemoNettyClient {
private final String host;
private final int port;
private final EventLoopGroup mainGroup;
private final Bootstrap bootstrap;
private Channel channel;
public DemoNettyClient(String host, int port) {
this.host = host;
this.port = port;
this.mainGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
}
public Channel getChannel() {
return this.channel;
}
/**
* 创建连接
*/
public void connect() throws InterruptedException {
bootstrap.group(mainGroup)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new StringEncoder())
.addLast(new ByteArrayEncoder())
.addLast(new DemoNettyClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
this.channel = future.channel();
}
/**
* 发送消息
*
* @param message
*/
public void sendMessage(String message) {
log.info("客户端待发送消息:{}", message);
Channel channel = this.getChannel();
byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
channel.writeAndFlush(byteBuf);
}
public void close() throws InterruptedException {
log.info("关闭客户端");
mainGroup.shutdownGracefully().sync();
}
public static void main(String[] args) throws InterruptedException {
String serverHost = "127.0.0.1";
int serverPort = 9120;
String message = "ahahaha啊哈哈哈啊哈";
DemoNettyClient nettyClient = new DemoNettyClient(serverHost, serverPort);
nettyClient.connect();
for (int i = 0; i < 10; i++) {
nettyClient.sendMessage(message + i);
}
log.info("--------开始休眠 5 秒------------");
Thread.sleep(5000L);
for (int i = 0; i < 5; i++) {
nettyClient.sendMessage(i + message);
Thread.sleep(100L);
}
nettyClient.close();
}
}
4 测试
4.1 测试流程
先启动 SpringBoot 应用程序,此时服务端已经启动完成
再执行客户端连接类(DemoNettyClient) 中的 main 方法
4.2 测试结果
客户端日志:
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈0
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 48| 客户端连接到服务器!!!
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈1
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈2
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈3
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈4
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈5
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈6
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈7
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈8
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:ahahaha啊哈哈哈啊哈9
2023-08-23 11:27:11 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 96| --------开始休眠 5 秒------------
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:0ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:1ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:16 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:2ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:3ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 74| 客户端待发送消息:4ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-2-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyClientHandler 35| 接收到来自服务端的消息:hi,客户端,我是服务端
2023-08-23 11:27:17 | INFO | main | com.ljq.demo.springboot.netty.server.client.DemoNettyClient 82| 关闭客户端
Disconnected from the target VM, address: '127.0.0.1:56274', transport: 'socket'
服务端日志:
2023-08-23 11:27:11 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:hi,服务端,我是客户端!ahahaha啊哈哈哈啊哈0ahahaha啊哈哈哈啊哈1ahahaha啊哈哈哈啊哈2ahahaha啊哈哈哈啊哈3ahahaha啊哈哈哈啊哈4ahahaha啊哈哈哈啊哈5ahahaha啊哈哈哈啊哈6ahahaha啊哈哈哈啊哈7ahahaha啊哈哈哈啊哈8ahahaha啊哈哈哈啊哈9
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:0ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:1ahahaha啊哈哈哈啊哈
2023-08-23 11:27:16 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:2ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:3ahahaha啊哈哈哈啊哈
2023-08-23 11:27:17 | INFO | nioEventLoopGroup-3-1 | com.ljq.demo.springboot.netty.server.handler.DemoNettyServerHandler 33| 服务端接收到数据:4ahahaha啊哈哈哈啊哈
4.3 测试结论
- (1)从日志中可以看出客户端与服务端已经通讯成功
- (2)在第一个客户端循环发送消息中,客户端发了10次,然而服务端一次性接收到了所有消息,并不是发送一次接收一次
- (3)在第二个客户端循环发送消息过程中,客户端每间隔100毫秒发送一条消息,此时服务端是一条一条接收的
关于第(2)点,这是 Netty 通讯过程中的粘包问题,欲知如何解决,且听下回分解。
关于第(3)点,由于添加了时间间隔,Netty 会认为一条消息发送完成,因此就能发送一条接收一条。
5 推荐参考资料
[Netty入门] 最简单的Netty应用程序实例
Netty4:一个简单的消息传递的demo(分析和解析)
6 Github 源码
Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo/tree/master/demo-netty-server
个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.