使用 Flink CDC 实现 MySQL 数据实时入 Apache Doris

news2025/1/8 18:11:11

简介

主要内容如下:

  1. MySQL 安装和开启binog
  2. Flink环境准备
  3. Apache Doris 环境准备
  4. 启动Flink CDC作业

1. MySQL 安装和开启binog

参考文章:Ubuntu 安装 Mysql server, 这篇文章介绍了MySQL的安装,用户创建,Binlog开启等内容。

MySQL安装后之后创建表,插入测试数据

CREATE TABLE `test`.`products` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) NOT NULL,
  `description` varchar(512) DEFAULT NULL,
  `address` varchar(50) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=153;

INSERT INTO products
     VALUES (default,"scooter","Small 2-wheel scooter"),
            (default,"car battery","12V car battery"),
            (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
            (default,"hammer","12oz carpenter's hammer"),
            (default,"hammer","14oz carpenter's hammer"),
            (default,"hammer","16oz carpenter's hammer"),
            (default,"rocks","box of assorted rocks"),
            (default,"jacket","water resistent black wind breaker"),
            (default,"spare tire","24 inch spare tire");

2. Flink 环境准备

##  下载 flink 1.15.3 的二进制安装包
axel -n 20 https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz

## 解压flink 1.15.3 的二进制安装包
tar xvf flink-1.15.3-bin-scala_2.12.tgz

## 下载 flink sql connector mysql cdc jar包
axel -n 20 https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar

## 把flink-sql-connector-mysql-cdc-2.3.0.jar拷贝到flink的lib目录
cp flink-sql-connector-mysql-cdc-2.3.0.jar flink-1.15.3/lib


## 下载 flink doris connector jar包
axel -n 20  https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar

## 把flink-doris-connector-1.15-1.2.1.jar拷贝到拷贝到flink的lib目录
cp flink-doris-connector-1.15-1.2.1.jar flink-1.15.3/lib

## 启动单机集群
cd flink-1.15.3
bin/start-cluster.sh

## 查看 jobmanager 和 taskmanager 的进程是否存活
jps -m 
## 正常情况会出现两个进程,如下:
27353 TaskManagerRunner --configDir /home/ubuntu/installer/flink-1.15.3/conf -D taskmanager.memory.network.min=67108864b -D taskmanager.cpu.cores=6.0 -D taskmanager.memory.task.off-heap.size=0b -D taskmanager.memory.jvm-metaspace.size=268435456b -D external-resources=none -D taskmanager.memory.jvm-overhead.min=201326592b -D taskmanager.memory.framework.off-heap.size=134217728b -D taskmanager.memory.network.max=67108864b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=241591914b -D taskmanager.memory.task.heap.size=26843542b -D taskmanager.numberOfTaskSlots=6 -D taskmanager.memory.jvm-overhead.max=201326592b
27086 StandaloneSessionClusterEntrypoint -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=469762048b -D jobmanager.memory.jvm-overhead.max=201326592b --configDir /home/ubuntu/installer/flink-1.15.3/conf --executionMode cluster

3. Apache Doris 环境准备

参考文章:Apache Doris 安装,
注意配置以下参数:

## FE节点
enable_http_server_v2 = true

## BE节点
enable_stream_load_record = true

Apache Doris环境准备后之后创建表:

CREATE TABLE IF NOT EXISTS test.products_doris
(  
     id INT,
     name varchar(20),
     description varchar(100)
)  
ENGINE=olap  
Unique Key(`id`)  
DISTRIBUTED BY HASH(`id`) BUCKETS 2
PROPERTIES (  
 "replication_num" = "1"); 
--使用唯一模型,以支持Flink CDC update/delete操作
--只有一个BE节点,因此replication_num设置为1

4. 启动Flink SQL作业

使用Flink SQL命令行客户端bin/sql-client.sh执行以下SQL:

--设置checkpoint间隔
SET execution.checkpointing.interval = 3s;

-- source表
CREATE TABLE products_source (
     id INT,
     name STRING,
     description STRING,
     PRIMARY KEY (id) NOT ENFORCED
   ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = '192.168.56.104',
     'port' = '3306',
     'username' = 'test',
     'password' = 'test',
     'database-name' = 'test',
     'table-name' = 'products'
   );
   
-- sink表
CREATE TABLE products_flink_doris_sink (
    id INT,
    name STRING,
    description STRING
    ) 
    WITH (
      'connector' = 'doris',
      'fenodes' = '192.168.56.104:8030',
      'table.identifier' = 'test.products_doris',
      'username' = 'test',
      'password' = 'password123',
      'sink.label-prefix' = 'doris_label'
);

--执行insert语句
INSERT INTO products_flink_doris_sink select id,name,description from products_source

Flink SQL提交完成后,查询Doris客户端可以看到数据过来了在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/146518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux基础入门和常用命令

Linux基础入门和常用命令一. Linux介绍1.1 Linux的发行版本二. Linux环境搭建三. Linux的常用指令3.1 Linux下的目录结构3.2 ls命令3.3 pwd命令3.4 cd指令3.5 touch指令3.6 mkdir指令3.7 rmdir指令和 rm 指令3.8 man指令3.9 mv指令3.10 cp指令3.11 cat3.12 more指令3.13 less指…

基于机器学习组合模型的个人信用评估

《基于机器学习组合模型的个人信用评估》课程报告 摘要 个人信用评估在信用经济市场发挥着及其重要的基础作用,促进信用经济的发展,稳定经济市场。个人信用信息主要有个人基本信息、还款能力和还款意愿;个人基本信息主要由年龄、性别、地区等特征构成&…

C语言灵魂核心——指针深度修炼

🐒个人主页:平凡的小苏📚学习格言:别人可以拷贝我的模式,但不能拷贝我不断往前的激情目录 1. 字符指针 2. 指针数组 3. 数组指针 3.1 数组指针的定义 3.2 &数组名VS数组名 3.3 数组指针的使用 4. 数组参数、…

【读论文】TCPMFNet

【读论文】TCPMFNet简单介绍网络结构编码器图像融合网络Vision Transformer特征融合网络网格连接解码器损失函数总结参考论文:https://www.sciencedirect.com/science/article/pii/S1350449522003863 如有侵权请联系博主 简单介绍 今天要介绍的是TCPMFNet&#xf…

六大排序算法

1. 插入排序步骤:1.从第一个元素开始,该元素可以认为已经被排序2.取下一个元素tem,从已排序的元素序列从后往前扫描3.如果该元素大于tem,则将该元素移到下一位4.重复步骤3,直到找到已排序元素中小于等于tem的元素5.tem…

如何使用 Pandas 清洗二手房数据并存储文件

目录 一、实战场景 二、知识点 python 基础语法 python 文件读写 pandas 数据清洗 三、菜鸟实战 清洗前的文件 读取源文件 对二手房数据进行清洗 清洗完成后保存到文件 运行结果 运行截图 结果文件 一、实战场景 如何使用 Pandas 清洗的二手房数据并存储文件 二…

初识结构体(详细版)

目录 一、结构体的声明 1、结构的基础知识 2、结构的声明 3、结构成员的类型 4、结构体变量的定义和初始化 二、结构体成员的访问 1、点操作符访问 2、->操作符访问 3、解引用访问 三、结构体嵌套 四、结构体传参 1、传值调用 2、传址调用 一、结构体的声明 1、结构的基…

Vue2前端路由(vue-router的使用)

一、vue2axiosExpressMySQL实现前后端交互1、后台:(1)确定MySQL的表格:明确数据库 (mvc) —- 数据表(ssm_book)(2)创建Express项目:mysql2、cors、Sequelize(ORM)、nodem…

imx6ull Linux sdk下载验证

本文章是基于整点原子的imx6ull alpha开发板一.Linux SDK源码以及image1.环境准备其他的工具我们就不做介绍了,比如ubuntu ftp,ssh等等,我们主要来介绍下编译链1.1 交叉编译链背景:因为在原子的教程中有强调最新的Linaro gcc编译完uboot后无法…

备受认可!中睿天下荣登“2022创业邦100未来独角兽”年度榜单

近日,由创业邦、复旦大学管理学院主办的2022创业邦100未来独角兽峰会暨创业邦年会在上海举办。在峰会现场,2022创业邦100未来独角兽榜单正式揭晓,中睿天下凭借出众的综合实力荣登榜单。作为一家以“实战对抗”为特点的能力价值型网络安全厂商…

Java版设计模式/设计模式的作用是什么/类之间有哪些关系?又怎么表示

继续整理记录这段时间来的收获,详细代码可在我的Gitee仓库SpringBoot克隆下载学习使用! 1. 设计模式概述 1.1 设计模式创始“4人组” ErichGamma–艾瑞克伽马Richard Helm—理查德赫尔码Ralph Johnson----拉尔夫约翰逊John Vlissides—约翰威力斯蒂斯…

【Linux】Linux编译器-gcc/g++的使用和程序执行的基础底层原理

Linux编译器1.gcc/g 的使用2. 程序的基本翻译过程3.预处理3.1验证预处理的功能(gcc -E)4.编译(变成汇编语言)4.1验证编译过程(gcc -S)5.汇编(生成机器可识别代码)5.1验证汇编过程&am…

K_A11_002 基于STM32等单片机驱动DS18B20串口与OLED0.96双显示

K_A11_002 基于STM32等单片机驱动DS18B20 串口与OLED0.96双显示一、资源说明二、基本参数1.参数2.引脚说明三、驱动说明时序对应程序:四、部分代码说明1、接线说明1.1、STC89C52RCDHT11模块1.2、STM32F103C8T6DHT11模块五、基础知识学习与相关资料下载六、视频效果展示与程序资…

内核解读之内存管理(4)内存管理三级架构之page

我们前面介绍了linux内存管理的三级架构,node->zone->page。本节就来介绍page。 页是内核管理内存的基本单位,体系结构不同,支持的页大小也不尽相同,还有些体系结构甚至支持几种不同的页大小。大多数32位体系结构支持4KB的页…

【Python】爬取弹幕并保存到Excel中

hello,我是李华同学,最近开始学习爬虫,下面是我实现的一个得到弹幕的代码😏找一个URL👉想要得到一个网站的内容,首先要找到你想要内容的具体位置,首先你先找到一个有弹幕的地方,找到…

linux系统重装yum工具与python环境

文章目录前言一、强制删除原有环境1.删除python2.删除yum二、安装新环境前置准备三、下载依赖命令1.内核版本7.6.1810下载python依赖包下载yum包2.内核版本7.4.1708下载python依赖包下载yum包3.内核版本7.8.2003下载python依赖包下载yum依赖包4.下载结果四、安装前言 安装之前…

swagger入门

目录 1.前后端分离的特点 2.在没有swagger之前 3.swagger的作用 4.swagger的优点 5.集成swagger 5.1 新建springboot项目 5.2 集成swagger 5.3 开发一个controller用于测试 5.4 启动服务,验证集成效果 6.swagger常用注解 7.swagger使用综合案例 8.会议OA之sw…

第三章 Flink DataStream API

Flink 系列教程传送门 第一章 Flink 简介 第二章 Flink 环境部署 第三章 Flink DataStream API 第四章 Flink 窗口和水位线 第五章 Flink Table API&SQL 第六章 新闻热搜实时分析系统 一、DataStream API是什么? Flink 中的 DataStream 程序是对数据流&a…

Android 深入系统完全讲解(2)

1 系统启动过程、嵌入式系统启动过程 这是我之前画的启动过程的图,这个主要就是给大家讲明白,启动过程整个的流程。 第一个阶段,bootloader 系统在上电的时候,系统会从固定的地方加载一段代码进入内部 ram 进行运行。这段代码 通…

【数学思维】Quasi-convex and quesi-concave

【数学思维】Quasi-convex and quesi-concaveConvex function 定义如下 f(λx(1−λ)y)≤λf(x)(1−λ)f(y)f(\lambda x(1-\lambda)y)\le \lambda f(x)(1-\lambda)f(y) f(λx(1−λ)y)≤λf(x)(1−λ)f(y)Quasi-convex function 定义如下 f(λx(1−λ)y)≤max⁡{f(x),f(y)}f(\l…