Apache SeaTunnel脚本升级及参数调优实战

news2025/3/22 21:19:15

最近作者针对实时数仓的Apache SeaTunnel同步链路,完成了双引擎架构升级与全链路参数深度调优,希望本文能够给大家有所启发,欢迎批评指正!

在这里插入图片描述

Apache SeaTunnel 版本 :2.3.9

Doris版本:2.0.6

MySQL JDBC Connector : 8.0.28

架构升级

  • 批处理链路:JDBC并行度进行提升,基于ID分区实现分片读取,结合批量参数(fetch_size=10000+batch_size=5000)使全量同步吞吐量大幅增加

  • 实时增量链路:引入MySQL-CDC组件,通过initial快照模式+chunk.size.rows=8096实现全量/增量平滑切换,事件延迟压降至500ms内

稳定性增强

  • 资源管控:JDBC连接池动态扩容(max_size=20)+ CDC限流策略(rows_per_second=1000),源库CPU峰值负载下降40%

  • 容错机制:Doris两阶段提交(enable-2pc=true)配合检查点(checkpoint.interval=10s),故障恢复时间缩短80%

写入优化

  • 缓冲区三级联控(buffer-size=10000+buffer-count=3+flush.interval=5s)提升Doris写入批次质量

  • Tablet粒度控制(request_tablet_size=5)使BE节点负载均衡度提升

实战演示

同步之前创建Doris表

在这里插入图片描述

-- DROP TABLE IF EXISTS ods.ods_activity_info_full;
CREATE TABLE ods.ods_activity_info_full
(
    `id`            VARCHAR(255) COMMENT '活动id',
    `k1`            DATE NOT NULL   COMMENT '分区字段',
    `activity_name` STRING COMMENT '活动名称',
    `activity_type` STRING COMMENT '活动类型',
    `activity_desc` STRING COMMENT '活动描述',
    `start_time`    STRING COMMENT '开始时间',
    `end_time`      STRING COMMENT '结束时间',
    `create_time`   STRING COMMENT '创建时间'
)
    ENGINE=OLAP  -- 使用Doris的OLAP引擎,适用于高并发分析场景
    UNIQUE KEY(`id`,`k1`)  -- 唯一键约束,保证(id, k1)组合的唯一性(Doris聚合模型特性)
COMMENT '活动信息全量表'
PARTITION BY RANGE(`k1`) ()  -- 按日期范围分区(具体分区规则由动态分区配置决定)
DISTRIBUTED BY HASH(`id`)  -- 按id哈希分桶,保证相同id的数据分布在同一节点
PROPERTIES
(
    "replication_allocation" = "tag.location.default: 1",  -- 副本分配策略:默认标签分配1个副本
    "is_being_synced" = "false",          -- 是否处于同步状态(通常保持false)
    "storage_format" = "V2",             -- 存储格式版本(V2支持更高效压缩和索引)
    "light_schema_change" = "true",      -- 启用轻量级schema变更(仅修改元数据,无需数据重写)
    "disable_auto_compaction" = "false", -- 启用自动压缩(合并小文件提升查询性能)
    "enable_single_replica_compaction" = "false", -- 禁用单副本压缩(多副本时保持数据一致性)

    "dynamic_partition.enable" = "true",            -- 启用动态分区
    "dynamic_partition.time_unit" = "DAY",          -- 按天创建分区
    "dynamic_partition.start" = "-60",             -- 保留最近60天的历史分区
    "dynamic_partition.end" = "3",                 -- 预先创建未来3天的分区
    "dynamic_partition.prefix" = "p",              -- 分区名前缀(如p20240101)
    "dynamic_partition.buckets" = "32",            -- 每个分区的分桶数(影响并行度)
    "dynamic_partition.create_history_partition" = "true", -- 自动创建缺失的历史分区

    "bloom_filter_columns" = "id,activity_name",  -- 为高频过滤字段(id/名称)创建布隆过滤器,加速WHERE查询
    "compaction_policy" = "time_series",          -- 按时间序合并策略优化时序数据(适合活动时间字段)
    "enable_unique_key_merge_on_write" = "true",  -- 唯一键写时合并(实时更新场景减少读放大)
    "in_memory" = "false"                        -- 关闭全内存存储(仅小表可开启)
);
配置SeaTunnel JDBC同步脚本

在这里插入图片描述

