Flink CDC系列之:TiDB CDC 导入 Elasticsearch

news2025/1/13 10:04:55

Flink CDC系列之:TiDB CDC 导入 Elasticsearch

  • 一、通过docker 来启动 TiDB 集群
  • 二、下载 Flink 和所需要的依赖包
  • 三、在TiDB数据库中创建表和准备数据
  • 四、启动Flink 集群,再启动 SQL CLI
  • 五、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 六、Kibana查看ElasticSearch数据
  • 七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

一、通过docker 来启动 TiDB 集群

git clone https://github.com/pingcap/tidb-docker-compose.git

替换目录 tidb-docker-compose 里面的 docker-compose.yml 文件,内容如下所示:

version: "2.1"

services:
  pd:
    image: pingcap/pd:v5.3.1
    ports:
      - "2379:2379"
    volumes:
      - ./config/pd.toml:/pd.toml
      - ./logs:/logs
    command:
      - --client-urls=http://0.0.0.0:2379
      - --peer-urls=http://0.0.0.0:2380
      - --advertise-client-urls=http://pd:2379
      - --advertise-peer-urls=http://pd:2380
      - --initial-cluster=pd=http://pd:2380
      - --data-dir=/data/pd
      - --config=/pd.toml
      - --log-file=/logs/pd.log
    restart: on-failure

  tikv:
    image: pingcap/tikv:v5.3.1
    ports:
      - "20160:20160"
    volumes:
      - ./config/tikv.toml:/tikv.toml 
      - ./logs:/logs           
    command:
      - --addr=0.0.0.0:20160
      - --advertise-addr=tikv:20160
      - --data-dir=/data/tikv
      - --pd=pd:2379
      - --config=/tikv.toml
      - --log-file=/logs/tikv.log
    depends_on:
      - "pd"
    restart: on-failure

  tidb:
    image: pingcap/tidb:v5.3.1
    ports:
      - "4000:4000"
    volumes:
      - ./config/tidb.toml:/tidb.toml
      - ./logs:/logs
    command:
      - --store=tikv
      - --path=pd:2379
      - --config=/tidb.toml
      - --log-file=/logs/tidb.log
      - --advertise-address=tidb
    depends_on:
      - "tikv"
    restart: on-failure
    
  elasticsearch:
     image: elastic/elasticsearch:7.6.0
     container_name: elasticsearch
     environment:
       - cluster.name=docker-cluster
       - bootstrap.memory_lock=true
       - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
       - discovery.type=single-node
     ports:
       - "9200:9200"
       - "9300:9300"
     ulimits:
       memlock:
         soft: -1
         hard: -1
       nofile:
         soft: 65536
         hard: 65536
         
  kibana:
     image: elastic/kibana:7.6.0
     container_name: kibana
     ports:
       - "5601:5601"
     volumes:
       - /var/run/docker.sock:/var/run/docker.sock

该 Docker Compose 中包含的容器有:

  • TiDB 集群: tikv、pd、tidb。
  • Elasticsearch:orders 表将和 products 表进行 join,join 的结果写入 Elasticsearch 中。
  • Kibana:可视化 Elasticsearch 中的数据。

本机添加 host 映射 pd 和 tikv 映射 127.0.0.1。 在 docker-compose.yml 所在目录下运行如下命令以启动所有容器:

docker-compose up -d
mysql -h 127.0.0.1 -P 4000 -u root # Just test tidb cluster is ready,if you have install mysql local.

该命令会以 detached 模式自动启动 Docker Compose 配置中定义的所有容器。 你可以通过 docker ps 来观察上述的容器是否正常启动了。 也可以访问 http://localhost:5601/ 来查看 Kibana 是否运行正常。

另外可以通过如下命令停止并删除所有的容器:

docker-compose down

二、下载 Flink 和所需要的依赖包

下载 Flink 1.17.1 并将其解压至目录 flink-1.17.1

https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz

下载下面列出的依赖包,并将它们放到目录 flink-1.17.1/lib/ 下:

  • flink-connector-tidb-cdc-2.4.1.jar
  • flink-sql-connector-elasticsearch7-3.0.1-1.17.jar

三、在TiDB数据库中创建表和准备数据

创建数据库和表 products,orders,并插入数据:

