SpringBoot 2.7 集成 Netty 4 实现 UDP 通讯

news2024/11/18 11:57:04

文章目录

    • 1 摘要
    • 2 核心 Maven 依赖
    • 3 核心代码
      • 3.1 服务端事务处理器(DemoUdpNettyServerHandler)
      • 3.2 服务端连接类(InitUdpNettyServer)
      • 3.3 客户端事务处理类(DemoUdpNettyClientHandler)
      • 3.4 客户端连接类(DemoUdpNettyClient)
    • 4 高并发性能配置
    • 5 推荐参考资料
    • 6 Github 源码

1 摘要

Netty 作为异步通讯框架,支持多种协议。本文将介绍基于 SpringBoot 2.7 整合 Netty 4 实现 UDP 通讯。

2 核心 Maven 依赖

demo-netty-server/pom.xml
        <!-- Netty -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>${netty.version}</version>
        </dependency>

netty 版本:

<netty.version>4.1.96.Final</netty.version>

3 核心代码

3.1 服务端事务处理器(DemoUdpNettyServerHandler)

demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoUdpNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;


/**
 * @Description: UDP Netty 服务端事务处理器
 * @Author: junqiang.lu
 * @Date: 2023/8/25
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class DemoUdpNettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * 工作线程池
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("UDP-netty-work-pool"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, DatagramPacket packet) throws Exception {
        ByteBuf byteBuf = packet.content();
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        // 异步处理业务
        executorService.execute(() -> {
            // 读取数据
            log.info("UDP server receive client msg:" + new String(bytes));
            try {
                // 添加休眠,模拟业务处理
                Thread.sleep(5L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("UDP server read message error", cause);
    }
}

代码说明: 这里使用线程池来异步处理事务,提高系统并发性能

3.2 服务端连接类(InitUdpNettyServer)

demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/InitUdpNettyServer.java
package com.ljq.demo.springboot.netty.server.init;

import com.ljq.demo.springboot.netty.server.handler.DemoUdpNettyServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;

/**
 * @Description: 初始化 udt netty 服务
 * @Author: junqiang.lu
 * @Date: 2023/8/25
 */
@Slf4j
@Component
public class InitUdpNettyServer implements ApplicationRunner {

    @Value("${netty.portUdp:9130}")
    private Integer nettyPort;

    @Resource
    private DemoUdpNettyServerHandler udpNettyServerHandler;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.start();
    }

    /**
     * 启动服务
     *
     * @throws InterruptedException
     */
    public void start() throws InterruptedException {
        // 连接管理线程池
        EventLoopGroup mainGroup = new NioEventLoopGroup(2);
        EventLoopGroup workGroup = new NioEventLoopGroup(8);
        // 工作线程池
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(mainGroup)
                // 指定 nio 通道,支持 UDP
                .channel(NioDatagramChannel.class)
                // 广播模式
                .option(ChannelOption.SO_BROADCAST, true)
                // 设置读取缓冲区大小为 10M
                .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
                // 设置发送缓冲区大小为 10M
                .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
                // 线程池复用缓冲区
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // 指定 socket 地址和端口
                .localAddress(new InetSocketAddress(nettyPort))
                // 添加通道 handler
                .handler(new ChannelInitializer<NioDatagramChannel>() {
                    @Override
                    protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
                        nioDatagramChannel.pipeline()
                                // 指定工作线程,提高并发性能
                                .addLast(workGroup,udpNettyServerHandler);
                    }
                });
        // 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
        bootstrap.bind().sync();
        log.info("---------- [init] UDP netty server start ----------");
    }

}

代码说明:

UDP 协议需要使用 NioDatagramChannel.class 通道

设置缓冲区的大小有利于提高系统吞吐量,线程池复用也利于提升系统处理性能

3.3 客户端事务处理类(DemoUdpNettyClientHandler)

demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/DemoUdpNettyClientHandler.java
package com.ljq.demo.springboot.netty.server.handler;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * @Description: UDP Netty 客户端事务处理器
 * @Author: junqiang.lu
 * @Date: 2023/8/25
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class DemoUdpNettyClientHandler extends SimpleChannelInboundHandler<DatagramPacket> {


    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket datagramPacket) throws Exception {
        // 读取数据
        ByteBuf byteBuf = datagramPacket.content();
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        log.info("receive server msg:" + new String(bytes));
    }
}

3.4 客户端连接类(DemoUdpNettyClient)

demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/client/DemoUdpNettyClient.java
package com.ljq.demo.springboot.netty.server.client;

import cn.hutool.core.util.RandomUtil;
import com.ljq.demo.springboot.netty.server.handler.DemoUdpNettyClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramPacket;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * @Description: UDP netty 客户端
 * @Author: junqiang.lu
 * @Date: 2023/8/25
 */
@Slf4j
public class DemoUdpNettyClient {

    private final String serverHost;

    private final int serverPort;

    private final int clientPort;

    private final EventLoopGroup mainGroup;

    private final Bootstrap bootstrap;

    private Channel channel;

    public DemoUdpNettyClient(String serverHost, int serverPort, int clientPort) {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
        this.clientPort = clientPort;
        this.mainGroup = new NioEventLoopGroup();
        this.bootstrap = new Bootstrap();

    }

    public Channel getChannel() {
        return this.channel;
    }

    /**
     * 创建连接
     */
    public void connect() throws InterruptedException {
        bootstrap.group(mainGroup)
                .channel(NioDatagramChannel.class)
                .option(ChannelOption.SO_BROADCAST, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .localAddress(clientPort)
                .handler(new DemoUdpNettyClientHandler());
        ChannelFuture future = bootstrap.bind().sync();
        this.channel = future.channel();
    }

    /**
     * 发送消息
     *
     * @param message
     */
    public void sendMessage(String message) {
        log.info("客户端待发送消息:{}", message);
        Channel channel = this.getChannel();
        byte[] resBytes = message.getBytes();
        DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(resBytes), new InetSocketAddress(serverHost, serverPort));
        channel.writeAndFlush(sendPacket);
    }

    public void close() throws InterruptedException {
        log.info("关闭客户端");
        mainGroup.shutdownGracefully();
    }




    public static void main(String[] args) throws InterruptedException {
        String serverHost = "127.0.0.1";
        int serverPort = 9130;
        int clientPort = 9131;
        String message = RandomUtil.randomString(1024);
        DemoUdpNettyClient nettyClient = new DemoUdpNettyClient(serverHost, serverPort, clientPort);
        nettyClient.connect();
        for (int i = 0; i < 10000; i++) {
            nettyClient.sendMessage(message + i);
        }
        log.info("--------开始休眠 5 秒------------");
        Thread.sleep(5000L);
        log.info("--------休眠 5 秒结束------------");
        for (int i = 0; i < 5; i++) {
            nettyClient.sendMessage(i + message);
            Thread.sleep(100L);
        }
        Thread.sleep(5000L);
        nettyClient.close();
    }

}

这里包含了测试方法

4 高并发性能配置

  • 1 在服务端事务处理类中使用异步处理消息

  • Netty 服务端设置较高的读写缓存,提高吞吐量;

  • 线程池复用缓冲区

                    // 设置读取缓冲区大小为 10M
                    .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
                    // 设置发送缓冲区大小为 10M
                    .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
                    // 线程池复用缓冲区
                    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    
  • Netty 设置主现成以及工作线程,提升消息处理效率

    // 连接管理线程池
            EventLoopGroup mainGroup = new NioEventLoopGroup(2);
            EventLoopGroup workGroup = new NioEventLoopGroup(8);
            // 工作线程池
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(mainGroup)
            
            ... ...
            
            // 添加通道 handler
                    .handler(new ChannelInitializer<NioDatagramChannel>() {
                        @Override
                        protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
                            nioDatagramChannel.pipeline()
                                    // 指定工作线程,提高并发性能
                                    .addLast(workGroup,udpNettyServerHandler);
                        }
                    });
            
    

5 推荐参考资料

基于Netty实现UDP双向通信

Java入门:UDP协议发送/接收数据实现