env {
  # 环境配置
  parallelism = 8                     # 增加并行度以提高吞吐量
  job.mode = "STREAMING"              # 使用流式处理模式进行实时同步
  checkpoint.interval = 10000         # 检查点间隔,单位毫秒

  # 限流配置 - 避免对源数据库造成过大压力
  read_limit.bytes_per_second = 10000000  # 每秒读取字节数限制,约10MB/s
  read_limit.rows_per_second = 1000       # 每秒读取行数限制

  # 本地检查点配置
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.checkpoint.max-concurrent = 1  # 最大并发检查点数

  # 性能优化参数
  execution.buffer-timeout = 5000          # 缓冲超时时间(毫秒)
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
  MySQL-CDC {
    # 基本连接配置
    # server-id = 5652-5657             # MySQL复制客户端的唯一ID范围
    username = "root"                # 数据库用户名
    password = ""                # 数据库密码
    table-names = ["gmall.activity_info"]  # 要同步的表
    base-url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"

    # CDC 特有配置
    schema-changes.enabled = true     # 启用架构变更捕获
    server-time-zone = "Asia/Shanghai"  # 服务器时区

    # 性能优化配置
    snapshot.mode = "initial"         # 初始快照模式
    snapshot.fetch.size = 10000       # 快照获取大小
    chunk.size.rows = 8096            # 分块大小,用于并行快照
    connection.pool.size = 10         # 连接池大小

    # 高级配置
    include.schema.changes = true     # 包含架构变更事件
    scan.startup.mode = "initial"     # 启动模式:initial(全量+增量)
    scan.incremental.snapshot.chunk.size = 8096  # 增量快照分块大小
    debezium.min.row.count.to.stream.results = 1000  # 流式结果的最小行数

    # 容错配置
    connect.timeout = 30000           # 连接超时时间(毫秒)
    connect.max-retries = 3           # 最大重试次数

    # 输出表名
    result_table_name = "mysql_cdc_source"
  }
}

# 可选的转换逻辑,如果需要对数据进行处理
transform {
  Sql {
    source_table_name = "mysql_cdc_source"
    result_table_name = "doris_sink_data"

    # 根据需要转换字段,这里添加了一个分区字段k1
    query = """
      select
        id,
        formatdatetime(create_time,'yyyy-MM-dd') as k1,
        activity_name,
        activity_type,
        activity_desc,
        start_time,
        end_time,
        create_time
      from mysql_cdc_source
    """
  }
}

sink {
  Doris {
    # 基本连接配置
    source_table_name = "doris_sink_data"  # 或直接使用 "mysql_cdc_source"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"  # Doris目标表

    # 事务和标签配置
    sink.enable-2pc = "true"          # 启用两阶段提交,确保一致性
    sink.label-prefix = "cdc_sync"    # 导入标签前缀

    # 写入模式配置
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"         # 列分隔符
      line_delimiter = "\n"           # 行分隔符
      max_filter_ratio = "0.1"        # 允许的最大错误率

      # CDC特有配置 - 处理不同操作类型
      # 使用Doris的UPSERT模式处理CDC事件
      merge_type = "MERGE"            # 合并类型:APPEND或MERGE
      delete_enable = "true"          # 启用删除操作
    }

    # 性能优化配置
    sink.buffer-size = 10000          # 缓冲区大小
    sink.buffer-count = 3             # 缓冲区数量
    sink.flush.interval-ms = 5000     # 刷新间隔
    sink.max-retries = 3              # 最大重试次数
    sink.parallelism = 8              # 写入并行度

    # Doris连接优化
    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"  # 连接超时
      request_timeout_ms = "30000"         # 请求超时
      request_tablet_size = "5"            # 每个请求的tablet数量
    }
  }
}
配置SeaTunnel MySQLCDC 同步脚本

在这里插入图片描述

env {
  parallelism = 8
  job.mode = "BATCH"
  checkpoint.interval = 30000

  # 本地文件系统检查点
  execution.checkpoint.data-uri = "file:///opt/seatunnel/checkpoints"
  execution.buffer-timeout = 5000

  # JVM 参数优化
  execution.jvm-options = "-Xms4g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=100"
}

source {
  Jdbc {
    result_table_name = "mysql_seatunnel"
    url = "jdbc:mysql://192.168.241.128:3306/gmall?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true"
    driver = "com.mysql.cj.jdbc.Driver"
    connection_check_timeout_sec = 30
    user = "gmall"
    password = "gmall"

    # 使用分区并行读取
    query = "select id, activity_name, activity_type, activity_desc, start_time, end_time, create_time from gmall.activity_info"
    partition_column = "id"
    partition_num = 8

    # 连接池配置
    connection_pool {
      max_size = 20
      min_idle = 5
      max_idle_ms = 60000
    }

    # 批处理配置
    fetch_size = 10000
    batch_size = 5000
    is_exactly_once = true
  }
}

