Netty通信中的粘包半包问题(一)

news2024/11/17 10:00:14

前言

我们在日常开发过程中,客户端和服务端的连接大多使用的是TCP协议,因为我们要保证数据的可靠传输,
当网络中出现丢包时要求,要求数据包的发送端重传给接收端。而TCP是一种面向连接的传输层协议,
当使用TCP进行传输时,客户端和服务端会各自维护两个缓冲区,它们分别是发送缓冲区、接收缓冲区,如图所示

我们在日常开发过程中,客户端和服务端的连接大多使用的是TCP协议,因为我们要保证数据的可靠传输,当网络中出现丢包时要求,要求数据包的发送端重传给接收端。而TCP是面向
在网络传输过程中,虽然对要发送的数据包大小没有要求,但是TCP又不可能一次性的把数据全部加载到
发送缓冲区中,这样会有可能撑爆TCP的发送缓冲区,比如说你要发送个1G的数据给服务端,TCP本身是不会
每次根据你要发送的数据包大小划定缓冲区大小的。但是它会收数据链路层的协议限制,
因为数据链路层是服务于传输层协议的,我们需要了解数据链路层当中传输的最大单元
在这里插入图片描述
MTU(Maximum Transmission Unit)最大传输单元,用来通知对方所能接受数据服务单元的最大尺寸,说明发送方能能够接受的有效载荷大小,另外传输层还会进行一个MSS大小的TCP分段。MSS是最大报文长度的缩写,MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段,所以MSS并不是TCP报文的最大长度,而是MSS=TCP报文长度 - TCP首部长度,在以太网中MTU是1500个字节,其中TCP报头占20个字节,IP包头占20个字节,剩下的大小才是我们能在发送的有效数据包大小,也就是1500-20-20=1460个字节,
也就是说,如果没有进行过调整,当你发送的数据包小于1460个字节的时候,客户端在发送缓冲区里放下你的报文时还会有空余,但是不会立马发送,而是会等待缓冲区达到一个阈值时再发送,这个时候就会出现粘包现象,因为TCP在发送缓冲区会存在其他报文的数据,看着就像粘在一起一样。反过来讲,当你要发送的数据包大于这个发送缓冲区的大小时,将会留到下一次发送缓冲区填充,相当于一个报文被拆分了多次进行发送.这两种现象会导致服务端必须要等待接收到完整数据报文才可以进行处理,不然就得现将数据暂存到内存中,影响处理效率,会影响服务端整体的吞吐量,因为服务端需要维持报文的中间状态,维护报文之间的顺序关系,增加了服务端的工作量

下面我以代码的形式复现一下:

1.Client

package splicing.demo;

import constant.Constant;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoCliStickyHalf {
    
    private final int port;
    
    private final String host;
    
    public EchoCliStickyHalf(int port, String host) {
        this.port = port;
        this.host = host;
    }
    
    public void start() throws InterruptedException {
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 客户端启动必备, 和服务器的不同点
            Bootstrap b = new Bootstrap();
            b.group(group)
                    // 指定使用NIO的通信模式
                    .channel(NioSocketChannel.class)
                    // 指定服务器的IP地址和端口,和服务器的不同点
                    .remoteAddress(new InetSocketAddress(host, port))
                    // 和服务器的不同点
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoCliStickyHalfHandler());
                        }
                    });
            
            // 异步连接到服务器,sync()会阻塞到完成,和服务器的不同点
            ChannelFuture f = b.connect().sync();
            // 阻塞当前进程,直到客户端的channel被关闭
            f.channel().closeFuture().sync();

        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoCliStickyHalf(Constant.DEFAULT_PORT, Constant.DEFAULT_SERVER_IP).start();
    }
}

package splicing.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class EchoCliStickyHalfHandler extends SimpleChannelInboundHandler<ByteBuf> {
    
    private AtomicInteger counter = new AtomicInteger(0);
    
    /**
     * 客户端读取到网络数据后的处理
     */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
        System.out.println("client Accept[" + byteBuf.toString(CharsetUtil.UTF_8) +
                "] and the counter is :" + counter.incrementAndGet()); 
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String request = "Mark,Zhuge,Fox,Zhouyu,Loulan" 
                + System.getProperty("line.separator");

        ByteBufAllocator alloc = ctx.alloc();
        ByteBuf msg = null;
        // 我们希望服务器接收到100个这样的报文
        for (int i = 0; i < 100; i++) {
            ByteBuf byteBuf = alloc.buffer();
            msg = alloc.buffer(request.length());
            msg.writeBytes(request.getBytes());
            ctx.writeAndFlush(msg);
                    
        }
//        super.channelActive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
//        super.exceptionCaught(ctx, cause);
    }
}

import java.util.Date;

/**
 * 常量
 */
public class Constant {
    
    public static final Integer DEFAULT_PORT = 7777;
    
    public static final String DEFAULT_SERVER_IP= "127.0.0.1";
    
    // 根据输入信息拼接出一个应答信息
    public static String response(String msg) {
        return "Hello, " + msg + ", Now is" + new Date(System.currentTimeMillis()).toString(); 
    }
}

