“超越极限 - 如何使用 Netty 高效处理大型数据?“ - 掌握 Netty 技巧,轻松应对海量数据处理!

news2025/1/16 8:58:52

1 写大型数据

因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是特殊问题。由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。

考虑下将一个文件内容写出到网络。讨论传输(见 4.2 节)的过程中,提到 NIO 的零拷贝,这消除了将文件内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在 Netty 的核心中,所以应用程序所有需要做的就是使用FileRegion接口实现,其在 Netty 的 API 文档中的定义是: “通过支持零拷贝的文件传输的 Channel 来发送的文件区域。”

代码11-11展示如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel(甚至可利用 io.netty.channel.ChannelProgressivePromise实时获取传输的进度),从而利用零拷贝特性来传输一个文件的内容。

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.File;
import java.io.FileInputStream;

/**
 * Listing 11.11 使用 FileRegion 传输文件的内容
 */
public class FileRegionWriteHandler extends ChannelInboundHandlerAdapter {
    private static final Channel CHANNEL_FROM_SOMEWHERE = new NioSocketChannel();
    private static final File FILE_FROM_SOMEWHERE = new File("");

    @Override
    public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        File file = FILE_FROM_SOMEWHERE; //get reference from somewhere
        Channel channel = CHANNEL_FROM_SOMEWHERE; //get reference from somewhere
        //...
        // 创建一个FileInputStream
        FileInputStream in = new FileInputStream(file);

        // 以该文件的完整长度创建一个新的 DefaultFileRegion
        FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length());
        // 发送该 DefaultFileRegion,并注册一个ChannelFutureListener
        channel.writeAndFlush(region).addListener(
                new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            // 处理失败
                            Throwable cause = future.cause();
                            // Do something
                        }
                    }
                });
    }
}

该示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量内存消耗。

关键是 interface ChunkedInput,类型参数 B 是 readChunk()方法返回的类型。Netty 预置该接口的 4 个实现,如表 11-7:

表11-7:ChunkedInput的实现每个都代表了一个将由 ChunkedWriteHandler 处理的不定长度的数据流。

代码清单 11-12 说明 ChunkedStream 用法,最常用的实现。所示类使用File及SslContext进行实例化。当initChannel()被调用,它将使用所示的 ChannelHandler 链初始化该 Channel。

表11-7:ChunkedInput的实现

当 Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数据作为 ChunkedStream 写入。数据在传输之前将会由 SslHandler 加密。

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedStream;
import io.netty.handler.stream.ChunkedWriteHandler;

import java.io.File;
import java.io.FileInputStream;

/**
 * 11.12 使用 ChunkedStream 传输文件内容
 */
public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {

    private final File file;

    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 将 SslHandler 添加到ChannelPipeline
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc())));
        // 添加 ChunkedWriteHandler以处理作为ChunkedInput传入的数据
        pipeline.addLast(new ChunkedWriteHandler());
        // 一旦连接建立,WriteStreamHandler就开始写文件数据
        pipeline.addLast(new WriteStreamHandler());
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // 当连接建立时,channelActive()将使用ChunkedInput写文件数据
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

逐块输入

要使用你自己的 ChunkedInput 实现,请在 ChannelPipeline 中安装一个ChunkedWriteHandler。

本节讨论如何通过使用零拷贝特性来高效地传输文件,以及如何通过使用ChunkedWriteHandler写大型数据而又不必冒OOM风险。下一节研究几种序列化 POJO 方法。

2 序列化数据

JDK 提供了 ObjectOutputStream 和 ObjectInputStream,用于通过网络对 POJO 的基本数据类型和图进行序列化和反序列化。该 API 并不复杂,而且可以被应用于任何实现了java.io.Serializable接口的对象。但其性能也不是非常高效。这节,我们将看到 Netty 必须为此提供什么。

2.1 JDK 序列化

若你的应用程序必须要和使用了ObjectOutputStream、ObjectInputStream的远程节点交互,并且兼容性也是你最关心的,那么JDK序列化将是正确的选择,表11-8列出Nett y提供的用于和JDK进行互操作的序列化类:

