Spring Boot与Netty打造TCP服务端(解决粘包问题)

news2025/1/11 18:40:10

欢迎来到我的博客,代码的世界里,每一行都是一个故事


在这里插入图片描述

Spring Boot与Netty打造TCP服务端

    • 前言
    • 功能目标
    • 项目实现
      • maven坐标
      • 构建自定义Handler
      • ChannelInitializer实现
      • server实现

前言

在物联网时代,设备之间的通信变得愈发重要。本文将带你踏上一场关于如何用Spring Boot和Netty搭建TCP服务端的冒险之旅。无论是智能家居、工业自动化还是其他物联网应用,构建一个稳健的通信桥梁将成为连接未来的关键。

功能目标

  • 实现springboot+netty整合TCP服务端(基础)
  • 实现消息回复功能
  • 实现消息太长导致的粘包问题(比如发送一个base64的图片信息)
  • 实现在自定义Handler中注入spring的bean
  • 保证完成任务,哈哈哈哈哈

项目实现

maven坐标

<!-- netty 这里你也可以引入全部-->
<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-common</artifactId>
  <version>4.1.79.Final</version>
</dependency>

构建自定义Handler

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;

/**
 * I/O数据读写处理类
 *
 * @author xiaobo
 */
@Slf4j
public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {


    /**
     * 从客户端收到新的数据时,这个方法会在收到消息时被调用
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        // 这里是在前面的DelimiterBasedFrameDecoder转为了ByteBuf,验证是否是ByteBuf
        if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf) msg;
            try {
                String receivedData = byteBuf.toString(CharsetUtil.UTF_8);
                // 接收完整数据
                handleReceivedData(receivedData);
            } finally {
                // 释放 ByteBuf 占用的资源
                byteBuf.release();
                // 回复消息
                ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
            }
        }
    }

    private void handleReceivedData(String receivedData) {
        // 数据处理
        // 这里如果想实现spring中bean的注入,可以用geBean的方式获取
    }

    /**
     * 从客户端收到新的数据、读取完成时调用
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        log.info("channelReadComplete");
        ctx.flush();
    }

    /**
     * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
        cause.printStackTrace();
        ctx.close();// 抛出异常,断开与客户端的连接
    }

    /**
     * 客户端与服务端第一次建立连接时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        // 此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
        System.out.println("channelActive:" + clientIp + ctx.name());
      	// 这里是向客户端发送回应
        ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
    }

    /**
     * 客户端与服务端 断连时 执行
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        // 断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
        ctx.close();
        log.info("channelInactive:{}", clientIp);
    }

    /**
     * 服务端当read超时, 会调用这个方法
     *
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = socket.getAddress().getHostAddress();
        ctx.close();// 超时时断开连接
        log.info("userEventTriggered:" + clientIp);
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelRegistered");
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("channelUnregistered");
    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        log.info("channelWritabilityChanged");
    }

}

ChannelInitializer实现

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;

/**
 * description: <h1>通道初始化</h1>
 *
 * @author bo
 * @version 1.0
 * @date 2024/2/27 16:13
 */
public class CarTcpNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel ch) throws Exception {

        ByteBuf delemiter = Unpooled.buffer();
        delemiter.writeBytes("$".getBytes());
        // 这里就是解决数据过长问题,而且数据是以$结尾的
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(907200, true, true, delemiter));

        // 自定义ChannelInboundHandlerAdapter
        ch.pipeline().addLast(new CarTcpNettyChannelInboundHandlerAdapter());

    }

}

server实现

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

/**
 * description: <h1>netty创建的TCP</h1>
 *
 * @author bo
 * @version 1.0
 * @date 2024/2/27 16:25
 */
@Slf4j
public class CarTcpNettyServer {

