Netty各组件基本用法、入站和出站详情、群聊系统的实现、粘包和拆包

news2024/11/15 11:18:49

Netty

  • Bootstrap和ServerBootstrap
  • Future和ChannelFuture
  • Channel
  • Selector
  • NioEventLoop和NioEventLoopGroup
  • ByteBuf
    • 示例代码
  • Channel相关组件
  • 入站详情
  • 出站详情
  • 对象编解码
  • ProtoBuf和ProtoStuff
  • netty实现群聊系统
  • 粘包和拆包
    • TCP协议特点
    • 举个例子


Bootstrap和ServerBootstrap

Bootstrap是Netty的启动程序,一个Netty应用通常由一个Bootstrap开始。Bootstrap的主要作用是配置Netty程序,串联Netty的各个组件。
在这里插入图片描述


Future和ChannelFuture

在这里插入图片描述这个方法是异步的(交给别的线程去执行该任务),当执行到这之后,netty不一定启动了

ChannelFuture channelFuture = bootstrap.bind(9090);
channelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture funture) throws Exception {
        if(funture.isSuccess()){
            System.out.println("监听9090成功");
        }else{
            System.out.println("监听9090失败");
        }
    }
});

该方法可以知晓有没有启动成功,或者改为同步的方式
ChannelFuture channelFuture = bootstrap.bind(9090).sync();

在这里插入图片描述
ChannelFuture和Future的子类,提供了针对于Channel的异步监听操作


Channel

NioSocketChannel:异步的客户端才CP Socket连接通道
NioServerSocketChannel:异步的服务端TCP Socket连接通道
NioDatagramChannel:异步的UDP连接通道
NioSctpChannel:异步的客户端Sctp连接通道
NioSctpServerChannel:异步的服务端Sctp连接通道


Selector

通过Selector多路复用器实现IO的多路复用。Selector可以监听多个连接的Channel事件,同时可以不断的查询已注册Channel是否处于就绪状态,实现一个线程可以高效的管理多个Channel。


NioEventLoop和NioEventLoopGroup

NioEventLoop本质就是一个线程,一个NioEventLoop就对应一个线程,但可以达到异步的处理事务,因为NioEventLoop内部维护了一个异步任务队列,用于存储需要在事件循环中执行的任务。通过将任务添加到队列中,NioEventLoop可以在空闲时间执行这些任务,从而实现了异步提交事务的能力。
NioEventLoopGroup管理NioEventLoop的生命周期,可以理解为是线程池,内部维护了一组线程。每个线程(即NioEventLoop)负责处理多个Channel上的事件。注意,一个Channel只对应一个线程,NioEventLoop和Channel它们是一对多的关系。
一个线程可以管理多个channel,但一个channel只能被一个线程执行


ByteBuf

初始情况
在这里插入图片描述

写入数据
在这里插入图片描述

读取数据
在这里插入图片描述
已读的区域:[0,readerIndex]
可读的区域:[readIndex,writeIndex)
可写的区域:[writeIndex,capacity)

示例代码

public class ByteBufDemo {
    public static void main(String[] args) {
        //创建一个有10个容量数据的ByteBuf对象
        ByteBuf buf = Unpooled.buffer(10);
        System.out.println("init buf:"+buf);
        //添加数据
        for(int i = 0;i<5;i++){
            buf.writeByte(i);
        }
        System.out.println("after write:"+buf);
        //按照索引读取数据
        for(int i = 0;i<3;i++){
            System.out.println(buf.getByte(i));
        }
        System.out.println("after get:"+buf);
        //读取数据
        for(int i = 0;i<3;i++){
            System.out.println(buf.readByte());
        }
        System.out.println("after read:"+buf);
    }
}

Channel相关组件

在这里插入图片描述

ChannelHandler
ChannelHandler用于处理拦截IO事件,往往在ChannelHandler中可以加入业务处理逻辑。ChannelHandler执行完后会将io事件转发到ChannelPipeline中的下一个处理程序。
ChannelHandlerContext
保存Channel相关的上下文信息,并关联一个ChannelHandler对象。
ChannelPipeline
ChannelPipeline是一个双向链表,其中保存着多个ChannelHandler。ChannelPipeline实现了一种高级形式的过滤器模式,在IO操作时发生的入站和出站事件,会导致ChannelPipeline中的多个ChannelHandler被依次调用。
在这里插入图片描述


入站详情

在这里插入图片描述
在这里插入图片描述
现在我们的客户端和服务端之间就有三个拦截器
我们在NettyServerHandler里面收到信息就不用解码了,为什么,因为解码器的拦截器已经帮我们做好了