CompatibleObjectDecoder类已经在 Netty 3.1 中废弃,并不存在于 Netty 4.x 中:https://issues.jboss.org/browse/NETTY-136

若你能自由使用外部依赖,则JBoss Marshalling将是理想选择:它比JDK序列化最多快3倍,且更紧凑。在JBoss Marshalling官方网站主页的概述对其定义:JBoss Marshalling 是一种可选的序列化 API,它修复 JDK 序列化 API 所发现的许多问题,同时保留与 java.io.Serializable 及其相关类的兼容性,并添加几个新的可调优参数及额外特性,所有这些都能通过工厂配置(如外部序列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。

2.2 使用 JBoss Marshalling 进行序列化

Netty 通过表11-9所示的两组解码器/编码器对为 Boss Marshalling 提供支持:

  • 第一组兼容只使用 JDK 序列化的远程节点
  • 第二组提供最大性能,适用于和使用 JBoss Marshalling 的远程节点一起使用

表11-9:JBoss Marshalling编解码器

代码11-13展示如何使用 MarshallingDecoder 和 MarshallingEncoder。同样,几乎只是适当地配置 ChannelPipeline。

package io.netty.example.cp11;

import io.netty.channel.*;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import java.io.Serializable;

/**
 * 11.13 使用 JBoss Marshalling
 */
public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;

    public MarshallingInitializer(UnmarshallerProvider unmarshallerProvider, MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        // 添加 MarshallingDecoder 以 将 ByteBuf 转换为 POJO
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        // 添加 MarshallingEncoder 以将POJO 转换为 ByteBuf
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }

    // 添加 ObjectHandler,以处理普通的实现了Serializable 接口的 POJO
    public static final class ObjectHandler extends SimpleChannelInboundHandler<Serializable> {

        @Override
        public void channelRead0(ChannelHandlerContext channelHandlerContext, Serializable serializable) throws Exception {
            // Do something
        }
    }
}

2.3 通过 Protocol Buffers 序列化

Netty序列化的最后一个解决方案是利用Protocol Buffers(https://protobuf.dev/)的编解码器,由Google开发、现已开源的数据交换格式。可在https://github.com/google/protobuf找到源代码。Protocol Buffers 以紧凑高效方式对结构化的数据进行编解码。它具有许多的编程语言绑定,使得它很适合跨语言项目。表 11-10 展示Netty为支持 protobuf 所提供ChannelHandler 实现。

表11-10:Protobuf编解码器

使用 protobuf 只不过是将正确的 ChannelHandler 添加到 ChannelPipeline 中,如代码清单 11-14 所示。

package io.netty.example.cp11;

import com.google.protobuf.MessageLite;
import io.netty.channel.*;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;

/**
 * Listing 11.14 Using protobuf
 */
public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;

    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // 添加 ProtobufVarint32FrameDecoder 以分隔帧
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        // 还需要在当前的 ProtobufEncoder 之前添加一个相应的 ProtobufVarint32LengthFieldPrepender 以编码进帧长度信息
        // 添加 ProtobufEncoder以处理消息的编码
        pipeline.addLast(new ProtobufEncoder());
        // 添加 ProtobufDecoder以解码消息
        pipeline.addLast(new ProtobufDecoder(lite));
        // 加 ObjectHandler 以处理解码消息
        pipeline.addLast(new ObjectHandler());
    }

    public static final class ObjectHandler extends SimpleChannelInboundHandler<Object> {

        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
            // Do something with the object
        }
    }
}

这节探讨由 Netty 专门的解码器和编码器所支持的不同的序列化选项:标准JDK序列化、JBoss Marshalling 及 Google 的 Protocol Buffers。

3 总结

Netty 提供的编解码器以及各种 ChannelHandler 可以被组合和扩展,以实现非常广泛的处理方案。此外,它们也是被论证的、健壮的组件,已经被许多的大型系统所使用。

我们只涵盖最常见示例;Netty 的 API 文档提供了更加全面的覆盖。

下一章学习另一种先进的协议——WebSocket,被开发用以改进 Web 应用程序的性能以及响应性。Netty 提供你将会需要的工具,以便你快速、轻松地利用它强大的功能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/551022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年最受欢迎的低代码平台排行榜

