flink-cdc同步数据到doris中

news2025/2/23 22:04:18

1 创建数据库和表

1.1 数据库脚本

这样直接创建数据库是有问题,因为后面发现superset连接使用doris://root:123456@10.101.12.82:9030/internal.eayc?charset=utf8mb4

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

1

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:
  type: mysql
  hostname: 10.101.10.11
  port: 3306
  username: flink
  password: 123456
  tables: eayc.eayc_user
  server-id: 5400
  # server-time-zone: UTC
#Doris的参数配置
sink:
  type: doris
  fenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030
  username: root
  password: 123456
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
pipeline:
  name: eayc to doris
  parallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:
	tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
  - source-table: eayc.eayc_company
    sink-table: ods_eayc.eayc_company
  - source-table: eayc.eayc_company_user
    sink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
	at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:
	tables: acc.\.*
route:
  - source-table: acc.acc_account_balance_\.*
    sink-table: acc.acc_account_balance
  - source-table: acc.acc_account_subject_\.*
    sink-table: acc.acc_account_subject
  - source-table: acc.acc_initial_balance_\.*
    sink-table: acc.acc_initial_balance
  - source-table: acc.acc_voucher_\.*
    sink-table: acc.acc_voucher
  - source-table: acc.acc_voucher_entry_\.*
    sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2304098.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Git命令行入门

诸神缄默不语-个人CSDN博文目录 之前写过一篇VSCode Git的博文:VSCode上的Git使用手记(持续更新ing…) 现在随着开发经历增加,感觉用到命令行之类复杂功能的机会越来越多了,所以我专门再写一篇Git命令行的文章。 G…

DeepSeek R1/V3满血版——在线体验与API调用

前言:在人工智能的大模型发展进程中,每一次新模型的亮相都宛如一颗投入湖面的石子,激起层层波澜。如今,DeepSeek R1/V3 满血版强势登场,为大模型应用领域带来了全新的活力与变革。 本文不但介绍在线体验 DeepSeek R1/…

关于 BK3633 上电时受串口 UART2 影响而无法启动的问题说明

1. 问题描述 BK3633 SDK 版本:BK3633_DesignKit_V06_2310 使用 BK3633 UART2 与指纹模块进行通讯,为了降低功耗,通过 GPIO 控制了指纹模块的供电电源。但每次给整个系统板子上电时,BK3633 很大概率会实际而无法正常运行程序&…

Redis7——基础篇(六)

前言:此篇文章系本人学习过程中记录下来的笔记,里面难免会有不少欠缺的地方,诚心期待大家多多给予指教。 基础篇: Redis(一)Redis(二)Redis(三)Redis&#x…

使用AI创建流程图和图表的 3 种简单方法

你可能已经尝试过使用 LLMs 生成图像,但你有没有想过用它们来创建 流程图和图表?这些可视化工具对于展示流程、工作流和系统架构至关重要。 通常,在在线工具上手动绘制图表可能会耗费大量时间。但你知道吗?你可以使用 LLMs 通过简…

机器学习实战(7):聚类算法——发现数据中的隐藏模式

第7集:聚类算法——发现数据中的隐藏模式 在机器学习中,聚类(Clustering) 是一种无监督学习方法,用于发现数据中的隐藏模式或分组。与分类任务不同,聚类不需要标签,而是根据数据的相似性将其划…

企业级RAG开源项目分享:Quivr、MaxKB、Dify、FastGPT、RagFlow

企业级 RAG GitHub 开源项目深度分享:Quivr、MaxKB、Dify、FastGPT、RagFlow 及私有化 LLM 部署建议 随着生成式 AI 技术的成熟,检索增强生成(RAG)已成为企业构建智能应用的关键技术。RAG 技术能够有效地将大型语言模型&#xff…

open webui 部署 以及解决,首屏加载缓慢,nginx反向代理访问404,WebSocket后端服务器链接失败等问题

项目地址:GitHub - open-webui/open-webui: User-friendly AI Interface (Supports Ollama, OpenAI API, ...) 选择了docker部署 如果 Ollama 在您的计算机上,请使用以下命令 docker run -d -p 3000:8080 --add-hosthost.docker.internal:host-gatewa…

内容中台架构下智能推荐系统的算法优化与分发策略

