Flink+Paimon多流拼接性能优化实战

news2024/11/20 8:31:58

目录

(零)本文简介

(一)背景

(二)探索梳理过程

(三)源码改造

(四)修改效果

1、JOB状态

2、Level5的dataFile总大小

3、数据延迟

(五)未来展望:异步Compact


(零)本文简介

Paimon多流拼接/合并性能优化;

        为解决离线T+1多流拼接数据时效性Flink实时状态太大任务稳定性问题,这里基于数据湖工具Apache Paimon进行近实时的多流拼接。

        使用Flink+Paimon基于ParmaryKey TablePartialUpdate)进行多流拼接的时候,跑一段时间有时会遇到周期性背压、checkpoint时间过长等情况,本文通过剖析源码逻辑、修改源码,在一定程度上解决了这个问题。

Apache Paimon基础 、多流拼接方法 及 与Hudi 的对比 可参考前面文章:

新一代数据湖存储技术Apache Paimon入门Demo_Leonardo_KY的博客-CSDN博客

基于数据湖的多流拼接方案-HUDI概念篇_Leonardo_KY的博客-CSDN博客

(一)背景

       这里使用 Flink 1.14 + Apache Paimon 0.5 snapshot 进行多流拼接(前端埋点流 + 服务端埋点流);

        当前情况是一天一个分区,一个分区100个bucket;就会出现如下情况:分区/bucket中的数据越来越多,到达下午或者傍晚的时候就会出现 paimon 作业周期性背压(因为mergeTree中维护的数据越来越多,tree越来越大),checkpoint时间也会比较长;于是决定将mergeTree中的过期数据删除,即让其不进入tree中,减少计算量;

        这里的“过期”按需自定义,比如调研发现99.9%的数据都可以使用3个小时之内的数据拼接上,那就根据时间戳与当前时间戳(假设没有很严重的消费积压)相比,时间差超过3小时的数据就将其丢弃;

具体细节涉及到(这里先将结论给出):

    1. data文件创建后是否还会修改?(不会)
    2. 根据时间排序的data数据文件是增量还是全量?(几个最新文件加起来就是全量)
    3. 应该根据dataFile的创建/修改时间判断过期 还是 通过具体每个record字段值的时间戳判断过期?(通过record)

(二)探索梳理过程

1、首先观察hdfs文件之后发现,dataFile只保留最近一个小时的文件,超过一小时的文件就会被删除,这里应该对应参数 partition.expiration-check-interval = 1h,由此可知data文件不是增量的【下文compact只有几个文件再次加强验证】(那么就不能通过dataFile的最新修改时间判断文件过期将数据过滤);

2、观察flink log发现,每次compaction都只读几个文件,如下所示:

        每次其实只读取一个level0的file,再加上几个level5的file(level5这里file就是之前的全部数据,包含多个流的),最后将compact之后的文件再命名为新的名字写到level5;

        随着分区数据量的增多,参与compact的file也会越来越多(这也是会导致tree偏大,出现周期性背压的原因);

另外,dataFile命名呈现如下规律:

        level5的第二个文件总是跟第一个中间隔一个(这个跟改源码没有关系,只是适合观察规律);

到晚间的时候参与compact的file更多了:

3、观察每次level5生成的dataFile(理论上level5的dataFile会越来越大/多,当单个文件大小超过128M *(1+rate)时,会生成新文件);

        所有level5的文件大小加起来会越来越大,即永远是呈增长趋势;

        如下每一层的总大小在不断增大,同时当文件到一定程度之后,每层2个文件变成3个文件;

4、【以上3点均为原始实现思路,从这里开始改造】思考:既然已知每个bucket中只要最新的几个dataFile就包含了全部的data数据(dataFile不是增量的),那么就不能通过文件最新修改时间来判断数据是否过期,只能从最新的几个dataFile的每条记录来进行判断了,即原本每次参与合并的record是从这个partition+bucket建立开始的全部数据,那么是否可以通过修改源码判断每条record是否过期,从而不参与mergeTree,在compact完成之后也不会再次写入新的dataFile(如果还是写进来,每次读进tree时都需要判断是否过期,是否进入tree)?【答案当然是可以的!】