读取tcp/udp默认缓冲区大小

Netty之UDP丢包解决

6 Github 源码

Gtihub 源码地址 : https://github.com/Flying9001/springBootDemo/tree/master/demo-netty-server

个人公众号:404Code,分享半个互联网人的技术与思考,感兴趣的可以关注.
404Code

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/956056.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ROLL.DBF回滚表空间增长问题(达梦数据库)

达梦数据库 - 回滚表空间增长问题 环境介绍1 环境搭建1.1 创建表与测试数据1.2 查询待提交的数据量1.3 查询回滚表空间使用情况1.3.1 插入数据前查询结果1.3.2 插入数据后未提交事务查询结果1.3.3 插入数据后提交事务查询结果 环境介绍 达梦数据库ROLL.DBF 在某些业务系统厂商…

防破解暗桩思路:检查菜单是否被非法修改过源码

本篇文章属于《518抽奖软件开发日志》系列文章的一部分。 我在开发《518抽奖软件》&#xff08;www.518cj.net&#xff09;的时候&#xff0c;为了防止被破解&#xff0c;需用添加一些暗桩&#xff0c;在合适的时机检查软件是否被非法修改过&#xff0c;如果被非法修改就做出提…

【位运算】位运算常用技巧总结

目录 前言 一.常见的小问题 1.给定一个数n,确定它的二进制表示中的第x位是0还是1 2.给定一个数n&#xff0c;将它的二进制表示中的第x位修改成1 3.给定一个数n&#xff0c;将它的二进制表示中的第x位修改成0 4.给定一个数n&#xff0c;提取它的二进制表示中最右侧的1&…

AUTOSAR开发工具DaVinci Configurator里的Modules

DaVinci Configurator 里面有个Module这个概念。 如你所想&#xff0c;基本上跟AUTOSAR架构里面的Module相对应 从软件的Project菜单中的Basic Editor项可以打开 打开这个菜单后&#xff0c;会看到很多Modules项以及其相关配置项 这个Basic Editor显示出整个ECU配置中的所有…

Windows 安装 RabbitMq

Windows 上安装 RabbitMQ 的步骤 RabbitMQ 是一个强大的开源消息队列系统&#xff0c;广泛用于构建分布式、可扩展的应用程序。本教程将带您一步一步完成在 Windows 系统上安装 RabbitMQ 的过程。无需担心&#xff0c;即使您是初学者&#xff0c;也能够轻松跟随这些简单的步骤…

element-plus 设置 el-date-picker 弹出框位置

前言 概述&#xff1a;el-date-picker 组件会自动根据空间范围进行选择比较好的弹出位置&#xff0c;但特定情况下&#xff0c;它自动计算出的弹出位置并不符合我们的实际需求&#xff0c;故需要我们手动设置。 存在的问题&#xff1a;element-plus 中 el-date-picker 文档中并…

渗透测试漏洞原理之---【任意文件包含漏洞】

文章目录 1、文件包含概述1.1 文件包含语句1.1.1、相关配置 1.2、动态包含1.2.1、示例代码1.2.2、本地文件包含1.2.3、远程文件包含 1.3、漏洞原理1.3.1、特点 2、文件包含攻防2.1、利用方法2.1.1、包含图片木马2.1.2、读取敏感文件2.1.3、读取PHP文件源码2.1.4、执行PHP命令2.…

mybatisPlus多数据源方案

背景 在微服务李娜一般一个服务只有一个数据源&#xff0c;但是在有的老项目或者一些特定场景需要多数据源链接不同的数据库&#xff0c;本文以mybatisPlus为基础给出解决方案 多数据源场景分类 情形一&#xff1a;项目启动就确定了情形一&#xff1a;一些sass系统里面动态确…

python-数据分析-numpy、pandas、matplotlib的常用方法

一、numpy import numpy as np1.numpy 数组 和 list 的区别 输出方式不同 里面包含的元素类型 2.构造并访问二维数组 使用 索引/切片 访问ndarray元素 切片 左闭右开 np.array(list) 3.快捷构造高维数组 np.arange() np.random.randn() - - - 服从标准正态分布- - - …