    public void bind(int port) throws Exception {

        // 配置服务端的NIO线程组
        // NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
        // bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
        // bossGroup接收到连接后就会把连接信息注册到workerGroup
        // workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍

        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    // netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费    最小  初始化  最大 (根据生产环境实际情况来定)
                    // 使用对象池,重用缓冲区
                    .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576))
                    // 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息
                    .childHandler(new CarTcpNettyChannelInitializer<SocketChannel>());
            log.info("<===========netty server start success!==============>");
            // 绑定端口,同步等待成功
            ChannelFuture f = serverBootstrap.bind(port).sync();
            // 等待服务器监听端口关闭
            f.channel().closeFuture().sync();

        } finally {
            // 退出,释放线程池资源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1486338.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序云开发教程——墨刀原型工具入门(添加批注+其他操作)

引言 作为一个小白&#xff0c;小北要怎么在短时间内快速学会微信小程序原型设计&#xff1f; “时间紧&#xff0c;任务重”&#xff0c;这意味着学习时必须把握微信小程序原型设计中的重点、难点&#xff0c;而非面面俱到。 要在短时间内理解、掌握一个工具的使用&#xf…

程序员的金三银四求职宝典:如何在关键时期脱颖而出

程序员的金三银四求职宝典&#xff1a;如何在关键时期脱颖而出 程序员的金三银四求职宝典&#xff1a;如何在关键时期脱颖而出摘要 面试技巧分享 &#x1f60a;1. 自我介绍 Tips简洁明了 ✨重点突出 &#x1f50d;结合实例 &#x1f310; 2. 技术问题回答 Tips冷静应对 &#x…

重学SpringBoot3-自动配置机制

重学SpringBoot3-自动配置机制 引言Spring Boot 自动配置原理示例&#xff1a;Spring Boot Web 自动配置深入理解总结相关阅读 引言 Spring Boot 的自动配置是其最强大的特性之一&#xff0c;它允许开发者通过最少的配置实现应用程序的快速开发和部署。这一切都得益于 Spring …

扑克牌翻牌记忆小游戏源码

源码由HTMLCSSJS组成&#xff0c;双击html文件可以本地运行效果&#xff0c;也可以上传到服务器里面 效果预览 下载地址 https://www.qqmu.com/2296.html

LeetCode每日一题之 移动0

前言&#xff1a; 我的每日一题专栏正式开始更新&#xff0c;我会分享关于我在LeetCode上刷题时的经验&#xff0c;将经典题型拿出来详细讲解&#xff0c;来提升自己及大家的算法能力&#xff0c;希望这篇博客对大家有帮助。 题目介绍&#xff1a; 题目链接&#xff1a;. - …

HTML5+CSS3+移动web——列表、表格、表单

系列文章 HTML5CSS3移动web——HTML 基础-CSDN博客https://blog.csdn.net/ymxk2876721452/article/details/136070953?spm1001.2014.3001.5501 目录 一、列表 无序列表 有序列表 定义列表 二、表格 表格结构标签 基本使用 合并单元格 三、表单 input 标签 input 标签占位文…

模版进阶C++

非类型模版 之前我们写的模版都是在不知道模版&#xff08;类&#xff09;中有的变量的类型是什么的时候&#xff0c;我们先用模版参数定义&#xff0c;当类实例化的时候在传参确认 非类型模版&#xff1a;模版参数定义的时候也可以定义整型类型&#xff08;c20之后才支持其…

Topaz DeNoise AI:一键让照片重获清晰 mac/win版

Topaz DeNoise AI是一款革命性的图片降噪软件&#xff0c;它利用先进的人工智能算法&#xff0c;帮助用户轻松去除照片中的噪点&#xff0c;恢复图像的清晰度和细节。无论是专业摄影师还是摄影爱好者&#xff0c;Topaz DeNoise AI都能成为他们处理图片时的得力助手。 Topaz De…

【Matlab】Matlab电话拨号音合成与识别(代码+论文)【独一无二】

&#x1f449;博__主&#x1f448;&#xff1a;米码收割机 &#x1f449;技__能&#x1f448;&#xff1a;C/Python语言 &#x1f449;公众号&#x1f448;&#xff1a;测试开发自动化【获取源码商业合作】 &#x1f449;荣__誉&#x1f448;&#xff1a;阿里云博客专家博主、5…

springboot240基于Spring boot的名城小区物业管理系统

基于Spring boot的名城小区物业管理系统的设计与实现 摘要 当下&#xff0c;正处于信息化的时代&#xff0c;许多行业顺应时代的变化&#xff0c;结合使用计算机技术向数字化、信息化建设迈进。以前相关行业对于物业信息的管理和控制&#xff0c;采用人工登记的方式保存相关数…

第三百八十回

文章目录 1. 概念介绍2. 使用方法3. 代码与效果3.1 示例代码3.2 运行效果 4. 内容总结 013pickers2.gif 我们在上一章回中介绍了"如何实现Numberpicker"相关的内容&#xff0c;本章回中将介绍wheelChoose组件.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念…

文件操作命令touch、cat、more、cp、mv

touch 创建文件 1&#xff09;可以通过touch命令创建文件。 2&#xff09;语法&#xff1a; touch Linux路径 3&#xff09;touch命令无选项&#xff0c;参数必填&#xff0c;表示要创建的文件路径&#xff0c;相对、绝对、特殊路径符均可以使用。 注&#xff1a;以 d 开头的…

Apache POI的简单介绍与应用

介绍 Apache POI 是一个处理Miscrosoft Office各种文件格式的开源项目。我们可以使用 POI 在 Java 程序中对Miscrosoft Office各种文件进行读写操作。PS&#xff1a; 一般情况下&#xff0c;POI 都是用于操作 Excel 文件&#xff0c;如图&#xff1a; Apache POI 的应用场景&…

韦东山嵌入式Liunx入门驱动开发五

文章目录 一、驱动程序基石1-1 休眠与唤醒1-2 POLL机制1-3 异步通知(1) 异步通知程序解析(2) 异步通知机制内核代码详解 1-4 阻塞与非阻塞1-5 定时器(1) 内核函数(2) 定时器时间单位 1-6 中断下半部 tasklet 本人学习完韦老师的视频&#xff0c;因此来复习巩固&#xff0c;写以…

【web | CTF】BUUCTF [HCTF 2018]WarmUp

天命&#xff1a;这题本地php代码是无法复现的 首先打开网站&#xff0c;啥也没有&#xff0c;查看源码 发现文件&#xff0c;打开访问一下看看&#xff0c;发现是代码审计 <?phphighlight_file(__FILE__);class emmm{public static function checkFile(&$page){$whit…

IOS 发布遇到“Unable to authenticate with App Store Connect”错误咋解决?

问题&#xff1a; 在开发ios app后&#xff0c;先发布adhoc版本&#xff0c;测试通过后&#xff0c;再发布testflight版本测试&#xff0c;但是可能会遇到一下问题。 解决办法&#xff1a; 在Signing &Capabilities中&#xff0c;在ios下边要指定有发布权限的Team账号&a…

文件底层的理解之缓冲区

目录 一、缓冲区的初步认识 二、向文件中写数据的具体过程 三、缓冲区刷新的时机 一、缓冲区的初步认识 缓冲区其实就是一块内存区域&#xff0c;采用空间来换时间&#xff0c;可以提高使用者的效率。我们一直说的缓冲区其实是语言层面上的缓冲区&#xff0c;其实操作系统内部…

黑马点评-商户查询业务

缓存原理 本文的业务就是redis的经典应用&#xff0c;标准的操作方式就是查询数据库之前先查询缓存&#xff0c;如果缓存数据存在&#xff0c;则直接从缓存中返回&#xff0c;如果缓存数据不存在&#xff0c;再查询数据库&#xff0c;然后将数据存入redis。 缓存更新策略 根据…

iMazing 3.0.0.3 for mac 中文破解版2024最新图文安装教程

我们刚刚发布了iMazing 3.0.0.3 for mac 中文版本。Windows和macOS用户现在都可以试驾并体验iPhone管理的未来。 备受期待的第一个Windows版本得益于过去几个月macOS测试版的所有改进&#xff0c;使其成为一个稳定的初始版本。 我们的开发团队创造了一种无缝的外观和体验&#…

sql 注入 之sqli-labs/less-6 双注入,双引号报错注入

和第五关类似&#xff0c;只不过闭合符号是双引号 1&#xff0c;查数据库 1"and%20(updatexml(1,concat(0x7e,(select%20database()),0x7e),1))%20-- 2.查表 内容有多行&#xff0c;所以使用limit依次查询 1"and%20(updatexml(1,concat(0x7e,(select%20table_nam…