(三)源码改造

1、首先说明一下,在源码中有这么一段

// IntervalPartition.partition()
public List<List<SortedRun>> partition() {
    List<List<SortedRun>> result = new ArrayList<>();
    List<DataFileMeta> section = new ArrayList<>();
    BinaryRow bound = null;

    for (DataFileMeta meta : files) {
        if (!section.isEmpty() && keyComparator.compare(meta.minKey(), bound) > 0) {
            // larger than current right bound, conclude current section and create a new one
            result.add(partition(section));
            section.clear();
            bound = null;
        }
        section.add(meta);
        if (bound == null || keyComparator.compare(meta.maxKey(), bound) > 0) {
            // update right bound
            bound = meta.maxKey();
        }
    }
    if (!section.isEmpty()) {
        // conclude last section
        result.add(partition(section));
    }

    return result;
}

        此处为了将文件排序、再将有overlap的放在一个list里边,一但产生gap(即没有overlap),那么就创建新的list,最终将这些 list 再放到List>中:

示意图如下:

2、后续通过一些处理变成 List> 的格式,这里的KeyValue就包含我们想要去操纵的record!

源码是这样的:

public <T> RecordReader<T> mergeSort(
        List<ReaderSupplier<KeyValue>> lazyReaders,
        Comparator<InternalRow> keyComparator,
        MergeFunctionWrapper<T> mergeFunction)
        throws IOException {
    if (ioManager != null && lazyReaders.size() > spillThreshold) {
        return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
    }

    List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
    for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
        try {
            readers.add(supplier.get());
        } catch (IOException e) {
            // if one of the readers creating failed, we need to close them all.
            readers.forEach(IOUtils::closeQuietly);
            throw e;
        }
    }

    return SortMergeReader.createSortMergeReader(
            readers, keyComparator, mergeFunction, sortEngine);
}

        这里的return就会创建sortMergeReader了,我们可以在将数据传入这里之前,先进行过滤(通过判断每一条record是否超过过期时间),修改如下:

public <T> RecordReader<T> mergeSort(
        List<ReaderSupplier<KeyValue>> lazyReaders,
        Comparator<InternalRow> keyComparator,
        MergeFunctionWrapper<T> mergeFunction)
        throws IOException {
    if (ioManager != null && lazyReaders.size() > spillThreshold) {
        return spillMergeSort(lazyReaders, keyComparator, mergeFunction);
    }

    List<RecordReader<KeyValue>> readers = new ArrayList<>(lazyReaders.size());
    for (ReaderSupplier<KeyValue> supplier : lazyReaders) {
        try {
            // 过滤掉过期数据
            RecordReader<KeyValue> filterSupplier =
                    supplier.get()
                            .filter(
                                    (KeyValue keyValue) ->
                                            isNotExpiredRecord(
                                                    keyValue.value(), expireTimeMillis));
            readers.add(filterSupplier);
        } catch (IOException e) {
            // if one of the readers creating failed, we need to close them all.
            readers.forEach(IOUtils::closeQuietly);
            throw e;
        }
    }

    return SortMergeReader.createSortMergeReader(
            readers,
            keyComparator,
            mergeFunction,
            sortEngine,
            keyType.getFieldTypes(),
            valueType.getFieldTypes());
}

// 判断这条数据是否过期
public boolean isNotExpiredRecord(InternalRow row, long expireTimeMillis) {
    if (expireTimeMillis <= 0) {
        return true;
    }
    // 只要有一个字段不为空,且大于0,且过期时间大于expireTimeMillis,就判断为过期
    for (Integer pos : expireFieldsPosSet) {
        if ((!row.isNullAt(pos))
                && row.getLong(pos) > 0
                && (System.currentTimeMillis() - row.getLong(pos)) > expireTimeMillis) {
            return false;
        }
    }
    return true;
}