内容概要 在数字化内容生态中,智能推荐系统作为内容中台的核心引擎,承担着用户需求与内容资源精准匹配的关键任务。其算法架构的优化路径围绕动态特征建模与多模态数据融合展开,通过深度强化学习技术实现用户行为特征的实时捕捉与动态更新&a…

最新版IDEA下载安装教程

一、下载IDEA 点击前往官网下载 或者去网盘下载 点击前往百度网盘下载 点击前往夸克网盘下载 进去后点击IDEA 然后点击Download 选择自己电脑对应的系统 点击下载 等待下载即可 二、安装IDEA 下载好后双击应用程序 点击下一步 选择好安装目录后点击下一步 勾选这两项后点击…

DeepSeek最新开源动态:核心技术公布

2月21日午间,DeepSeek在社交平台X发文称,从下周开始,他们将开源5个代码库,以完全透明的方式与全球开发者社区分享他们的研究进展。并将这一计划定义为“Open Source Week”。 DeepSeek表示,即将开源的代码库是他们在线…

【R语言】绘图

一、散点图 散点图也叫X-Y图,它将所有的数据以点的形式展现在坐标系上,用来显示变量之间的相互影响程度。 ggplot2包中用来绘制散点图的函数是geom_point(),但在绘制前需要先用ggplot()函数指定数据集和变量。 下面用mtcars数据集做演示&a…

Linux基本指令(三)+ 权限

文章目录 基本指令grep打包和压缩zip/unzipLinux和windows压缩包互传tar(重要)Linux和Linux压缩包互传 bcuname -r常用的热键关机外壳程序 知识点打包和压缩 Linux中的权限用户权限 基本指令 grep 1. grep可以过滤文本行 done用于标记循环的结束&#x…

容器化部署tomcat

容器化部署tomcat 需求在docker容器中部署tomcat,并通过外部机器访问tomcat部署的项目 容器化部署要先装好docker容器(docker安装配置) 实现步骤: 拉取tomcat docker pull tomcat用于列出本地Docker主机上存储的所有镜像 docker images在root目录里面创建tomc…

vscode软件中引入vant组件

一、vant简介 Vant 是一个轻量、可靠的移动端组件库,于 2017 年开源。 目前 Vant 官方提供了 Vue 2 版本、Vue 3 版本和微信小程序版本,并由社区团队维护 React 版本和支付宝小程序版本。 官网:介绍 - Vant Weapp 里面的快速上手的教程&a…

DeepSeek vs ChatGPT:AI 领域的华山论剑,谁主沉浮?

一、引言 在当今科技飞速发展的时代,人工智能(AI)已然成为推动各领域变革的核心力量。而在人工智能的众多分支中,自然语言处理(NLP)因其与人类日常交流和信息处理的紧密联系,成为了最受瞩目的领…

Ubuntu 22.04 Install deepseek

前言 deepseekAI助手。它具有聊天机器人功能,可以与用户进行自然语言交互,回答问题、提供建议和帮助解决问题。DeepSeek 的特点包括: 强大的语言理解能力:能够理解和生成自然语言,与用户进行流畅的对话。多领域知识&…

如何将公钥正确添加到服务器的 authorized_keys 文件中以实现免密码 SSH 登录

1. 下载密钥文件 2. RSA 解析 将 id_ed25519 类型的私钥转换为 RSA 类型,要将 ED25519 私钥转换为 RSA 私钥,需要重新生成一个新的 RSA 密钥对。 步骤: 生成新的 RSA 密钥对 使用 ssh-keygen 来生成一个新的 RSA 密钥对。比如,执…

光明谷推出AT指令版本的蓝牙音箱SOC 开启便捷智能音频开发新体验

前言 在蓝牙音箱市场竞争日益激烈的当下,开发一款性能卓越且易于上手的蓝牙音箱,成为众多厂商追求的目标。而光明谷科技有限公司推出的 AT 指令版本的蓝牙音箱 SOC,无疑为行业带来了全新的解决方案,以其诸多独特卖点,迅…

TIP: Flex-DLD

Article: Flex-DLD: Deep Low-Rank Decomposition Model With Flexible Priors for Hyperspectral Image Denoising and Restoration, 2024 TIP. 文章的主要思想是用network来学low-rank decomposition的两个matrix(input是random input). 文章的framew…