DHorse v1.3.2 发布,基于 k8s 的发布平台

版本说明 新增特性 构建版本、部署应用时的线程池可配置化&#xff1b; 优化特性 构建版本跳过单元测试&#xff1b; 解决问题 解决Vue应用详情页面报错的问题&#xff1b;解决Linux环境下脚本运行失败的问题&#xff1b;解决下载Maven安装文件失败的问题&#xff1b; 升…

VPN网关

阿里云VPN网关(VPN Gateway&#xff0c;简称VPN)是一款基于Internet&#xff0c;通过加密通道将企业数据中心、办公网或终端与专有网络(VPC) 安全可靠连接起来的服务。 VPN网关提供IPsec-VPN和SSL-VPN两种。 网络连接方式应用场景IPsec-VPN支持在企业本地数据中心、企业办公网…

Go语言基础之指针

区别于C/C中的指针&#xff0c;Go语言中的指针不能进行偏移和运算&#xff0c;是安全指针。 要搞明白Go语言中的指针需要先知道3个概念&#xff1a;指针地址、指针类型和指针取值。 Go语言中的指针 任何程序数据载入内存后&#xff0c;在内存都有他们的地址&#xff0c;这就…

瓦格纳老板普里戈任坠机身亡,这是他的必然归宿

大清晨&#xff0c;刷到一则美联社大半夜播发的新闻&#xff0c;瓦格纳的领袖普里戈任&#xff0c;连同其他高层&#xff0c;他们好像是被凑齐了一样&#xff0c;登上同一架飞机&#xff0c;然后走向了死亡。 这太突然了&#xff0c;今天咱们不聊技术&#xff0c;聊聊政治。 瓦…

云性能监控具体功能有哪些?

随着越来越多的企业将业务迁移到云端&#xff0c;云性能监控变得至关重要&#xff0c;能帮助企业提高云环境的效率和可靠性。那么&#xff0c;云性能监控具体功能有哪些?下面&#xff0c;就来看看具体介绍吧! 1、实时监测&#xff1a;通过监测关键指标和事件&#xff0c;及时了…

AcWing 794. 高精度除法

AcWing 794. 高精度除法 题目描述代码展示 题目描述 代码展示 #include <iostream> #include <vector> #include <algorithm>using namespace std;vector<int> div(vector<int> &A, int b, int &r) {vector<int> C;r 0;for (int…

为什么曾经一马当先的C语言,如今却开始出现骂声

今日话题&#xff0c;为什么曾经一马当先的C语言&#xff0c;如今却开始出现各种骂声&#xff1f;C语言的发展历程可以追溯到20世纪70年代初期&#xff0c;它的设计理念、简洁性、可移植性以及对底层硬件的直接控制能力使其在计算机科学领域逐渐受到重视从而成为了天王搬到存在…

(15)线程的实例认识:同步,异步,并发,并发回调,事件,异步线程,UI线程

参看&#xff1a;https://www.bilibili.com/video/BV1xA411671D/?spm_id_from333.880.my_history.page.click&vd_source2a0404a7c8f40ef37a32eed32030aa18 下面是net framework版本 一、文件构成 1、界面如下。 (1)同步与异步有什么区别&#xff1f; …

云备份——第三方库使用介绍(下)

httplib库&#xff0c;一个C11单文件头的跨平台HTTP/HTTPS库。安装起来非常容易。只需包含httplib.h在你的代码中即可。 httplib库实际上是用于搭建一个简单的http服务器或者客户端的库&#xff0c;这种第三方网络库&#xff0c;可以让我们免去搭建服务器或客户端的时间&#x…

lv3 嵌入式开发-linux介绍及环境配置

目录 1 UNIX、Linux和GNU简介 2 环境介绍 3 VMwareTools配置 4 vim配置&#xff1a; 1 UNIX、Linux和GNU简介 什么是UNIX? unix是一个强大的多用户、多任务操作系统&#xff0c;支持多种处理器架构 中文名 尤尼斯 外文名 UNIX 本质 操作系统 类型 分时操作系统 开…