Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

news2025/1/12 6:08:45

Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

  • 一、技术路线
  • 二、MySQL数据库建表
  • 三、PostgreSQL数据库建表
  • 四、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 五、关联订单数据并且将其写入 Elasticsearch 中
  • 六、Kibana查看商品和物流信息的订单数据
  • 七、修改数据库中表的数据,Kibana查看更新

一、技术路线

在这里插入图片描述

二、MySQL数据库建表

mysql数据库创建数据库和表 products,orders

创建products表

-- MySQL

CREATE DATABASE mydb;USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));

ALTER TABLE products AUTO_INCREMENT = 101;

创建orders表

CREATE TABLE orders (
    order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    order_date DATETIME NOT NULL,
    customer_name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 5) NOT NULL,
    product_id INTEGER NOT NULL,
    order_status BOOLEAN NOT NULL -- Whether order has been placed
 ) AUTO_INCREMENT = 10001;

products表插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

orders表插入数据

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

三、PostgreSQL数据库建表

创建表 shipments

-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL);

插入数据

ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;


ALTER TABLE public.shipments REPLICA IDENTITY FULL;

INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),   
       (default,10003,'Shanghai','Hangzhou',false);

四、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

-- Flink SQL

Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);

五、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

六、Kibana查看商品和物流信息的订单数据

创建 index pattern enriched_orders
在这里插入图片描述
查看写入的数据
在这里插入图片描述

七、修改数据库中表的数据,Kibana查看更新

修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

在 MySQL 的 orders 表中插入一条数据

--MySQL

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

在 Postgres 的 shipment 表中插入一条数据

--PG
INSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);

在 MySQL 的 orders 表中更新订单的状态

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

在 Postgres 的 shipment 表中更新物流的状态

--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

在 MYSQL 的 orders 表中删除一条数据

--MySQL
DELETE FROM orders WHERE order_id = 10004;

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/870448.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Java+SpringBoot+Vue的书籍学习平台设计与实现(源码+LW+部署文档等)

博主介绍: 大家好,我是一名在Java圈混迹十余年的程序员,精通Java编程语言,同时也熟练掌握微信小程序、Python和Android等技术,能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

Chrome

Chrome 简介下载 简介 Chrome 是由 Google 开发的一款流行的网络浏览器。它以其快速的性能、强大的功能和用户友好的界面而闻名,并且在全球范围内被广泛使用。Chrome 支持多种操作系统,包括 Windows、macOS、Linux 和移动平台。 Chrome官网: https://ww…

深度剖析堆栈指针

为什么打印root的值与&root->value的值是一样的呢 测试结果: *号一个变量到底取出来的是什么? 以前我写过一句话,就是说,如果看到一个*变量,那就是直逼这个变量所保存的内存地址,然后取出里面保存的…

Java负载均衡算法实现与原理分析(轮询、随机、哈希、加权、最小连接)

文章目录 一、负载均衡算法概述二、轮询(RoundRobin)算法1、概述2、Java实现轮询算法3、优缺点 三、随机(Random)算法1、概述2、Java实现随机算法 四、源地址哈希(Hash)算法1、概述2、Java实现地址哈希算法…

在Java中对XML的简单应用

XML 数据传输格式1 XML 概述1.1 什么是 XML1.2 XML 与 HTML 的主要差异1.3 XML 不是对 HTML 的替代 2 XML 语法2.1 基本语法2.2 快速入门2.3 组成部分2.3.1 文档声明格式属性 2.3.2 指令(了解):结合CSS2.3.3 元素2.3.4 属性**XML 元素 vs. 属…

c++ 学习系列 -- 智能指针

一 为什么引入智能指针?解决了什么问题? C 程序设计中使用堆内存是非常频繁的操作,堆内存的申请和释放都由程序员自己管理。但使用普通指针,容易造成内存泄露(忘记释放)、二次释放、程序发生异常时内存泄…

Springboot整合RabbitMq,详细步骤

Springboot整合RabbitMq,详细步骤 1 添加springboot-starter依赖2 添加连接配置3 在启动类上添加开启注解EnableRabbit4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。5 生产者推送消息6 消费者接收消息7 生产者…

闭环控制方法及其应用:优缺点、场景和未来发展