2.Server

package splicing.demo;

import constant.Constant;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;

public class EchoSvrStickyHalf {
    
    private static final Logger LOG = LoggerFactory.getLogger(EchoSvrStickyHalf.class);
    
    private final int port;
    
    public EchoSvrStickyHalf(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws InterruptedException {
        EchoSvrStickyHalf echoSvrStickyHalf =
                new EchoSvrStickyHalf(Constant.DEFAULT_PORT);
        
        LOG.info("服务器即将启动");
        echoSvrStickyHalf.start();
        LOG.info("服务器关闭");
    }
    
    public void start() throws InterruptedException {
        // 线程组
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 服务端启动必备
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoSvrStickyHalfHandler());
                        }
                    });
            // 异步绑定到服务器,sync()会阻塞到完成
            ChannelFuture f = b.bind().sync();
            LOG.info("服务器启动完成。");
            // 阻塞当前线程,直到服务器的ServerChannel被关闭
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
    
}

package splicing.demo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

import java.util.concurrent.atomic.AtomicInteger;

public class EchoSvrStickyHalfHandler extends ChannelInboundHandlerAdapter {
    
    
    private AtomicInteger counter = new AtomicInteger(0);


    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf)msg;
        String request = in.toString(CharsetUtil.UTF_8);
        System.out.println("Server Accept[" + request + "] and the counter is :" + counter.incrementAndGet() );
        
        String resp = "Hello, " +request + ". Welcome to Netty World" + System.getProperty("line.separator");
        ctx.writeAndFlush(Unpooled.copiedBuffer(resp.getBytes()));