在这里插入图片描述
当我们服务端读数据的时候,会从客户端读数据==入站,因为解码的handler和业务处理的handler是入站拦截器,所以会对数据产生作用,但编码的handler不会,因为它是一个出站handler


出站详情

站在服务端的立场
在这里插入图片描述
在Netty中,客户端和服务端的addLast方法有一些不同之处。具体来说,它们的区别如下:
1. 顺序:当调用addLast方法添加处理器时,它们的顺序略有不同。对于客户端来说,添加的处理器是按照添加的顺序进行顺序执行的,即先添加的处理器先执行。而对于服务端来说,添加的处理器是按照逆序执行的,即先添加的处理器后执行。
2. 作用对象:客户端的addLast方法主要作用于Outbound事件,用于处理从客户端发送到服务端的请求。而服务端的addLast方法主要作用于Inbound事件,用于处理从服务端接收到的请求。
3. 处理逻辑:客户端和服务端的addLast方法所添加的处理器,通常具有不同的处理逻辑。客户端的处理器通常用于编码请求、发送请求等操作。服务端的处理器通常用于解码请求、处理请求、编码响应等操作。


对象编解码

对象编码器
在这里插入图片描述
对象解码器
在这里插入图片描述


ProtoBuf和ProtoStuff

为了编解码提升性能,可以使用Protobuf域者Protpstuff对数据进行序列话和反序列化,效率更高。

第一步:导入依赖

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-api</artifactId>
    <version>1.0.10</version>
</dependency>

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.0.10</version>
</dependency>

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.0.11</version>
</dependency>

第二步:引入工具类

public class ProtostuffUtils {
    /**
     * 避免每次序列化都重新申请Buffer空间
     */
    private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
    /**
     * 缓存Schema
     */
    private static Map<Class<?>, Schema<?>> schemaCache = new ConcurrentHashMap<>();
    /**
     * 序列化方法,把指定对象序列化成字节数组
     *
     * @param obj
     * @param <T>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        Schema<T> schema = getSchema(clazz);
        byte[] data;
        try {
            data = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }

        return data;
    }

    /**
     * 反序列化方法,将字节数组反序列化成指定Class类型
     *
     * @param data
     * @param clazz
     * @param <T>
     * @return
     */
    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = getSchema(clazz);
        T obj = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, obj, schema);
        return obj;
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        Schema<T> schema = (Schema<T>) schemaCache.get(clazz);
        if (Objects.isNull(schema)) {
            //这个schema通过RuntimeSchema进行懒创建并缓存
            //所以可以一直调用RuntimeSchema.getSchema(),这个方法是线程安全的
            schema = RuntimeSchema.getSchema(clazz);
            if (Objects.nonNull(schema)) {
                schemaCache.put(clazz, schema);
            }
        }

        return schema;
    }
}

netty实现群聊系统

服务端基本代码

//群聊系统的服务器
public class ChatServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup boosGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //配置参数
        serverBootstrap.group(boosGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG,1024)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //获得pipleline
                        ChannelPipeline pipeline = ch.pipeline();
                        //添加handler
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        //添加业务处理handler
                        pipeline.addLast(new ChatServerHandler());
                    }
                });
        System.out.println("聊天室启动了...");
        ChannelFuture channelFuture = serverBootstrap.bind(9090).sync();
        channelFuture.channel().closeFuture().sync();

        boosGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

服务端业务代码

public class ChatServerHandler extends SimpleChannelInboundHandler<String>{

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    //存放Channel的容器,而且还可以执行对每个channel执行的任务
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    //有客户端上线了
    //有新的客户端连接了,将该客户端的上线信息广播给其它所有客户端
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //得到客户端的channel
        Channel channel = ctx.channel();
        String message = "客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"上线了\n";
        //得到其它客户端的channel向其它客户端发送该客户端的channel
        channelGroup.writeAndFlush(message);
        //加入到channelGroup中
        channelGroup.add(channel);
    }

    /*
    * 客户端下线则广播给其它客户端*/

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel channel = ctx.channel();
        //生成一个下线的信息
        String message = "客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"下线了\n";
        //广播给其它客户端
        channelGroup.writeAndFlush(message);
    }

    /*
    *具体读数据的业务 */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        //获得当前发消息的客户端channel
        Channel channel =   ctx.channel();
        //遍历所有的channel
        channelGroup.forEach(ch->{
            if(channel!=ch){
                ch.writeAndFlush("客户端-"+channel.remoteAddress()+"于"+sdf.format(new Date())+"说:"+
                        msg+"\n");
            }else{
                ch.writeAndFlush("我于"+sdf.format(new Date())+"说:"+msg+"\n");
            }
        });
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

