最新版Flink CDC MySQL同步Elasticsearch(一)

news2025/1/10 3:04:46

1.环境准备

首先我们要基于Flink CDC MySQL同步MySQL的环境基础上(flink-1.17.1、Java8、MySQL8)搭建Elasticsearch7-17-10和Kibana 7.17.10。笔者已经搭建好环境,这里不做具体演示了,如果需要Es的搭建教程情况笔者其他博客

注意: 建议生产环境统一使用稳定版本Flink1.16.*。笔者这里只是作为教程编写采用当下最新版本,生产环境不推荐使用

2.编译flink-sql-connector-mysql-cdc

最新版本flink-1.17.1 mysql同步Es具体jar依赖版本如下所示:

注意:下载链接仅适用于稳定版本,SNAPSHOT依赖需要您自己构建。

flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

flink-sql-connector-mysql-cdc-2.5-SNAPSHOT.jar(需要自行进行构建编译,笔者构建的已经上次至次博客。需要可以进行下载,csdn需要积分下载,无法设置免费的,需要免费版可以直接联系笔者)

下载所需的JAR包并放在下面flink-1.17.1/lib/:

git clone https://github.com/ververica/flink-cdc-connectors.git
cd flink-cdc-connectors
mvn clean install -DskipTests

3.建立mysql和Es映射关系表

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。
在这里插入图片描述首先,每 3 秒启用一次检查点

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

编辑源数据库Flink Sql代码,如下所示:

CREATE TABLE products (
 id INT NOT NULL,
 name STRING,
 description STRING,
 PRIMARY KEY(id) NOT ENFORCED
) WITH (
 'connector' = 'mysql-cdc', #引入的CDC jar包驱动,没有引入会报错提示需要引入
 'hostname' = '192.168.50.163',#源数据库连接host地址,可以根据自己的具体设置,此处为笔者本机的
 'port' = '3306', #源数据库端口
 'username' = 'root',#源数据库账号
 'password' = '*****',#源数据库密码
 'database-name' = 'mydb',#源数据库
 'table-name' = 'products'#源数据库表
);

在Flink SQL 执行以下语句创建从相应数据库表捕获更改数据的表

-- Flink SQL
Flink SQL> CREATE TABLE products (
>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>     'connector' = 'mysql-cdc',
>     'hostname' = '192.168.50.163',
>     'port' = '3306',
>     'username' = 'root',
>     'password' = '****',
>     'database-name' = 'mydb',
>     'table-name' = 'products'
>   );

在es创建要同步的目标索引,具体语句如下:

PUT product1
{
  "settings": {
    "number_of_shards": 12,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "name": {
        "type": "keyword"
      },
      "description": {
        "type": "text"
      }
    }
  }
}

编辑目标ES映射Flink Sql代码,如下所示:

   CREATE TABLE product1 (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
     'hosts' = 'http://192.168.50.236:9200',#连接信息
     'index' = 'product1'#索引信息
 );

注意: 本文Es为测试版本没有配置账号密码,如果有账号密码配置即可 ‘username’ = ‘xxxx’,‘password’=‘xxxx’

建立目标索引与Flink SQL的映射关系,具体语句如下:

-- Flink SQL
 CREATE TABLE product1 (

>     id INT,
>     name STRING,
>     description STRING,
>     PRIMARY KEY (id) NOT ENFORCED
>   ) WITH (
>      'connector' = 'elasticsearch-7',#目标ES版本,最新目前支持7
>      'hosts' = 'http://192.168.50.236:9200',#连接信息
>      'index' = 'product1'#索引信息
>  );

使用Flink SQL添加mysql和Es映射表数据关联关系

-- Flink SQL
Flink SQL> insert into product1 select * from products;

4.时区问题处理

错误:
The MySQL server has a timezone offset (28800 seconds ahead of UTC) which does not match the configured timezone Etc/UTC. Specify the right server-time-zone to avoid inconsistencies for time-related fields.