transform {
  Sql {
    source_table_name = "mysql_seatunnel"
    result_table_name = "seatunnel_doris"

    query = """
      select 
        id, 
        formatdatetime(create_time,'yyyy-MM-dd') as k1,  
        activity_name, 
        activity_type, 
        activity_desc, 
        start_time, 
        end_time, 
        create_time 
      from mysql_seatunnel
    """
  }
}

sink {
  Doris {
    source_table_name = "seatunnel_doris"
    fenodes = "192.168.241.128:8030"
    username = "root"
    password = ""
    table.identifier = "ods.ods_activity_info_full"
    sink.enable-2pc = "true"
    sink.label-prefix = "test_json"

    # 优化Doris写入配置
    sink.properties {
      format = "json"
      read_json_by_line = "true"
      column_separator = "\t"
      line_delimiter = "\n"
      max_filter_ratio = "0.1"
    }

    # 批量写入配置
    sink.buffer-size = 10000
    sink.buffer-count = 3
    sink.flush.interval-ms = 5000
    sink.max-retries = 3
    sink.parallelism = 8

    doris.config = {
      format = "json"
      read_json_by_line = "true"
      request_connect_timeout_ms = "5000"
      request_timeout_ms = "30000"
      request_tablet_size = "5"
    }
  }
}

最终Apache Doris数据:
在这里插入图片描述

本文完!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2319786.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

学习记录-cssjs-综合复习案例(二)

目录 商城复合案例功能实现(二)商城首页实现步骤1.准备工作2. 搭建html框架3. 编写js代码 完整实例代码完整项目心得 商城复合案例功能实现(二) 使用html,css,基于bootstrap框架以及媒体查询搭建响应式布局…

图解AUTOSAR_CP_EEPROM_Abstraction

AUTOSAR EEPROM抽象模块详细说明 基于AUTOSAR标准的EEPROM抽象层技术解析 目录 1. 概述 1.1 核心功能1.2 模块地位2. 架构概览 2.1 架构层次2.2 模块交互3. 配置结构 3.1 主要配置容器3.2 关键配置参数4. 状态管理 4.1 基本状态4.2 状态转换5. 接口设计 5.1 主要接口分类5.2 接…

汇川EASY系列之以太网通讯(MODBUS_TCP做从站)

