Springboot+Netty搭建基于TCP协议的服务端

news2025/1/23 17:39:12

文章目录

    • 概要
    • pom依赖
    • Netty的server服务端类
    • Netty通道初始化
    • I/O数据读写处理
    • 测试发送消息 并 接收服务端回复
    • 异步启动Netty
    • 运行截图

概要

Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点
Netty的优点:
1.API使用简单,开发入门门槛低。
2.功能十分强大,预置多种编码解码功能,支持多种主流协议。
3.可定制、可扩展能力强,可以通过其提供的ChannelHandler进行灵活的扩展。
4.性能优异,特别在综合性能上的优异性。
5.成熟,稳定,适用范围广。
6.可用于智能GSM/GPRS模块的通讯服务端开发,使用它进行MQTT协议的开发。

好了,废话不多说了,上代码

pom依赖

        <!-- netty依赖 springboot2.x自动导入版本 -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>

Netty的server服务端类

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Netty服务器(端口自行更换,默认端口10100)
 * @author wusiwee
 */
@Service
@Slf4j
public class NettyServer {

    /**
     * 注入Netty通道初始化处理器
     */
    private final NettyChannelInboundHandlerAdapter handlerAdapter;

    /**
     * 通过构造函数注入依赖
     * @param handlerAdapter 处理器
     */
    @Autowired
    public NettyServer(NettyChannelInboundHandlerAdapter handlerAdapter) {
        this.handlerAdapter = handlerAdapter;
    }