随着企业寻找在降低成本的同时加快软件开发的方法&#xff0c;低代码开发平台正变得越来越受欢迎。这些平台允许开发人员使用拖放界面和预置组件&#xff0c;以最少的代码创建复杂的应用程序。它不仅帮助企业加快了数字化转型的脚步&#xff0c;而且打破业务部门和IT部门之间的…

多元分类预测 | Matlab萤火虫算法(FA)优化BP神经网络分类预测,FA-BP分类预测,多特征输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元分类预测 | Matlab萤火虫算法(FA)优化BP神经网络分类预测,FA-BP分类预测,多特征输入模型,多特征输入模型,多特征输入模型,多特征输入模型,多特征输入模型,多特征输入模型 多特征输入单输出的二分类及多…

商品领域十二张基础表设计思路与实现

欢迎大家关注公众号「JAVA前线」查看更多精彩分享文章&#xff0c;主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等&#xff0c;同时欢迎大家加我微信「java_front」一起交流学习 1 文章概述 商品在电商领域中是一个非常重要的领域&#xff0c;交易行为前提是有…

Selenium + Java 的环境搭建

Selenium Java 的环境搭建 &#x1f50e;Chrome 浏览器下载 Chrome 浏览器检查对应版本下载 Chrome 浏览器驱动 &#x1f50e;配置环境变量&#x1f50e;验证环境是否搭建成功&#x1f50e;关于 pom.xml 出现错误的解决方案 &#x1f50e;Chrome 浏览器 下载 Chrome 浏览器 下…

使用命令启动默认程序(例如启动系统默认浏览器打开指定网址)

文章目录 目的基础说明代码示例&#xff08;Golang&#xff09;总结 目的 通过命令调用系统默认应用程序打开对应格式的文件是比较常用的功能。这篇文章将介绍下相关内容。 基础说明 Windows windows下可以使用 start 指令来启动默认程序打开对应格式文件&#xff1b; 比如 …

iptables中SNAT、DNAT及iptables服务启动时会自动还原规则

目录 SNAT原理与应用​编辑 SNAT转换前提条件 临时打开&#xff1a; 永久打开&#xff1a; 示例​编辑 DNAT原理与应用​编辑 DNAT转换前提条件 示例​编辑 防火墙规则的备份和还原 导出&#xff08;备份&#xff09;所有表的规则 清空规则​编辑 导入&#xff08;还…

【VMware】Ubunt 20.04时间设置

文章目录 设置本地时间 UTC8设置24小时制同步网络时间参考 Talk is cheap, show me the code. 设置本地时间 UTC8 查看当前时区状态 rootnode1:~/k8s# timedatectlLocal time: Sun 2023-05-21 15:24:02 CSTUniversal time: Sun 2023-05-21 07:24:02 UTCRTC time: Sun 2023-05-2…

计算机网络知识汇总(十万字超详细)

文章目录 1 计算机网络概述1.1 概念、组成、功能和分类1.2 标准化工作及相关组织1.3 速率相关的性能指标1.4 时延、时延带宽积、往返时间RTT、利用率1.5 分层结构、接口、协议、服务1.6 OSI参考模型1.7 TCP/IP与五层参考模型1.8 第一章知识大纲 2.物理层2.1 物理层基本概念2.2 …

GaussDB(for MySQL)云原生数据库技术演进和挑战

摘要&#xff1a;GaussDB(for MySQL)是华为自研云原生数据库&#xff0c;具有高性能&#xff0c;高扩展&#xff0c;高可靠的特点&#xff0c;完全兼容MySQL协议&#xff0c;自研架构和友好的生态兼容性&#xff0c;可以同时满足数据库管理员、应用开发者、CTO的运维、使用和业…

QT5.14.2下载安装与环境配置

1.QT5.14.2的下载 QT5.14.2的官方下载地址为 https://download.qt.io/archive/qt/5.14/5.14.2/ ![在这里插入图片描述](https://img-blog.csdnimg.cn/9ef2a92414cb48a482d3cde4dd19a9ac.png 由于exe文件名称只有x86&#xff0c;只能选择这个下载&#xff0c;但是在安装时可以选…