//        super.channelRead(ctx, msg);
    }

    /**
     * 发生异常后的处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);
        cause.printStackTrace();
        ctx.close();
    }
}

3.分析

代码中明明发送了100个报文,结果服务端收到的报文个数,却是两个报文,中间的某一个报文还发生了半包,
下期我们再来讨论如何解决这种现象
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1377446.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Mysql时间差8小时解决方案

目录 1. MySQL 本身的问题1-1. 验证MySQL时间1-2. 修改Mysql时区配置文件修改Mysql时区SQL修改Mysql时区 2.JDBC 连接的问题3. 返回 JSON 时间不对 在开发中&#xff0c;有可能会遇到这种情况&#xff1a; 插入数据库中的时间时正常。但是将时间传到前端页面上显示时&#xff…

【Linux】 nohup命令使用

nohup命令 nohup是Linux和Unix系统中的一个命令&#xff0c;其作用是在终端退出时&#xff0c;让进程在后台继续运行。它的全称为“no hang up”&#xff0c;意为“不挂起”。nohup命令可以让你在退出终端或关闭SSH连接后继续运行命令。 nohup 命令&#xff0c;在默认情况下&…

大数据StarRocks(五) :数据类型

StarRocks 支持数据类型&#xff1a;数值类型、字符串类型、日期类型、半结构化类型、其他类型。您在建表时可以指定以下类型的列&#xff0c;向表中导入该类型的数据并查询数据。 5.1 数值类型 SMALLINT2 字节有符号整数&#xff0c;范围 [-32768, 32767] INT4 字节有符号整…

探索SQL性能优化之道:实用技巧与最佳实践

SQL性能优化可能是每个数据库管理员和开发者在日常工作中必不可少的一个环节。在大数据时代&#xff0c;为确保数据库系统的响应速度和稳定性&#xff0c;掌握一些实用的SQL优化技巧至关重要。 本文将带着开发人员走进SQL性能优化的世界&#xff0c;深入剖析实用技巧和最佳实践…

彻底解决charles抓包https乱码的问题

最近做js逆向&#xff0c;听说charles比浏览器抓包更好用&#xff0c;结果发现全是乱码&#xff0c;根本没法用。 然后查询网上水文&#xff1a;全部都是装证书&#xff0c;根本没用&#xff01; 最后终于找到解决办法&#xff0c;在这里记录一下&#xff1a; 乱码的根本原因…

Paddle模型转ONNX

深度学习模型在硬件加速器上的部署常常要用到ONNX&#xff08;Open Neural Network Exchange&#xff0c;开放神经网络交换&#xff09;格式&#xff0c;也可以通过ONNX实现不同AI框架&#xff08;如Pytorch、TensorFlow、Caffe2、PaddlePaddle等&#xff09;之间的模型转换。 …

MySQL-外键等信息

38. 基础-多表查询-概述_哔哩哔哩_bilibili 1、流程函数 2、约束字段 删除外键 &#xff1a; alter table emp2 drop foreign key 外键名 //外键可以保持数据的一致性和完整性&#xff0c;外键的话&#xff0c;就是类似一个主表&#xff0c;一个从表&#xff0c;从表的其中一…

【grpc】利用protobuf实现java或kotlin调用python脚本,含实现过程和全部代码

前言 在一些特殊场景中&#xff0c;我们可能需要使用java或者其他任意语言调用python脚本或sdk等。本文的需求衍生也不例外于此&#xff0c;python端有sdk&#xff0c;但只能在python中调用&#xff0c;于是就有了本文章。 常见的调用方式如jython、python提供http rest接口、…

波动,热传导,扩散方程建立

数学物理方程是从自然科学的各个领域和工程技术领域中导出的偏微分方程和积分方程.在这些以偏微分方程为基础的数学模型中&#xff0c;二阶线性偏微分方程中的三个典型方程与定解条件的建立、解法及其应用&#xff0e;描述振动和波动过程的波动方程、描述输运过程的热传导&…

2024年MIA最新生成综述:基于深度学习的MRI/CT/PET合成【文献阅读】

2024年MIA最新生成综述&#xff1a;基于深度学习的MRI/CT/PET合成【文献阅读】 基本信息 标题&#xff1a;Deep learning based synthesis of MRI, CT and PET: Review and analysis发表年份: 2024期刊/会议: Medical Image Analysis分区&#xff1a; SCI 1区IF&#xff1a;1…

知道IP怎么反查域名?这几个方法一查一个准!

知道网络IP怎么反查出真实域名来&#xff1f;给大家分享几个我常用的方法&#xff0c;就算你不懂技术你都能查得出来&#xff01; 一、fofa 这是一个白帽黑客非常喜欢用的社工平台&#xff0c;只要你输入IP就能查到很多背后的信息。 传送门&#xff1a;https://fofa.info 二…

智慧厂区烟火识别系统应用

在当今的智能制造行业中&#xff0c;安全管理已成为优先考虑的重要议题。集度汽车公司在其实验室场区引入了一项创新技术——富维图像厂区烟火识别系统。这个项目的核心是利用先进的烟火识别系统&#xff0c;保障厂区的安全与稳定运行。 系统特点 烟火识别系统的准确率高和误报…

Spring Cloud Alibaba整合RocketMQ架构原理分析

关于RocketMQ的原理&#xff0c;本文就不做详细分析了&#xff0c;这里就重点关注Spring Cloud Alibaba是如何整合RocketrMQ的。 Part.1 使用原生RocketMQ客户端&#xff1f; RocketMQ提供了RocketMQ Client SDK&#xff0c;开发者可以直接依赖这个SDK&#xff0c;就可以完成…

NAS使用的一些常见命令 ssh sftp 上传 下载 ALL in one

目录 登陆上传/下载内网穿透 登陆 ssh 登陆 ssh usernameserverIP -p portNumsftp 登陆 sftp -P portNum usernameserverIP上传/下载 如ls等&#xff0c;远程服务器操作 如lls等&#xff0c;本机操作&#xff0c;前缀为l 文件 put **** 将本机上文件上传到远程服务器上当…

初识Ubuntu

其实还是linux操作系统 命令都一样 但是在学习初级阶段&#xff0c;我还是将其分开有便于我的学习和稳固。 cat 查看文件 命令 Ubuntu工作中经常是用普通用户&#xff0c;在需要时才进行登录管理员用户 sudn -i 切换成管理用户 我们远程连接时 如果出现 hostname -I没有出现…

【LeetCode每日一题】2085. 统计出现过一次的公共字符串(哈希表)

2024-1-12 文章目录 [2085. 统计出现过一次的公共字符串](https://leetcode.cn/problems/count-common-words-with-one-occurrence/)思路&#xff1a;哈希表计算 2085. 统计出现过一次的公共字符串 思路&#xff1a;哈希表计算 1.用两个哈希表分别统计word1和word2中字符出现的…

一本数学教材严谨和通俗哪个更重要?

一本教材也许无法同时兼顾严谨和通俗&#xff0c;而且在不同的场景下&#xff0c;严谨和通俗的重要性也不尽相同&#xff1a; 在正式的学术场合&#xff0c;严谨当然重要&#xff0c;一些不严谨的教材可能无法通过审校&#xff0c;在读者存在疑问的时候&#xff0c;也不一定能给…

C++ Webserver从零开始:基础知识(一)——Linux网络编程基础API

前言 本专栏将从零开始制作一个C Webserver&#xff0c;用以记录笔者学习的过程 如果你想要跟着我这个专栏制作一个C Webserver,你需要掌握以下前置基础课程知识&#xff1a; 1.C/C的语法&#xff08;在Leetcode刷100~200题的程度即可&#xff09; 2.计算机网络基础知识 3…

WSL2-Ubuntu20.04-配置

WSL2-Ubuntu20.04-配置 安装wsl2安装Ubuntu20.04安装anacondaWSL2可视化&#xff08;VcXsrv&#xff09; 安装wsl2 wsl --install wsl -l -v # 版本查看 默认的都是 wsl2 &#xff08;如果是wsl1 就自行升级 wsl --update&#xff09; 官方教程 安装Ubuntu20.04 安装wsl2之后…

CRLF漏洞靶场记录

搭建 利用 docker 搭建 vulhub 靶场 git clone https://github.com/vulhub/vulhub.git 进入 /vulhub/nginx/insecure-configuration 目录 启动前关闭现有的 8080、8081、8082 端口服务&#xff0c;避免端口占用 docker-compose up -d 进入容器 docker exec -it insecure-…