-- TiDB
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
                         id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                         name VARCHAR(255) NOT NULL,
                         description VARCHAR(512)
) AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
      (default,"car battery","12V car battery"),
      (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
      (default,"hammer","12oz carpenter's hammer"),
      (default,"hammer","14oz carpenter's hammer"),
      (default,"hammer","16oz carpenter's hammer"),
      (default,"rocks","box of assorted rocks"),
      (default,"jacket","water resistent black wind breaker"),
      (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
                       order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
                       order_date DATETIME NOT NULL,
                       customer_name VARCHAR(255) NOT NULL,
                       price DECIMAL(10, 5) NOT NULL,
                       product_id INTEGER NOT NULL,
                       order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
      (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
      (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

四、启动Flink 集群,再启动 SQL CLI

使用下面的命令跳转至 Flink 目录下

cd flink-1.17.1

使用下面的命令启动 Flink 集群

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/ 访问到 Flink Web UI,如下所示:
在这里插入图片描述
使用下面的命令启动 Flink SQL CLI

./bin/sql-client.sh

启动成功后,可以看到如下的页面:

在这里插入图片描述

五、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'products'
  );

Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(3),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = '127.0.0.1:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date DATE,
   customer_name STRING,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://localhost:9200',
     'index' = 'enriched_orders_1'
 );

将关联后的数据插入到ElasticSearch

Flink SQL> INSERT INTO enriched_orders
  SELECT o.order_id, o.order_date, o.customer_name, o.order_status, p.name, p.description
  FROM orders AS o
  LEFT JOIN products AS p ON o.product_id = p.id;

六、Kibana查看ElasticSearch数据

检查最终的结果是否写入 ElasticSearch 中,可以在 Kibana 看到 ElasticSearch 中的数据。

首先访问 http://localhost:5601/app/kibana#/management/kibana/index_pattern 创建 index pattern enriched_orders.

在这里插入图片描述
然后就可以在 http://localhost:5601/app/kibana#/discover 看到写入的数据了.
在这里插入图片描述

七、在 TiDB增删改数据,观察 ElasticSearch 中的结果

通过如下的 SQL 语句对 TiDB 数据库进行一些修改,然后就可以看到每执行一条 SQL 语句,Elasticsearch 中的数据都会实时更新。

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

UPDATE orders SET order_status = true WHERE order_id = 10004;

DELETE FROM orders WHERE order_id = 10004;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/867134.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

页面静态化(模板引擎Freemarker)

1、浏览器请求web服务器 2、服务器渲染页面,渲染的过程就是向jsp页面(模板)内填充数据(模型)。 3、服务器将渲染生成的页面返回给浏览器。 所以模板引擎就是:模板数据输出,Jsp页面就是模板,页面中嵌入的jsp标签就是数据&#x…

【Linux】Reactor模式

​🌠 作者:阿亮joy. 🎆专栏:《学会Linux》 🎇 座右铭:每个优秀的人都有一段沉默的时光,那段时光是付出了很多努力却得不到结果的日子,我们把它叫做扎根 目录 👉Reactor …

生成式人工智能模型:提升营销分析用户体验

使用生成式人工智能来改善分析体验,使业务用户能够询问有关我们数据平台中可用数据的任何信息。 在本文中,我们将解释如何使用新的生成式人工智能模型 ( LLM ) 来改善业务用户在我们的分析平台上的体验。假设我们为零售销售经理提供 Web 应用程序或移动应…

【雕爷学编程】Arduino动手做(24)---水位传感器模块3

37款传感器与模块的提法,在网络上广泛流传,其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块,依照实践出真知(一定要动手做)的理念,以学习和交流为目的&#x…

斯坦福「小镇」开源AI智能体;小米应用商店将要求AI应用符合资质标准

🦉 AI新闻 🚀 斯坦福「小镇」开源AI智能体 摘要:斯坦福研究人员开源了一个类似《西部世界》的数字化「小镇」,里面有25个AI智能体可以生活、工作、社交。这项研究被视为AGI的重要开端,可能会改变游戏、企业应用领域。网友期待这项技术改善游戏NPC的交互…

【Fegin技术专题】「原生态」打开Fegin之RPC技术的开端,你会使用原生态的Fegin吗?(上)

前提介绍 Feign是SpringCloud中服务消费端的调用框架,通常与ribbon,hystrix等组合使用。由于遗留原因,某些项目中,整个系统并不是SpringCloud项目,甚至不是Spring项目,而使用者关注的重点仅仅是简化http调…

软工导论知识框架(七)面向对象设计

一.设计准则 分析:提取、整理用户需求,建立问题域精确模型。设计:转变需求为系统实现方案,建立求解域模型。 在实际的软件开发过程中分析和设计的界限是模糊的,分析和设计活动是一个多次反复迭代的过程。分析的结果可…

无涯教程-Perl - msgsnd函数

描述 此功能使用可选的FLAGS将消息MSG发送到消息队列ID。 语法 以下是此函数的简单语法- msgsnd ID, MSG, FLAGS返回值 该函数在错误时返回0,在成功时返回1。 Perl 中的 msgsnd函数 - 无涯教程网无涯教程网提供描述此功能使用可选的FLAGS将消息MSG发送到消息队列ID。 语法…

接地电阻测试仪的原理和使用事项

接地电阻测试仪(Ground Resistance Tester)是用来测量接地电阻的一种仪器。接地系统是指用于保护人员和设备的设施,它将电流引导到地下,将任何潜在危险的电流导向地面。 接地电阻测试仪的作用是通过测量接地系统的电阻值来评估其…

C++ STL string类模拟实现

目录 string类成员变量 一.构造函数 二.析构函数 三.拷贝构造 四.size(),capacity() 五.operator [ ] 六. operator 七.字符串比较 八.reserve() 九.push_back(),append() 十.operato…

【雕爷学编程】Arduino动手做(12)---霍尔磁场传感器模块5

37款传感器与模块的提法,在网络上广泛流传,其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块,依照实践出真知(一定要动手做)的理念,以学习和交流为目的&#x…

数据归一化:优化数据处理的必备技巧

文章目录 🍀引言🍀数据归一化的概念🍀数据归一化的应用🍀数据归一化的注意事项与实践建议🍀代码演示🍀在sklearn中使用归一化🍀结语 🍀引言 在当今数据驱动的时代,数据的…

Vue在页面输出JSON对象,测试接口可复制使用

效果图&#xff1a; 数据处理前&#xff1a; 数据处理后&#xff1a; 代码实现&#xff1a; HTML: <el-table height"600" :data"tableData" border style"width: 100%" tooltip-effect"dark" size"mini"><el-…

Django笔记之数据库函数之日期函数

日期函数主要介绍两个大类&#xff0c;Extract() 和 Trunc() Extract() 函数作用是提取日期&#xff0c;比如我们可以提取一个日期字段的年份&#xff0c;月份&#xff0c;日等数据 Trunc() 的作用则是截取&#xff0c;比如 2022-06-18 12:12:12&#xff0c;我们可以根据需求…

深度学习基础知识笔记

深度学习要解决的问题 1 深度学习要解决的问题2 应用领域3 计算机视觉任务4 视觉任务中遇到的问题5 得分函数6 损失函数7 前向传播整体流程8 返向传播计算方法1 梯度下降9 神经网络整体架构 11 神经元个数对结果的影响12 正则化和激活函数1 正则化2 激活函数 13 神经网络过拟合…

人工智能可解释性(二)(梯度计算,积分梯度等)

目录 1.定义 2.详述 2.1局部解释 可视化方法 梯度计算 2.2积分梯度Integrated Gradients&#xff08;梯度计算进阶&#xff09; 2. 3全局解释 2.3.1Activation Maximization 2.3.2GAN,VAE 2. 4用一个可解释模型解释不可解释模型 2. 4.1LIME 局部解释 参考文献 1.定义 可…

access怎么做进销存?借助access开发进销存管理应用

我不太推荐使用Access&#xff0c;因为他的缺点还是比较明显的&#xff1a; 1、软件自身限制 不能用于互联网&#xff1a;使用Access制作好的管理软件&#xff0c;访问页只能在局域网中使用&#xff1b;只能在Windows上运行&#xff1a;Access仅支持windows的运行环境&#x…

从零开始学习 Java:简单易懂的入门指南之多态(十)

多态&包&final&权限修饰符&代码块 第一章 多态1.1 多态的形式1.2 多态的使用场景1.3 多态的定义和前提1.4 多态的运行特点1.5 多态的弊端1.6 引用类型转换1.6.1 为什么要转型1.6.2 向上转型&#xff08;自动转换&#xff09;1.6.3 向下转型&#xff08;强制转换…

【将回声引入信号中】在语音或音频文件中引入混响或简单回声,以研究回声延迟和回波幅度对生成的回波信号感知的影响(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

积分代换和周期函数

昨晚上看书&#xff0c;有一个稳定随机过程的例题&#xff0c;涉及积分上下限代换、周期函数的微积分性质等知识点。这种题型以前肯定接触过&#xff0c;当下遇到了&#xff0c;思维仍然迷迷糊糊&#xff0c;像是一团乱麻&#xff0c;纠缠不清&#xff0c;照着答案思考了半天&a…