ChatGPT也能助力建筑设计,这么智能?

ChatGPT也称为 Generative Pre-trained Transformer&#xff0c;是一种强大的语言生成工具&#xff0c;具有生成类人文本的能力。这项技术有可能通过为建筑师提供与客户、承包商和其他利益相关者沟通和协作的新方式来彻底改变建筑行业。在这篇文章中&#xff0c;我们将探讨架构…

css学习-内容加载占位动画(渐变动画)

文章目录 学习链接纯CSS渐变动画结合vue指令简单使用 学习链接 Git Hub前端50天50个项目 | 第24 内容文本 纯CSS渐变动画 <style lang"scss" scoped> .card-wrapper {width: 100%;height: 100%;display: flex;align-items: center;justify-content: center; …

tinymce富文本编辑器使用到二开

tinymce tinymce 一款现代化的富文本编辑器&#xff0c;有专门团队维护&#xff0c;是目前主流的富文本编辑器选择。 安装注意事项&#xff1a; 有两种方案分别是安装对应的vue/react组件&#xff0c;然后直接用组件&#xff0c;或者直接使用tinymce去按原生操作会报找不到文…

HTML- 标签学习之- 列表、表格

无序列表/有序列表&#xff1a; 标签组成( 无序ul 有序 ol ) -> li 父子级标签&#xff0c; ul只能包含li标签&#xff0c; li标签可以包含任意内容。 自定义列表 dl :自定义列表的整体&#xff0c;用于包裹dt/dd 标签dt:自定义列表主题dd:自定义列表的针对主题的…

【VMware】VM安装虚拟机

文章目录 VMware教程创建新的虚拟机自定义安装选择稍后安装操作系统这里选择Linux操作系统&#xff0c;版本为Centos7 64位选择名称和安装位置选择处理器、内核数量&#xff08;可根据电脑硬件以及需求进行调整&#xff09;选择2G内存&#xff08;可根据电脑硬件以及需求进行调…

计算机视觉 day94 DDH - YOLOv5:基于双IoU感知解耦头改进的YOLOv5,用于对象检测

DDH - YOLOv5:基于双IoU感知解耦头改进的YOLOv5&#xff0c;用于对象检测 I. IntroductionII. Related workPrediction head 预测头 III. Methodology3.1 Decoupled Head3.2 Double IoU‑aware3.3 Training3.4 Inference IV. Experiments4.1 与YOLOv5等检测头对PASCAL VOC2007测…

Netty实战(五)

ByteBuf—Netty的数据容器 一、什么是ByteBuf二、 ByteBuf 的 API三、ByteBuf 类——Netty 的数据容器3.1 ByteBuf如何工作&#xff1f;3.2 ByteBuf 的使用模式3.2.1 堆缓冲区3.2.2 直接缓冲区3.2.3 复合缓冲区 四、字节级操作4.1 随机访问索引4.2 顺序访问索引4.3 可丢弃字节4…

使用Docker部署Jenkins

Jenkins是一款开源的持续集成&#xff08;DI&#xff09;工具&#xff0c;广泛用于项目开发&#xff0c;能提供自动构建&#xff0c;测试&#xff0c;部署等功能。 文章目录 1、安装2、配置镜像加速3、登录初始化Jenkins4、配置Jenkins 1、安装 接下来使用Docker部署Jenkins&a…

【腾讯云 Finops Crane集训营】关于Crane的认识和体验总结

一、Crane 是什么 Crane 是一个基于 FinOps 的云资源分析与成本优化平台。它的愿景是在保证客户应用运行质量的前提下实现极致的降本。Crane 是 FinOps 基金会认证的云优化方案。 Crane基于Docker和Kubernetes技术&#xff0c;支持常见的容器化应用场景&#xff0c;如部署多个…

分布式项目 09.服务器之间的通信和三个工具类

项目的结构&#xff1a;1.通过Nginx首先把访问首页的请求发送到前端web服务器&#xff0c;2.web服务器会根据请求的url中的一些细节&#xff0c;来把相关的请求发送到相关的服务器中&#xff0c;3.相关的服务器会处理业务&#xff0c;并且返回结果到web服务器中&#xff0c;最后…