文章目录
- 1 摘要
- 2 核心 Maven 依赖
- 3 核心代码
- 3.1 服务端事务处理器(DemoUdpNettyServerHandler)
- 3.2 服务端连接类(InitUdpNettyServer)
- 3.3 客户端事务处理类(DemoUdpNettyClientHandler)
- 3.4 客户端连接类(DemoUdpNettyClient)
- 4 高并发性能配置
- 5 推荐参考资料
- 6 Github 源码
1 摘要
Netty 作为异步通讯框架,支持多种协议。本文将介绍基于 SpringBoot 2.7 整合 Netty 4 实现 UDP 通讯。
2 核心 Maven 依赖
demo-netty-server/pom.xml
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
netty 版本:
<netty.version>4.1.96.Final</netty.version>
3 核心代码
3.1 服务端事务处理器(DemoUdpNettyServerHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoUdpNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: UDP Netty 服务端事务处理器
* @Author: junqiang.lu
* @Date: 2023/8/25
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class DemoUdpNettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
/**
* 工作线程池
*/
private final ExecutorService executorService = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("UDP-netty-work-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
@Override
protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
ByteBuf byteBuf = packet.content();
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
// 异步处理业务
executorService.execute(() -> {
// 读取数据
log.info("UDP server receive client msg:" + new String(bytes));
try {
// 添加休眠,模拟业务处理
Thread.sleep(5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("UDP server read message error", cause);
}
}
代码说明: 这里使用线程池来异步处理事务,提高系统并发性能
3.2 服务端连接类(InitUdpNettyServer)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/InitUdpNettyServer.java
package com.ljq.demo.springboot.netty.server.init;
import com.ljq.demo.springboot.netty.server.handler.DemoUdpNettyServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* @Description: 初始化 udt netty 服务
* @Author: junqiang.lu
* @Date: 2023/8/25
*/
@Slf4j
@Component
public class InitUdpNettyServer implements ApplicationRunner {
@Value("${netty.portUdp:9130}")
private Integer nettyPort;
@Resource
private DemoUdpNettyServerHandler udpNettyServerHandler;
@Override
public void run(ApplicationArguments args) throws Exception {
this.start();
}
/**
* 启动服务
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
// 连接管理线程池
EventLoopGroup mainGroup = new NioEventLoopGroup(2);
EventLoopGroup workGroup = new NioEventLoopGroup(8);
// 工作线程池
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(mainGroup)
// 指定 nio 通道,支持 UDP
.channel(NioDatagramChannel.class)
// 广播模式
.option(ChannelOption.SO_BROADCAST, true)
// 设置读取缓冲区大小为 10M
.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
// 设置发送缓冲区大小为 10M
.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
// 线程池复用缓冲区
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 指定 socket 地址和端口
.localAddress(new InetSocketAddress(nettyPort))
// 添加通道 handler
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline()
// 指定工作线程,提高并发性能
.addLast(workGroup,udpNettyServerHandler);
}
});
// 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
bootstrap.bind().sync();
log.info("---------- [init] UDP netty server start ----------");
}
}
代码说明:
UDP 协议需要使用 NioDatagramChannel.class
通道
设置缓冲区的大小有利于提高系统吞吐量,线程池复用也利于提升系统处理性能
3.3 客户端事务处理类(DemoUdpNettyClientHandler)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoUdpNettyClientHandler.java
package com.ljq.demo.springboot.netty.server.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
* @Description: UDP Netty 客户端事务处理器
* @Author: junqiang.lu
* @Date: 2023/8/25
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class DemoUdpNettyClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
// 读取数据
ByteBuf byteBuf = datagramPacket.content();
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
log.info("receive server msg:" + new String(bytes));
}
}
3.4 客户端连接类(DemoUdpNettyClient)
demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/client/DemoUdpNettyClient.java
package com.ljq.demo.springboot.netty.server.client;
import cn.hutool.core.util.RandomUtil;
import com.ljq.demo.springboot.netty.server.handler.DemoUdpNettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
/**
* @Description: UDP netty 客户端
* @Author: junqiang.lu
* @Date: 2023/8/25
*/
@Slf4j
public class DemoUdpNettyClient {
private final String serverHost;
private final int serverPort;
private final int clientPort;
private final EventLoopGroup mainGroup;
private final Bootstrap bootstrap;
private Channel channel;
public DemoUdpNettyClient(String serverHost, int serverPort, int clientPort) {
this.serverHost = serverHost;
this.serverPort = serverPort;
this.clientPort = clientPort;
this.mainGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap();
}
public Channel getChannel() {
return this.channel;
}
/**
* 创建连接
*/
public void connect() throws InterruptedException {
bootstrap.group(mainGroup)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.localAddress(clientPort)
.handler(new DemoUdpNettyClientHandler());
ChannelFuture future = bootstrap.bind().sync();
this.channel = future.channel();
}
/**
* 发送消息
*
* @param message
*/
public void sendMessage(String message) {
log.info("客户端待发送消息:{}", message);
Channel channel = this.getChannel();
byte[] resBytes = message.getBytes();
DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(resBytes), new InetSocketAddress(serverHost, serverPort));
channel.writeAndFlush(sendPacket);
}
public void close() throws InterruptedException {
log.info("关闭客户端");
mainGroup.shutdownGracefully();
}
public static void main(String[] args) throws InterruptedException {
String serverHost = "127.0.0.1";
int serverPort = 9130;
int clientPort = 9131;
String message = RandomUtil.randomString(1024);
DemoUdpNettyClient nettyClient = new DemoUdpNettyClient(serverHost, serverPort, clientPort);
nettyClient.connect();
for (int i = 0; i < 10000; i++) {
nettyClient.sendMessage(message + i);
}
log.info("--------开始休眠 5 秒------------");
Thread.sleep(5000L);
log.info("--------休眠 5 秒结束------------");
for (int i = 0; i < 5; i++) {
nettyClient.sendMessage(i + message);
Thread.sleep(100L);
}
Thread.sleep(5000L);
nettyClient.close();
}
}
这里包含了测试方法
4 高并发性能配置
-
1 在服务端事务处理类中使用异步处理消息
-
Netty 服务端设置较高的读写缓存,提高吞吐量;
-
线程池复用缓冲区
// 设置读取缓冲区大小为 10M .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10) // 设置发送缓冲区大小为 10M .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10) // 线程池复用缓冲区 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
-
Netty 设置主现成以及工作线程,提升消息处理效率
// 连接管理线程池 EventLoopGroup mainGroup = new NioEventLoopGroup(2); EventLoopGroup workGroup = new NioEventLoopGroup(8); // 工作线程池 Bootstrap bootstrap = new Bootstrap(); bootstrap.group(mainGroup) ... ... // 添加通道 handler .handler(new ChannelInitializer<NioDatagramChannel>() { @Override protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception { nioDatagramChannel.pipeline() // 指定工作线程,提高并发性能 .addLast(workGroup,udpNettyServerHandler); } });
5 推荐参考资料
基于Netty实现UDP双向通信
Java入门:UDP协议发送/接收数据实现
读取tcp/udp默认缓冲区大小
Netty之UDP丢包解决
6 Github 源码
Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo/tree/master/demo-netty-server
个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.