汇川easy系列PLC做MODBUS_TCP从站,不需要任何操作,但是有一些需要知道的东西。具体如下: 1、汇川easy系列PLC做MODBUS_TCP从站,,ModbusTCP服务器默认开启,无需设置通信协议(即不需要配置),端口号为“502”。ModbusTCP从站最多支持31个ModbusTCP客户端(ModbusTCP主站…

QT 图表(拆线图,栏状图,饼状图 ,动态图表)

效果 折线图 // 创建折线数据系列// 创建折线系列QLineSeries *series new QLineSeries;// series->append(0, 6);// series->append(2, 4);// series->append(3, 8);// 创建图表并添加系列QChart *chart new QChart;chart->addSeries(series);chart->setTit…

基于vue框架的在线影院系统a079l(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。

系统程序文件列表 项目功能:用户,电影,电影类别,电影库 开题报告内容 基于Vue框架的在线影院系统开题报告 一、研究背景与意义 随着文化娱乐产业的蓬勃发展,电影院作为人们休闲消遣的重要场所,其管理效率和服务质量直接影响着顾客的观影体…

OpenCV图像拼接(1)概述

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 此图说明了在Stitcher类中实现的拼接模块流程。使用该类,可以配置/移除某些步骤,即根据特定需求调整拼接流程。流程中的所…

基于ssm学科竞赛小程序的设计及实现(源码+lw+部署文档+讲解),源码可白嫖!

摘要 随着信息时代的来临,过去的学科竞赛管理方式的缺点逐渐暴露,本次对过去的学科竞赛管理方式的缺点进行分析,采取计算机方式构建学科竞赛小程序。本文通过阅读相关文献,研究国内外相关技术,提出了一种关于竞赛信息…

[特殊字符][特殊字符][特殊字符][特殊字符][特殊字符][特殊字符]壁紙 流光染墨,碎影入梦

#Cosplay #🧚‍♀️Bangni邦尼🐰. #📷 穹妹 Set.01 #后期圈小程序 琼枝低垂,霜花浸透夜色,风起时,微光轻拂檐角,洒落一地星辉。远山隐于烟岚,唯余一抹青黛,勾勒出天光水…

虚拟机的三种 Linux 网络配置原理图解读

前言 虚拟机的网络连接方式主要有 三种模式:桥接模式(Bridged)、NAT 模式(Network Address Translation)、仅主机模式(Host-Only)。每种模式都有不同的使用场景和网络适应性,具体解释…

AI Agent系列(七) -思维链(Chain of Thought,CoT)

AI Agent系列【七】 前言一、CoT技术详解1.1 CoT组成1.2 CoT的特点 二、CoT的作用三、CoT的好处四、CoT适用场景五、CoT的推理结构 前言 思维链(Chain of Thought,CoT),思维链就是一系列中间的推理步骤(a series of intermediate reasoning steps),通过…

SpringBoot实现异步调用的方法

在Java中使用Spring Boot实现异步请求和异步调用是一个常见的需求,可以提高应用程序的性能和响应能力。以下是实现这两种异步操作的基本方法: 一、异步请求(Asynchronous Request) 异步请求允许客户端发送请求后立即返回&#x…

PurpleLlama大模型安全全套检测方案

1. 引入 PurpleLlama是Meta的大模型安全整体解决方案(参考1),它包括了 (1)安全评估 CyberSecEval是一个用于评估大型语言模型(LLMs)安全风险的基准套件,其目标是解决随着 LLMs 的广…

vue el-table 设置selection选中状态

toggleRowSelection 方法 vue el-table 设置selection选中状态 关键代码 multipleTableRef.value!.toggleRowSelection(item, true);<el-table:data"data":border"setBorder"v-bind"$attrs"row-key"id"stripestyle"width: 1…

STM32学习笔记之常用总线(原理篇)

&#x1f4e2;&#xff1a;如果你也对机器人、人工智能感兴趣&#xff0c;看来我们志同道合✨ &#x1f4e2;&#xff1a;不妨浏览一下我的博客主页【https://blog.csdn.net/weixin_51244852】 &#x1f4e2;&#xff1a;文章若有幸对你有帮助&#xff0c;可点赞 &#x1f44d;…

【数据结构】栈(Stack)、队列(Queue)、双端队列(Deque) —— 有码有图有真相

目录 栈和队列 1. 栈&#xff08;Stack&#xff09; 1.1 概念 1.2 栈的使用&#xff08;原始方法&#xff09; 1.3 栈的模拟实现 【小结】 2. 栈的应用场景 1、改变元素的序列 2、将递归转化为循环 3、逆波兰表达式求值 4、括号匹配 5、出栈入栈次序匹配 6、最小栈…

OpenCV中的矩阵操作

OpenCV中的矩阵操作主要围绕Mat类展开&#xff0c;涵盖创建、访问、运算及变换等。 1. 创建矩阵 ‌零矩阵/单位矩阵‌&#xff1a; Mat zeros Mat::zeros(3, 3, CV_32F); // 3x3浮点零矩阵 Mat eye Mat::eye(3, 3, CV_32F); // 3x3单位矩阵 自定义初始化‌&#xff1a…

OAK相机入门(一):深度测距原理

文章目录 1. 测距参数介绍2. 测距原理3. 总结 官方文档 Configuring Stereo Depth 1. 测距参数介绍 理论范围&#xff1a;0.2-35m 推荐范文&#xff1a;不低于0.5m 存储类型&#xff1a;uint16&#xff0c;0代表没有数据&#xff0c;或者测不到 2. 测距原理 通过视差进行测距…

Powershell WSL .wslconfig 实现与宿主机的网络互通

前言.wslconfig .wslconfig 用于在 WSL 2 上运行的所有已安装发行版中配置全局设置 wsl 2 网络模式介绍 Bridged (外部): 桥接模式将虚拟机的网络连接直接桥接到物理网络适配器上Mirrored (镜像): 镜像模式并不是一个标准的 Hyper-V 网络类型,但它通常指的是在网络适配器级…

Vue:Vue2和Vue3创建项目的几种常用方式以及区别

前言 Vue.js 和 Element UI 都是用 JavaScript 编写的。 1、Vue.js 是一个渐进式 JavaScript 框架。2、Element UI 是基于 Vue.js 的组件库。3、JavaScript 是这两个项目的主要编程语言。 而Element Plus是基于TypeScript开发的。 一、Vue2 1、基于vuecli工具创建 vue2 …

IRF拆除

冗余口、冗余组、备份组、虚墙、MAD检测、被控制器纳管、转换为安全策略 黑洞路由的定义: 有来无回的路由。 对设备拆除IRF操作流程。 1、关闭主框的业务口&#xff08;对设备的接口使用shutdown&#xff09;&#xff0c;关闭MAD检测口&#xff08;BFD/NQA/MAD&#xff09;&…