与此同时,将相关参数暴露出来,可以在建表时进行自定义配置:

public static final ConfigOption<Integer> RECORDS_EXPIRED_HOUR =
        key("record.expired-hour")
                .intType()
                .defaultValue(-1)
                .withDescription(
                        "Records in streams WON'T be offered into MergeTree when they are expired."
                                + " (Inorder to avoid too large MergeTree; -1 means never expired). ");

public static final ConfigOption<String> RECORDS_EXPIRED_FIELDS =
        key("record.expired-fields")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Records in streams WON'T be offered into MergeTree when they are judged as [expired] according to these fields."
                                + "If you specify multiple fields, delimiter is ','.");

使用方法:

val createPaimonJoinTable = (
  s"CREATE TABLE IF NOT EXISTS ${paimonTable}(\n"
    + " uuid STRING,\n"
    + " metaid STRING,\n"
    + " cid STRING,\n"
    + " area STRING,\n"
    + " ts1 bigint,\n"
    + " ts2 bigint,\n"
    + " d STRING, \n"
    + " PRIMARY KEY (d, uuid) NOT ENFORCED \n"
    + ") PARTITIONED BY (d) \n"
    + " WITH (\n" +
    "    'merge-engine' = 'partial-update',\n" +
    "    'changelog-producer' = 'full-compaction', \n" +
    "    'file.format' = 'orc', \n" +
    s"    'sink.managed.writer-buffer-memory' = '${sinkWriterBuffer}', \n" +
    s"    'full-compaction.delta-commits' = '${fullCompactionCommits}', \n" +
    s"    'scan.mode' = '${scanMode}', \n" +
    s"    'bucket' = '${bucketNum}', \n" +
    s"    'sink.parallelism' = '${sinkTaskNum}', \n" +
    s"    'record.expired-hour' = '3' , \n" +   // user defined para
    "     'record.expired-fileds' = '4,5' , \n" +   // user defined para
    "     'sequence.field' = 'ts1' \n" +
    ")"
  )
tableEnv.executeSql(createPaimonJoinTable)

(四)修改效果

1、JOB状态

运行到晚上20点尚未出现背压:

checkpoint时间也没有过长(如果不剔除过期数据,到这个时间cp时长应该在3分钟左右):

生产到Kafka的消息也没有严重的断流或者锯齿现象:

还是有可能出现exception如下(但对数据量没有任何影响):

2、Level5的dataFile总大小

        上边只是现象,最终还是要数据说话。

        修改源码之后,观察dataFile,理论上每一层的size总大小可能会出现减小的情况 (因为过期数据就不会再写入到 level5 新的data文件中了)

        如下图:levelSize diff(下一次level总size - 上一次level总size),确实出现了“有正有负”的情况,于是验证源码修改生效(即每次进行compact只会读取近 n 个小时的数据进行合并)!

3、数据延迟

有意思的是,当我们修改源码(将过期的数据丢弃)之后,数据延迟也变小了。

数据延迟计算方法:paimon处理完将数据写到kafka队列的时间戳 - 前端埋点被触发被服务器接收到的时间戳;

修改前:

修改后:

(五)未来展望:异步Compact

官方提供的paimon源码,里边的compaction是 sync 模式的,我尝试改成过 async 的,但是时不时会出现很少量的数据丢失(感觉可能是因为同一时刻有多个compact任务在进行),后续有机会可以再继续尝试一下。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/955354.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【zookeeper】zookeeper介绍

分布式协调技术 在学习ZooKeeper之前需要先了解一种技术——分布式协调技术。那么什么是分布式协调技术&#xff1f;其实分布式协调技术主要用来解决分布式环境当中多个进程之间的同步控制&#xff0c;让他们有序的去访问某种临界资源&#xff0c;防止造成"脏数据"的…