客户端基本代码

public class ChatClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new StringDecoder());
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new ChatClientHandler());
                    }
                });
        //发送消息
        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090).sync();
        Channel channel = channelFuture.channel();
        System.out.println("欢迎进入Yc聊天室");
        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNextLine()){
            String message = scanner.nextLine();
            channel.writeAndFlush(message);
        }
        eventLoopGroup.shutdownGracefully();
    }
}

客户端业务代码

public class ChatClientHandler extends SimpleChannelInboundHandler<String> {

    //打印在控制台
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
    }
}

粘包和拆包

TCP协议特点

作为一个流式传输协议,数据在TCP中传输是没有边界的。也就是说,客户端发送的多条数据,有可能会被认为是一条数据。或者,客户端发送的一条数据,有可能会被分成多条数据。这是由于TCP协议并不了解上层业务数据的具体含义,在使用TCP协议传输数据时,是根据TCP缓冲区的实际情况进行数据包的划分。

举个例子

我们要发两句话
我是杨 他是李
可能别人收到的信息就是我是杨他是李一条数据,也可能收到我是 杨他是李这两句话
假设我们这有个客户端

在这里插入图片描述
发送两百次消息
在这里插入图片描述
就可能得到这样的结果
粘包:缓冲区还可以放的下 拆包:缓冲区不可以放的下(乱码发生的原因是因为一个字的字节放在不同缓冲区内发送)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/817766.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【管理设计篇】聊聊分布式配置中心

为什么需要配置中心 对于一个软件系统来说&#xff0c;除了数据、代码&#xff0c;还有就是软件配置&#xff0c;比如操作系统、数据库配置、服务配置 端口 ip 、邮箱配置、中间件软件配置、启动参数配置等。如果说是一个小型项目的话&#xff0c;可以使用Spring Boot yml文件…

Nginx解决文件服务器文件名显示不全的问题

Nginx可以搭建Http文件服务器&#xff0c;但默认的搭建会长文件名显示不全&#xff0c;比如如下&#xff1a; 问题&#xff1a;显示不全&#xff0c;出现...&#xff0c;需要进行解决 这里使用重新编绎nginx的方式&#xff0c;见此文&#xff1a; https://unix.stackexchange…

CS5265国产Typec转HDMI音视频转换芯片可替代RTD2172

集睿致远/ASL推出的CS5265是一款高性能Type-C/DP1.4至HDMI2.0b转换器IC&#xff0c;设计用于将USB type c源或DP1.4源连接至HDMI2.0b接收器。CS5265集成了DP1.4兼容接收机和HDMI2.0b兼容发射机。此外&#xff0c;CC控制器还用于CC通信&#xff0c;以实现DP Alt模式。DP接口包括…

ARM 常见汇编指令学习 9 - 缓存管理指令 DC 与 IC

文章目录 ARM64 DC 与 IC 指令 上篇文章&#xff1a;ARM 常见汇编指令学习 8 - dsb sy 指令及 dsb 参数介绍 ARM64 DC 与 IC 指令 AArch64指令集中有两条关于缓存维护&#xff08;cache maintenance&#xff09;的指令&#xff0c;分别是IC和DC。 IC 是用于指令缓存操作&…

设备管理升级:揭秘设备健康管理的核心优势

随着工业企业迎来数字化转型的浪潮&#xff0c;设备管理在实现升级和卓越运营方面扮演着关键角色。传统的设备管理方式已经难以适应复杂多变的生产环境&#xff0c;因此设备健康管理作为数字化转型的核心优势应运而生。本文将深入探讨设备健康管理的核心优势&#xff0c;以揭示…

了解ai绘画软件哪个好,我分享这几款你看看

以前绘画是一项需要花费长时间学习的艺术&#xff0c;绘画创作需要耗费许多时间和人工成本。但人工智能的发展&#xff0c;让ai绘画工具成为学习画画、创作的另一种新形式。这些绘画工具仅仅通过输入文字描述就可以生成个性化的创作&#xff0c;帮助我们快速实现绘画创作的梦想…

使用ansible playbook编写lnmp架构

使用ansible playbook编写lnmp架构 - name: nginx playgather_facts: falsehosts: lnmpremote_user: roottasks: - name: stop firewalldservice: namefirewalld statestopped- name: syslinuxcommand: /usr/sbin/setenforce 0ignore_errors: true- name: nginx.repocopy: src/…

Spring基于注解管理bean及全注解开发

文章目录 spring概述Spring定义Spring核心Spring Framework的特点 基于注解管理bean依赖开启组件扫描使用注解定义Bean案例:Autowired注入属性注入set注入形参上注入只有一个构造函数&#xff0c;无注解Autowire注解和Qualifier注解联合 Resource注入Spring全注解开发 spring概…