解决思路:

  • Flink集群开启NTP服务器 时间同步
  • 把服务器时区改成和数据库一样的时间本文为(Asia/Shanghai)
  • 配置Flink sql的时区为Asia/Shanghai,具体命令如下所示:
Flink SQL> SET 'table.local-time-zone' = 'Asia/Shanghai';

注意:这是笔者遇到的问题,具体问题具体解决即可

5.具体实现结果

整体实现结果如下图所示:

Flink 运行任务

在这里插入图片描述

mysql 源数据表数据

在这里插入图片描述

Es目标索引已经数据查询图

在这里插入图片描述至此,笔者的Flink CDC MySQL同步Elasticsearch第一篇讲解完毕,希望能帮助到搭建

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/726788.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【案例教程】GPT模型支持下的Python-GEE遥感云大数据分析、管理与可视化技术及多领域案例实践实践技术

随着航空、航天、近地空间等多个遥感平台的不断发展,近年来遥感技术突飞猛进。由此,遥感数据的空间、时间、光谱分辨率不断提高,数据量也大幅增长,使其越来越具有大数据特征。对于相关研究而言,遥感大数据的出现为其提…

海格里斯HEGERLS智能四向穿梭车系统是如何赋能企业降本增效的?

随着人工智能和物联网等新技术的更新迭代,物流行业数字化,智能仓储已成趋势。我国智能仓储在“互联网”战略的带动下快速发展,与大数据、云计算等新一代互联网技术深度融合,智能仓储整个行业向着运行高效、便捷、低成本的方向迈进…

Vision Transformer推理中线性-角度注意转换压缩自注意

文章目录 Castling-ViT: Compressing Self-Attention via Switching Towards Linear-Angular Attention at Vision Transformer Inference摘要本文方法实验结果 Castling-ViT: Compressing Self-Attention via Switching Towards Linear-Angular Attention at Vision Transform…

Angular 调试工具(Augury)

目录 1、简介 2、检验代码 3、Angury 本地构建和安装 3.1 添加到Chrome 浏览器: 3.2 添加到Firefox浏览器 4、项目中对应的Npm脚本 5、Augury 三大主要功能 5.1 组件树(Component Tree) 5.1.1 Component Tree 5.2 路由树&#xff0…

HarmonyOS学习路之开发篇—数据管理(对象关系映射数据库)

HarmonyOS对象关系映射(Object Relational Mapping,ORM)数据库是一款基于SQLite的数据库框架,屏蔽了底层SQLite数据库的SQL操作,针对实体和关系提供了增删改查等一系列的面向对象接口。应用开发者不必再去编写复杂的SQ…

港联证券|如何区分大盘股和小盘股?

1、依据个股的市值来区别。一般来说,大盘股:流转市值在500亿及以上,小盘股:流转市值一般在50亿及以下,市值在二者之间的被称为中盘股。 2、依据流转股本区别。一般来说,大盘股:流转股本大于5亿&…

mysql重点复习

1.MySQL如何对用户smart授权访问,密码为123456。 2.授权用户tom可以在网络中的192.168.4.254主机登录,仅对对userdb库下的user表有查看记录、更新name字段的权限 , 登录密码userweb888。 GRANT SELECT,UPDATE(name) ON userdb.user TO tom192…

零拷贝小结