    /**
     * 启动Netty服务器
     * @throws Exception 如果启动过程中发生异常
     */
    public void bind() throws Exception {
        // 定义bossGroup和workerGroup来处理网络事件
        // 用于接受客户端连接
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 用于实际的业务处理操作
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建ServerBootstrap实例来引导绑定和启动服务器
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    // 指定使用NIO的传输Channel
                    .channel(NioServerSocketChannel.class)
                    // 设置TCP接收缓冲区大小
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    // 设置自定义的Channel初始化器
                    .childHandler(new NettyChannelInitializer(handlerAdapter));

            log.info("netty server start success!");
            // 绑定端口,并同步等待成功,即启动Netty服务
            ChannelFuture f = serverBootstrap.bind(10100).sync();
            // 等待服务端监听端口关闭
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            log.error("Netty server startup interrupted", e);
            Thread.currentThread().interrupt();
        } finally {
            // 优雅关闭事件循环组
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

Netty通道初始化

import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.stereotype.Component;

/**
 * 通道初始化
 * @author wusiwee
 */
@Component
public class NettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {

    /**
     * 注入,目的是在该 HandlerAdapter 可以正确的注入业务Service
     */
    private final NettyChannelInboundHandlerAdapter handlerAdapter;

    public NettyChannelInitializer(NettyChannelInboundHandlerAdapter handlerAdapter) {
        this.handlerAdapter = handlerAdapter;
    }

    @Override
    protected void initChannel(Channel ch) {
        ChannelPipeline pipeline = ch.pipeline();
        // 响应字符串
        pipeline.addLast(new StringEncoder());
        // 自定义ChannelInboundHandlerAdapter
        pipeline.addLast(handlerAdapter);
    }

I/O数据读写处理

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.time.LocalDateTime;
import java.util.Date;

/**
 * I/O数据读写处理类
 *  客户端发送的消息 以及 回复客户端消息 均在此处
 *  @ChannelHandler.Sharable 此注解用于在多个 Channel 中重复使用同一个 Handler 实例
 * @author wusiwee
 */
@Slf4j
@ChannelHandler.Sharable
@Component
public class NettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter{

    /**
     * 这里可以注入自己的service
     */
    @Autowired
    private IUserService iUserService;

    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        // 确保接收的数据长度足够,minimumLength 是所有字段长度的总和
        if (in.readableBytes() < MINIMUM_LENGTH) {
            ctx.writeAndFlush("报文长度过低,数据不完整"+"\n");
            return;
        }
        // 1,读取固定长度字符
        byte[] frameStart = new byte[4];
        in.readBytes(frameStart);
        String frameStartStr = new String(frameStart, java.nio.charset.StandardCharsets.UTF_8);
        log.info("1.解析:"+frameStartStr);
        ctx.writeAndFlush("I got it\n");
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        log.info("读取完成 channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.info("exceptionCaught");
        cause.printStackTrace();
        //抛出异常,断开与客户端的连接
        ctx.close();
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        log.info("客户端连接 channelActive{}", clientIp+" "+ctx.name());
    }

    /**
     * 客户端与服务端 断连时 执行
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        ctx.close();
        log.info("channelInactive{}", clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inSocket.getAddress().getHostAddress();
        //超时 断开连接
        ctx.close();
        log.info("userEventTriggered{}", clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        log.info("注册 channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        log.info("channelUnregistered");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        log.info("channelWritabilityChanged");
    }
}

测试发送消息 并 接收服务端回复


    @Test
    void contextLoads() {

        try {
            // 服务器地址
            String host = "127.0.0.1";
            // 服务器端口
            int port = 10100;
            // 要发送的消息
            String message = "7E7E010038401010123433004D02000B22";
            Socket socket = new Socket(host, port);

            // 获取输出流
            OutputStream outputStream = socket.getOutputStream();

            // 将字符串转换为字节数组
            byte[] data = message.getBytes();

            // 写入数据到输出流
            outputStream.write(data);
            // 刷新输出流,确保数据发送
            outputStream.flush();

            InputStream input = socket.getInputStream();
            //读取服务器返回的消息
            BufferedReader br = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8));
            String mess = br.readLine();
            System.out.println("服务器回复:" + mess);
            input.close();
            outputStream.close();
            socket.close();
        }catch (Exception e){
            System.out.println("出现异常");
        }
    }

异步启动Netty

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * 启动类
 */
@SpringBootApplication
@EnableAsync
public class NettyApplication implements ApplicationRunner{
    
    /**
     * 启动springboot
     */
    public static void main( String[] args ) {
        SpringApplication.run(NettyApplication.class, args);
    }

    /**
     * 创建独立线程池
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(
            1, 1, 30, TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(2),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy());
    /**
     * 注入Netty消息处理器
     */
    @Resource
    private NettyChannelInboundHandlerAdapter handlerAdapter;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 使用线程池 异步启动Netty服务器
        executorService.submit(() -> {
            try {
                // 启动netty,绑定端口号
                new NettyServer(handlerAdapter).bind();
            } catch (Exception e) {
                // 异常处理
                System.out.println("启动netty出现异常:"+e.getMessage());
            }
        });
    }
}

运行截图

启动服务
回复客户端消息的代码片段
消息回复
测试发送
发送测试
客户端收到回复,断开连接
在这里插入图片描述

攀峰之高险,岂有崖颠;搏海之明辉,何来彼岸?前进不止,奋斗不息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1415514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

深度强化学习(王树森)笔记03

深度强化学习&#xff08;DRL&#xff09; 本文是学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。本文在ChatGPT辅助下完成。 参考链接 Deep Reinforcement Learning官方链接&#xff1a;https://github.com/wangshusen/DRL 源代码链接&#xff1a;https://github.c…

分布式id-雪花算法

一、雪花算法介绍 Snowflake&#xff0c;雪花算法是有Twitter开源的分布式ID生成算法&#xff0c;以划分命名空间的方式将64bit位分割成了多个部分&#xff0c;每个部分都有具体的不同含义&#xff0c;在Java中64Bit位的整数是Long类型&#xff0c;所以在Java中Snowflake算法生…

Linux 文件和文件夹的创建与删除

目录 一. 新建1.1 mkdir 新建文件夹1.2 touch 新建空文件1.3 vi命令创建文件1.4 > 和 >> 新建文件 二. 删除 一. 新建 1.1 mkdir 新建文件夹 -p&#xff1a;递归的创建文件夹&#xff0c;当父目录不存在的时候&#xff0c;会自动创建 mkdir -p test1/test2/test31.…

stable-diffusion-webui 汉化(中文界面)

大家好&#xff0c;我是水滴~~ 本文主要介绍 Stable Diffusion WebUI 是如何汉化的&#xff0c;文章详细的介绍汉化过程&#xff0c;并加上配图能够清晰的展示该过程。 Stable Diffusion WebUI 官方并没有出中文界面&#xff0c;需要通过安装插件来汉化&#xff0c;下面是详细…

工业空调转IEC104协议转换网关BE108

随着电力系统信息化建设和数字化转型的进程不断加速&#xff0c;对电力能源的智能化需求也日趋增强。健全稳定的智慧电力系统能够为工业生产、基础设施建设以及国防建设提供稳定的能源支持。在此背景下&#xff0c;高性能的工业电力数据传输解决方案——协议转换网关应运而生&a…

如何免费注册一个二级域名

目录 1.sitelutions账号注册 2.添加域名 3.做A记录或者cname解析步骤 1.sitelutions账号注册 注册网址:Sitelutions - Solutions for your site. All in one place. 打开首页点击右上角的红色 free sign up 来注册。注册只需邮箱即可。 首先填写注册信息,然后提交。提交之后…

Tortoise-tts Better speech synthesis through scaling——TTS论文阅读

笔记地址&#xff1a;https://flowus.cn/share/a79f6286-b48f-42be-8425-2b5d0880c648 【FlowUs 息流】tortoise 论文地址&#xff1a; Better speech synthesis through scaling Abstract: 自回归变换器和DDPM&#xff1a;自回归变换器&#xff08;autoregressive transfo…

算法38:子数组的最小值之和(力扣907题)----单调栈

题目&#xff1a; 给定一个整数数组 arr&#xff0c;找到 min(b) 的总和&#xff0c;其中 b 的范围为 arr 的每个&#xff08;连续&#xff09;子数组。 示例 1&#xff1a; 输入&#xff1a;arr [3,1,2,4] 输出&#xff1a;17 解释&#xff1a; 子数组为 [3]&#xff0c;[…

设计模式:工厂方法模式

工厂模式属于创建型模式&#xff0c;也被称为多态工厂模式&#xff0c;它在创建对象时提供了一种封装机制&#xff0c;将实际创建对象的代码与使用代码分离&#xff0c;有子类决定要实例化的产品是哪一个&#xff0c;把产品的实例化推迟到子类。 使用场景 重复代码 : 创建对象…

机器学习---可能近似正确(PAC)、出错界限框架

1. 计算学习理论概述 从理论上刻画了若干类型的机器学习问题中的困难和若干类型的机器学习算法的能力 这个理论要回答的问题是&#xff1a; 在什么样的条件下成功的学习是可能的&#xff1f; 在什么条件下某个特定的学习算法可保证成功运行&#xff1f; 这里考虑两种框架&…

【开源】基于JAVA+Vue+SpringBoot的固始鹅块销售系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 鹅块类型模块2.3 固始鹅块模块2.4 鹅块订单模块2.5 评论管理模块 三、系统设计3.1 用例设计3.2 数据库设计3.2.1 鹅块类型表3.2.2 鹅块表3.2.3 鹅块订单表3.2.4 鹅块评论表 四、系统展示五、核心代码5.…

基于C语言的趣味游戏之五子棋

目录 趣味五子棋游戏 第一步 text.c文件 第二步 game.h文件 第三步 初始化 打印棋盘 玩家输入 电脑输入 判断输赢 game.c 趣味五子棋游戏 第一步 先写菜单&#xff0c;然后在主函数里调用&#xff0c;由于这是一个可以重复的游戏所以将do while循环里调用menu函数。…

C/C++ - 类的封装特性

目录 类的封装 语法格式 声明定义 分文件 访问权限 类作用域 对象模型 构造函数 默认构造函数 带参构造函数 拷贝构造函数 构造函数重载 委托构造函数 初始数据列表 构造默认参数 构造函数删除 析构函数 析构函数概念 析构函数特性 析构函数示例 析构调用…

【Unity】【游戏开发】Pico打包后项目出现运行时错误如何Debug

【背景】 开发过程中的报错可以通过控制台查看&#xff0c;但是PICO项目这类依赖特定设备环境的应用往往存在打包后在设备端发生运行时错误。这时如何能查看到Debug信息呢&#xff1f; 【分析】 Pico也是安卓系统&#xff0c;所以这个问题就可以泛化为Unity有哪些在安卓端运…

dnSpy调试工具二次开发2-输出日志到控制台

本文在上一篇文章的基础上继续操作&#xff1a; dnSpy调试工具二次开发1-新增菜单-CSDN博客 经过阅读dnSpy的源码&#xff0c;发现dnSpy使用到的依赖注入用了MEF框架&#xff0c;所以在源码中可以看到接口服务类的上面都打上了Export的特性或在构造方法上面打上ImportingConst…

力扣hot100 最小栈 变种栈

Problem: 155. 最小栈 文章目录 思路&#x1f496; Stack 自定义 Node&#x1f37b; Code 思路 &#x1f469;‍&#x1f3eb; 甜姨 &#x1f496; Stack 自定义 Node 时间复杂度: O ( 1 ) O(1) O(1) 空间复杂度: O ( n ) O(n) O(n) &#x1f37b; Code class MinS…

数据结构-顺序表的实现 [王道]

本博客记录个人寒假学习内容。此篇博客内容为 顺序表的定义。 博客中截图来自王道数据结构公开课 目录 顺序表的定义 顺序表的特点 顺序表的实现--静态分配 顺序表的实现--动态分配 顺序表的定义--知识结构框架 顺序表的定义 >线性表是具有相同(每个数据元素所占的空间…

Spring Boot使用AOP

一、为什么需要面向切面编程&#xff1f; 面向对象编程&#xff08;OOP&#xff09;的好处是显而易见的&#xff0c;缺点也同样明显。当需要为多个不具有继承关系的对象添加一个公共的方法的时候&#xff0c;例如日志记录、性能监控等&#xff0c;如果采用面向对象编程的方法&…

CSS优先级内容

定义CSS样式时&#xff0c;经常出现两个或多个样式规则应用在同一元素的情况&#xff0c;这时就会出现优先级的情况&#xff0c;那么应用的元素应该显示哪一个样式呢&#xff1f; 一.下面举例对优先级进行具体讲解。 p{color:red;} .blue{color:orange;} #header{color:blu…

OpenCV-27 Canny边缘检测

一、概念 Canny边缘检测算法是John F.Canny与1986年开发出来的一个多级边缘检测算法&#xff0c;也被很多人认为是边缘检测的最优算法。最优边缘检测的三个主要评价标准是&#xff1a; 低错频率&#xff1a;表示出尽可能多的实际边缘&#xff0c;同时尽可能的减小噪声产生的误…