信息系统项目管理的计算机基础知识

一、信息化发展 &#xff08;一&#xff09;信息与信息化 1、信息 信息是确定性的增加。单位为比特&#xff08;bit&#xff09;。 2、信息系统 信息系统是通过输入数据&#xff0c;然后进行加工处理&#xff0c;最后产生信息的系统。面向管理和支持生产是信息系统的显著特…

【PHP】简记问题:使用strtotime(‘-1 month‘, time)获取上个月第一天时间戳出错

发生场景 在7月31号是查看统计上个月订单购买总金额&#xff0c;查询结果为0 $preMonthStart strtotime(date(Ym01, strtotime("-1 month"))); $curMonthStart strtotime(date(Ym01)); # 统计上月份实际订单金额 $sql "SELECT count(money) FROM orders WH…

Flowable-服务-服务任务

文章目录 定义图形标记XML内容界面操作 定义 服务任务&#xff08;Service Task&#xff09;是一个自动化任务&#xff0c;无须人为参与&#xff0c;一般被用作调用服务。当流程执行到服务任务 时&#xff0c;可以自动执行编写的 Java 程序实现自定义的业务逻辑&#xff0c;完…

SpringMVC中的相关注解

文章目录 RequestMappingGetMappingPostMappingResponseBody传参RequestParamPathVariableRequestBodyRequestPartCookieValueSessionAttributeRequestHeader总结 RequestMapping 支持 GET/POST 类型的请求。&#xff08;&#xff09;内写请求的访问地址。 即可以修饰类也可以修…

springboot2实现图片文件上传与mysql存储路径并回显

环境介绍 技术栈 springbootmybatismysql 软件 版本 mysql 8 IDEA IntelliJ IDEA 2022.2.1 JDK 1.8 Spring Boot 2.7.13 mybatis 2.3.1 springboot是基于spring 4.0&#xff0c;springboot2是基于spring5.0,springboot2由pivotal公司在2018发布,这个框架主要用来…

Pytorch深度学习-----神经网络之池化层用法详解及其最大池化的使用

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…

一些类型推导相关的功能(C++)

目录 auto关键的新用法&#xff08;C11起&#xff09; 背景介绍 用法示例 注意事项 typeid运算符 type_info类 typeid的用法 decltype运算符 用法阐述 用法示例 用法建议 function类模板 用法阐述 用法示例 function较函数指针的优势 std::function和decltype的…

苍穹外卖day10——订单状态定时处理(Spring Task)、来单提醒和客户催单(WebSocket)

预期效果 对于超时没处理的需要定时程序处理。基于SpringTask实现。 来单提醒和客户催单。基于WebSocket实现。 Spring Task 介绍 Cron表达式 周几通常不能和日一起指定。 cron表达式在线生成器 在线Cron表达式生成器 入门案例 创建定时任务类 /*** 定义定时任务类*/ Slf4j…

OBS推流工具介绍及桌面录屏推流功能实现

OBS推流工具介绍及桌面录屏推流功能实现 文章目录 OBS推流工具介绍及桌面录屏推流功能实现1 OBS工具介绍2 OBS工具安装及简单使用2.1 安装步骤2.2 简单使用介绍 3 OBS实现桌面录屏推流工具实现4 总结 流媒体开发工程中&#xff0c;我们除了使用ffmpeg等工具辅助调试外&#xff…

《工具箱-数据库相关》Dbeaver数据导入“CSV格式“、“Txt格式“导入配置

《工具箱-数据库相关》DBeaver线下数据导入数据库表 Dbeaver数据导入"CSV格式"、"Txt格式"导入配置 使用CSV、Txt导入的时候&#xff0c;数据格式不同&#xff0c;在导入数据的时候&#xff0c;要根据数据编码样式设置不同的配置。 一、Txt格式导入 1.1 …

汇编语言基础知识

目录 前言&#xff1a; 汇编语言的产生 汇编语言的组成 内存 指令和数据 cpu对内存的读写操作 地址总线 数据总线 控制总线 内存地址空间 前言&#xff1a; 汇编语言是直接在硬件之上工作的 编程语言&#xff0c;我们首先了解硬件系统的机构&#xff0c;才能有效地应用…

CHI中的resp type

Rsp分为4大类&#xff1b; Completion response □ 除了PCrdReturn&#xff0c;PrefetchTgt&#xff0c;其他所有的trans都需要comp resp; □ 通常是一个trans的最后一个发送的message, 来自completer; 这个响应保证trans到达了POS/POC; □ 通常RN还会发送一个compack;Read an…