零拷贝(Zero-copy)是一种优化技术,用于减少数据传输过程中的拷贝操作,从而提高系统性能和效率。在传统的数据传输中,涉及多个缓冲区之间的数据拷贝操作(例如从磁盘到内存的拷贝、内存到网络缓冲区的拷贝等&…

gitlab ci/cd+harbor+k8s实现一键部署(python项目)

大致架构: gitlab变量 使用 kaniko 构建 Docker 镜像 .gitlab-ci.yml stages:- test- build- deployvariables:DOCKERFILE: "Dockerfile2"CONTAINER_IMAGE: "archeros/workspace/platform"GIT_SSL_NO_VERIFY: "true"before_script…

canvas.js、node-canvas的坑

一、依赖下载后半天没 install 完,最后还报错, \node_modules\canvas: Command failed. Exit code: 1 Command: node-pre-gyp install --fallback-to-build Arguments: 解决方法:官方: Installation: Windows Automattic/node-ca…

ArcGis如何通过Python进行插件开发?

文章目录 0.引言1.准备Python加载项工具2.创建一个加载项工具3.编写代码4.生成安装文件5.安装和调出加载项6.使用加载项 0.引言 ArcGIS 插件(Add-ins)可以让用户更加容易的自定义和扩展ArcGIS Desktop应用程序,它创建一系列自定义工具提供了一…

Linux基础_3

一、Linux安全模型 资源分派: Authentication: 认证:验证用户身份Authorization: 授权:不同的用户设置不同权限Accouting|Audition: 审计 当用户成功登录时,系统会自动分配令牌token,包括:用户标识…

从CPU缓存结构到原子操作

文章目录 一、CPU缓存结构1.1 CPU的多级缓存1.2 Cache Line 二、写回策略三、缓存一致性问题及解决方案3.1 缓存一致性问题3.2 解决方案3.2.1 总线嗅探3.2.2 事务的串行化3.2.3 MESI 四、原子操作4.1 什么是原子操作4.2 c 标准库的原子类型4.2.1 atomic<T\>4.2.2 is_lock…

软件安全测试流程与方法分享(上)

安全测试是在IT软件产品的生命周期中&#xff0c;特别是产品开发基本完成到发布阶段&#xff0c;对产品进行检验以验证产品符合安全需求定义和产品质量标准的过程。安全是软件产品的一个重要特性&#xff0c;安全测试也是软件测试重的一个重要类别&#xff0c;本系列文章我们与…

MySQL简单查询操作

系列文章目录 前言SELECT子句SELECT后面之间跟列名DISTINCT,ALL列表达式列更名 WHERE子句WHERE子句中可以使用的查询条件比较运算特殊比较运算符BETWEEN...AND...集合查询&#xff1a;IN模糊查询&#xff1a;LIKE空值比较&#xff1a;IS NULL 多重条件查询 ORDER BY子句排序复杂…

线性规划解的概念

一、线性规划的可行解 若x1,x2满足条件[1]-[4],则称向量为线性规划问题的一个可行解。 例如 其中x(1),x(2)为可行解&#xff0c;而x(3),x(4)不是可行解。 二、线性规划的可行域 所有可行解构成的集合称为该线性规划的可行域。 三、线性规划的最优解 使目标函数最大或最小的…

Git ① 通过git将本地两个项目进行合并

一、新建一个本地仓库 ① 新建一个文件夹&#xff0c;打开之后在命令行输入git init 初始化仓库。 git init ② 在新建的文件夹中随便创建一个文件&#xff08;这样才能新建新的分支&#xff0c;不然新建分支命令没有作用&#xff09; ③ 输入命令 git add . 和 git commit…

如何实现对视频录像文件的AI算法分析?

有用户提出需求&#xff0c;提供视频文件给平台&#xff0c;并进行AI算法分析。值得一提的是&#xff0c;我们的平台不仅仅可以基于AI算法&#xff0c;对设备实时传输的视频流进行分析&#xff0c;也能对视频回放录像文件进行智能分析。那么是如何实现的呢&#xff1f; EasyDSS…

Linux 共享内存

概念&#xff1a; 在Linux系统中&#xff0c;共享内存是一种用于进程间通信的机制&#xff0c;它允许多个进程共享同一块内存区域。 Linux 共享内存的作用和目的&#xff1a; Linux共享内存的主要目的是在不同的进程之间实现高效的数据交换和共享。它可以用于以下几个方面&…

在uniapp 小程序 vue中报 错 Cannot read property ‘substring‘ of undefined

是因 是因为对字符串使用substring的时候页面中的数据还没有加载 。 错误代码&#xff1a; 可以使用 v-if 修改为&#xff1a;