闭环控制是一种基本的控制方法,它通过对系统输出与期望值之间的误差进行反馈,从而调整系统输入,使系统输出更加接近期望值。闭环控制的主要目标是提高系统的稳定性、精确性和鲁棒性。在实际应用中,闭环控制有多种方法,…

开源代码分享(13)—整合本地电力市场与级联批发市场的投标策略(附matlab代码)

1.引言 1.1摘要 本地电力市场是在分配层面促进可再生能源的效率和使用的一种有前景的理念。然而,作为一个新概念,如何设计和将这些本地市场整合到现有市场结构中,并从中获得最大利润仍然不清楚。在本文中,我们提出了一个本地市场…

linux添加磁盘

一、linux虚拟机添加一块新的硬盘 四步: (1) (2)为硬盘进行分区 (3)初始化硬盘分区 (4)挂载 在虚拟机上添加一块硬盘 (1)、 虚拟机添加一块新的硬盘作为数据盘 (2) ls…

Idea Live Template 功能总结

文章目录 Java自带的template属性模板psf——public static finalpsfi——public static final intpsfi——public static final StringSt——String 方法模板psvm——main方法sout——打印语句iter——for迭代循环fori——for循环 代码块模板if-e —— if elseelse-if 自定义自…

中国首款量子计算机操作系统本源司南 PilotOS正式上线

中国安徽省量子计算工程研究中心近日宣布,中国国产量子计算机操作系统本源司南 PilotOS 客户端正式上线。 如果把量子芯片比喻成人的“心脏”,那么量子计算机操作系统就相当于人的“大脑”,量子计算应用软件则是人的“四肢”。 据安徽省量子…

Linux 终端命令之文件浏览(1) cat

Linux 文件浏览命令 cat, more, less, head, tail,此五个文件浏览类的命令皆为外部命令。 hannHannYang:~$ which cat /usr/bin/cat hannHannYang:~$ which more /usr/bin/more hannHannYang:~$ which less /usr/bin/less hannHannYang:~$ which head /usr/bin/he…

论文总结《Towards Evaluating the Robustness of Neural Networks(CW)》

原文链接 C&W 这篇论文更像是在讲一个优化问题,后面讲述如何针对生成对抗样本的不可解问题近似为一个可解的问题,很有启发。本文后面将总结论文各个部分的内容。 Motivation 文章提出了一个通用的设计生成对抗样本的方法,根据该论文提…

YAPi在线接口文档简单案例(结合Vue前端Demo)

在前后端分离开发中,我们都是基于文档进行开发,那前端人员有时候无法马上拿到后端的数据,该怎么办?我们一般采用mock模拟伪造数据直接进行测试,本篇文章主要介绍YApi在线接口文档的简单使用,并结合Vue的小d…

【C++学习】STL容器——stack和queue

目录 一、stack的介绍和使用 1.1 stack的介绍 1.2 stack的使用 1.3 stack的模拟实现 二、queue的介绍和使用 2.1 queue的介绍 2.2 queue的使用 2.3 queue的模拟实现 三、priority_queue的介绍和使用 3.1 priority_queue的介绍和使用 3.2 priority_queue的使用 3.4 p…

【Powershell 】(Windows下)常用命令 | 命令别名 | 运行Windows命令行工具 | 运行用户程序(vim、gcc、gdb)

微软官方Powershell文档:https://learn.microsoft.com/zh-cn/powershell/ 命令详细说明,在PDF的最后面: 一、Powershell及命令简介1.1 命令格式1.2 命令的别名 二、cmdlet别名三、cmdlet分类介绍3.1 基础命令1. Get-Command2. Get-Help3. S…

[HDLBIts] Exams/m2014 q4j

Implement the following circuit: ("FA" is a full adder) module top_module (input [3:0] x,input [3:0] y, output [4:0] sum);assign sumxy; endmodule

C数据结构与算法——无向图(邻接矩阵) 应用

实验任务 (1) 掌握图的邻接矩阵存储及基本算法&#xff1b; (2) 掌握该存储方式下的DFS和BFS算法。 实验内容 实现图的邻接矩阵存储结构实现基于邻接矩阵的相关算法及遍历算法 实验源码 #include <malloc.h> #include <stdio.h>#define MAXSIZE 1000 #define …

SpringBoot07——VueX

共享组件之间的数据&#xff0c;集中管理 这一部分某人要打ow我就跳过没看了&#xff0c;哼&#xff0c;都怪某人