Joint contrast enhancement and exposure fusion for real-world image dehazing、近红外去雾、(近)红外和可见光数据集

今天给大家分享一篇发表在IEEE TMM上的去雾文章Joint Contrast Enhancement and Exposure Fusion for Real-World Image Dehazing 作者从对比度增强和曝光融合的视角来解决图像去雾问题&#xff0c;在真实场景上取得了较好的去雾效果。此外&#xff0c;作者将所提出的方法应用…

mqtt集群搭建并使用nginx做负载均衡_亲测得结论

mqtt集群搭建 RabbitMQ集群搭建和测试总结_亲测 搭建好RabbitMQ集群,并开启mqtt插件功能,mqtt集群也就搭建好了 nginx配置mqtt负载均衡 #修改rabbitmq1节点ip为1.19的nginx配置 vim /etc/nginx/nginx.confhttp { } #在http外添加如下配置 stream {upstream rabbitmqtt {ser…

OpenCV(十二):图像透视变换

目录 1.透视变换介绍 2.计算透视变换矩阵getPerspectiveTransform(&#xff09; 3.透视变换函数warpPerspective() 4.demo 1.透视变换介绍 透视变换是一种将原始图像映射到目标图像平面上的投影变换&#xff0c;又称为四点变换。 透视变换矩阵的一般形式如下所示&#xff…

城市内涝积水监测预警系统 yolov8

城市内涝积水监测预警系统通过yolov8网络深度学习框架&#xff0c;算法一旦识别到道路出现积水&#xff0c;城市内涝积水监测预警系统会立即发出预警信号。并及时通知相关人员。YOLO检测速度非常快。标准版本的YOLO可以每秒处理 45 张图像&#xff1b;YOLO的极速版本每秒可以处…

具有 70V 总线故障保护功能的ISO1042BQDWVRQ1、ISO1042BQDWVQ1、ISO1042QDWVRQ1汽车类隔离式 CAN 收发器

一、目标应用 • 起动机/发电机 • 电池管理系统 (BMS) • 直流/直流转换器 • 车载充电器 (OBC) 和无线充电器 • 逆变器和电机控制 二、器件规格 1、ISO1042BQDWVRQ1 完全版 收发器&#xff0c;隔离式 1/1 CANbus 8-SOIC 类型&#xff1a;收发器&#xff0c;隔离式 协议&a…

day30 日期转换

一&#xff1a;Date Date类&#xff1a; 这个类是java.util.Date getTime() : 获取内部维护的long值 Date date new Date(); long time date.getTime(); setTime()&#xff1a;按照指定的long值&#xff08;表示的时间&#xff09;设置Date表示的时间 time 60*60*24*1000;…

项目实践:类平面抓取点计算(占位,后面补充)

文章目录 文章目录&#xff1a;3D视觉个人学习目录微信&#xff1a;dhlddxB站: Non-Stop_

贯穿嵌入式开发的编程语言?

有个朋友和我说嵌入式行业中有没有什么神技巧&#xff0c;所谓的一招鲜吃遍天一般&#xff0c;那还真的有&#xff0c;在嵌入式开发中&#xff0c;C语言是最广泛使用的编程语言之一&#xff0c;覆盖范围几乎涵盖了整个领域。视频后方有免费的嵌入式学习资料&#xff0c;入门和进…

PHP 接入微信支付分

♦ 背景 最近项目中需要接入【微信支付分】的服务&#xff0c; 本文以 【免确认订单模式】&#xff1a;即先享模式&#xff08;评估不通过不可使用服务&#xff09;的使用 在此做一下实现步骤&#xff0c;希望能对小伙伴有所帮助&#xff0c;欢迎指摘 … 实现语言&#xff1a;P…

【NLP】手把手使用PyTorch实现Transformer以及Transformer-XL

手把手使用PyTorch实现Transformer以及Transformer-XL Abstract of Attention is all you need使用PyTorch实现Transformer1. 构建Encoder-Decoder模型1.1 导入依赖库1.2 创建Encoder-Decoder类1.3 创建Generator类 2. 构建Encoder2.1 定义复制模块的函数2.2 创建Encoder2.3 构…

语言基础篇3——学习第一步,Python环境搭建

环境搭建 基础环境搭建 https://www.python.org/downloads/&#xff0c;以Python3.11.5为例&#xff1a; Install for Windows 提供安装程序或者压缩包&#xff0c;安装程序点击下一步即可&#xff0c;压缩包解压即可&#xff0c;注意配置根目录到系统环境变量PATH。 Ins…

uniapp项目实战系列(3):底部导航栏与头部导航栏的配置

目录 系列往期文章&#xff08;点击跳转&#xff09;uniapp项目实战系列(1)&#xff1a;导入数据库&#xff0c;启动后端服务&#xff0c;开启代码托管&#xff08;点击跳转&#xff09;uniapp项目实战系列(2)&#xff1a;新建项目&#xff0c;项目搭建&#xff0c;微信开发工具…

MySQL— 基础语法大全及操作演示!!!(事务)

MySQL—— 基础语法大全及操作演示&#xff08;事务&#xff09; 六、事务6.1 事务简介6.2 事务操作6.2.1 未控制事务6.2.2 控制事务一6.2.3 控制事务二 6.3 事务四大特性6.4 并发事务问题6.5 事务隔离级别 MySQL— 基础语法大全及操作演示&#xff01;&#xff01;&#xff01…

docker 部署springboot(成功、截图)

1.新建sringboot工程并打包 2.编写Dockerfile文件 # 基础镜像使用java FROM openjdk:8 # 作者 MAINTAINER feng # VOLUME 指定了临时文件目录为/tmp。 # 其效果是在主机 /var/lib/docker 目录下创建了一个临时文件&#xff0c;并链接到容器的/tmp VOLUME /tmp # 将jar包添加…

什么是数据丢失防护(DLP)

数据丢失防护 &#xff08;DLP&#xff09; 是一种安全策略&#xff0c;旨在保护企业的关键数据免遭未经授权的用户盗窃、丢失或访问。一个好的 DLP 系统是用于数据发现和分类、数据传输和访问控制、策略和事件管理以及细致的审核和警报的工具的组合。 数据丢失的原因是什么 …

Databricks 入门之sql(二)常用函数

1.类型转换函数 使用CAST函数转换数据类型&#xff08;可以起别名&#xff09; SELECTrating,CAST(timeRecorded as timestamp) FROMmovieRatings; 支持的数据类型有&#xff1a; BIGINT、BINARY、BOOLEAN、DATE 、DECIMAL(p,s)、 DOUBLE、 FLOAT、 INT、 INTERVAL interva…

嵌入式学习之popen函数

相比于system输出的好处&#xff0c;popen可以直接输出运行结果 14.进程总结 需要重点掌握进程配合相关概念&#xff0c;创建进程函数fork的使用&#xff0c;理解进程创建发生了什么事&#xff0c;exec族函数&#xff0c;exec族函数配合fork使用。

2009-2022年商业银行资产利息相关数据

2009-2022年商业银行资产利息相关数据 1、时间&#xff1a;2009-2022年 2、来源&#xff1a;整理自wind 3、指标&#xff1a;利息支出、资产总计、员工总数、固定资产、存款总额、应付职工薪酬、营业支出、营业收入、扣除人员开支后的营业支出 银行&#xff1a;平安银行兰州…

富而喜悦九仔短短10秒的拥抱让百万网友直呼“太可爱!”

现如今网络发展速度非常快&#xff0c;各种各样的走红层出不穷&#xff0c;甚至有很多人都是一夜之间爆红的&#xff0c;出名的速度非常快。近期&#xff0c;在新浪微博的热榜中&#xff0c;有一个富而喜悦九仔的话题横空出世&#xff0c;微博博主富而喜悦外事部小九&#xff0…