Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库

news2025/1/2 0:22:05

Debezium系列之:基于debezium将mysql数据库数据更改流式传输到 Elasticsearch和PostgreSQL数据库

  • 一、背景
  • 二、技术路线
  • 三、配置
  • 四、从mysql同步数据到Elasticsearch和PostgreSQL数据库
  • 五、总结

一、背景

  • 基于 Debezium 的端到端数据流用例,将数据流式传输到 Elasticsearch 服务器,以利用其出色的功能对我们的数据进行全文搜索。
  • 同时把数据流式传输到 PostgreSQL 数据库,通过 SQL 查询语言来优化对数据的访问。

二、技术路线

下面的图表显示了数据如何流经我们的分布式系统。首先,Debezium MySQL 连接器不断捕获 MySQL 数据库中的更改,并将每个表的更改发送到单独的 Kafka 主题。然后,Confluence JDBC 接收器连接器不断读取这些主题并将事件写入 PostgreSQL 数据库。同时,Confluence Elasticsearch 连接器不断读取这些相同的主题并将事件写入 Elasticsearch。

在这里插入图片描述
我们将把这些组件部署到几个不同的进程中。在此示例中,我们将所有三个连接器部署到单个 Kafka Connect 实例,该实例将代表所有连接器向 Kafka 写入和读取(在生产中,可能需要将连接器分开以实现更好的性能)。

在这里插入图片描述

三、配置

我们将使用此 Docker Compose 文件来快速部署演示。该部署由以下 Docker 映像组成:

  • Apache ZooKeeper

  • Apache Kafka

  • 一个丰富的 Kafka Connect / Debezium 图像,有一些变化:

    • PostgreSQL JDBC 驱动程序放置在 /kafka/libs 目录中
    • Confluence JDBC 连接器放置在 /kafka/connect/kafka-connect-jdbc 目录中
  • MySQL

  • PostgreSQL

  • Elasticsearch

Debezium 源连接器以及 JDBC 和 Elasticsearch 连接器的消息格式不同,因为它们是单独开发的,并且各自关注的目标略有不同。 Debezium 发出更复杂的事件结构,以便捕获所有可用信息。特别是,更改事件包含已更改记录的旧状态和新状态。另一方面,两个接收器连接器都期望一条简单的消息,该消息仅表示要写入的记录状态。

Debezium 的 UnwrapFromEnvelope 单消息转换 (SMT) 将复杂的更改事件结构折叠为两个接收器连接器所期望的相同的基于行的格式,并有效地充当上述两种格式之间的消息转换器。

四、从mysql同步数据到Elasticsearch和PostgreSQL数据库

当所有组件启动后,我们将注册 Elasticsearch Sink 连接器写入 Elasticsearch 实例。我们希望在源以及 PostgreSQL 和 Elasticsearch 中使用相同的密钥(主 id):

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @es-sink.json

我们正在使用此注册请求:

{
  {
    "name": "elastic-sink",
    "config": {
      "connector.class":
          "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "tasks.max": "1",
      "topics": "customers",
      "connection.url": "http://elastic:9200",
      "transforms": "unwrap,key",
      "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",        (1)
      "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",(2)
      "transforms.key.field": "id",                                                 (2)
      "key.ignore": "false",                                                        (3)
      "type.name": "customer"                                                       (4)
    }
  }
}

该请求配置这些选项:

  • 1.从 Debezium 的更改数据消息中仅提取新行的状态
  • 2.从密钥结构中提取 id 字段,然后将相同的密钥用于源和两个目标。这是为了解决 Elasticsearch 连接器仅支持数字类型和字符串作为键的事实。如果我们不提取 ID,则由于密钥类型未知,消息将被连接器过滤掉。
  • 3.使用事件中的密钥而不是生成合成密钥
  • 4.事件将在 Elasticsearch 中注册的类型

接下来我们将注册 JDBC Sink 连接器写入 PostgreSQL 数据库:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @jdbc-sink.json

最后,必须设置源连接器:

curl -i -X POST -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
    -d @source.json

让我们检查一下数据库和搜索服务器是否同步。客户表的所有行都应该在源数据库 (MySQL) 以及目标数据库 (Postgres) 和 Elasticsearch 中找到:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+
docker-compose exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
curl 'http://localhost:9200/customers/_search?pretty'
{
  "took" : 42,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1001",
        "_score" : 1.0,
        "_source" : {
          "id" : 1001,
          "first_name" : "Sally",
          "last_name" : "Thomas",
          "email" : "sally.thomas@acme.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1004",
        "_score" : 1.0,
        "_source" : {
          "id" : 1004,
          "first_name" : "Anne",
          "last_name" : "Kretchmar",
          "email" : "annek@noanswer.org"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1002",
        "_score" : 1.0,
        "_source" : {
          "id" : 1002,
          "first_name" : "George",
          "last_name" : "Bailey",
          "email" : "gbailey@foobar.com"
        }
      },
      {
        "_index" : "customers",
        "_type" : "customer",
        "_id" : "1003",
        "_score" : 1.0,
        "_source" : {
          "id" : 1003,
          "first_name" : "Edward",
          "last_name" : "Walker",
          "email" : "ed@walker.com"
        }
      }
    ]
  }
}

在连接器仍在运行的情况下,我们可以向 MySQL 数据库添加一个新行,然后检查它是否已复制到 PostgreSQL 数据库和 Elasticsearch 中:

docker-compose exec mysql bash -c 'mysql -u $MYSQL_USER  -p$MYSQL_PASSWORD inventory'

mysql> insert into customers values(default, 'John', 'Doe', 'john.doe@example.com');
Query OK, 1 row affected (0.02 sec)
docker-compose exec -postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
...
Doe        | 1005 | John       | john.doe@example.com
(5 rows)

curl 'http://localhost:9200/customers/_search?pretty'
...
{
  "_index" : "customers",
  "_type" : "customer",
  "_id" : "1005",
  "_score" : 1.0,
  "_source" : {
    "id" : 1005,
    "first_name" : "John",
    "last_name" : "Doe",
    "email" : "john.doe@example.com"
  }
}
...

五、总结

我们设置了一个复杂的流数据管道来将 MySQL 数据库与另一个数据库以及 Elasticsearch 实例同步。我们设法在所有系统中保留相同的标识符,这使我们能够将整个系统的记录关联起来。

将数据更改从主数据库近乎实时地传播到 Elasticsearch 等搜索引擎可以实现许多有趣的用例。除了全文搜索的不同应用之外,我们还可以考虑使用 Kibana 创建仪表板和各种可视化效果,以进一步深入了解数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/736779.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

I/O 多路复用小结

Socket 模型 Socket 编程是一种使用 Socket 模型进行网络通信的编程技术。它是一种基于网络套接字的编程模型,用于实现不同计算机之间的数据传输。 事实上,在进行网络通信前,通信双方都要创建一个 Socket,双方的数据读写都要依赖于…

【Python】执行SQL报错

可以再数据库查询界面执行的SQL,一直报错 unsupported format character Y (0x59) at index 61 SQL如下: datapd.read_sql_query(sql"""selectdate_format(create_time,%Y-%m) as mon,count(distinct order_id) as ord_cntfrom prod.o…

HTTP与HTTPS

HTTP与HTTPS介绍 超文本传输协议HTTP协议被用于在Web浏览器和网站服务器之间传递信息,HTTP协议以明文方式发送内容,不提供任何方式的数据加密,如果攻击者截取了Web浏览器和网站服务器之间的传输报文,就可以直接读懂其中的信息&…

qt源码--事件系统

qt的事件传播主要依赖于QCoreApplication、QAbstractEventDispatcher(会根据不同的平台生成各自的处理对象)、QEvent(各种事件类型)等。 首先看下QCoreApplication的实现: 2、了解QCoreApplication的构造函数 其构造函…

在最新ICP备案域名的基础上,结合其他网络营销手段,打造强大的品牌推广效果

API接口是一种软件系统之间进行交互的方式,通过API接口,可以在不同的系统之间传递数据、命令等信息。在网络营销中,API接口可以帮助我们更加高效地进行品牌推广。本文将以在最新ICP备案域名的基础上,结合其他网络营销手段&#xf…

JVM回收算法(标记-清除算法, 复制算法, 标记-整理算法)

1.标记-清除算法 最基础的算法,分为两个阶段,“标记”和“清除” 原理: - 标记阶段:collector从mutator根对象开始进行遍历,对从mutator根对象可以访问到的对象都打上一个标识,一般是在对象的header中&am…

vue-router 4.0 动态路由会跳转到 404 页面的问题

引子 开发过前端单页面应用的小伙伴们,应该对前端路由都不陌生吧。 无论是用 vue 或者 react,都有官方提供的 router 方案。 但是有些场景下,处于安全性和友好性考虑,我们需要用到动态路由。 如果你不知道什么叫动态路由&…

翻遍整个牛客网,整理出了全网最全的Java面试八股文大合集,整整4000多页

大家从 Boss 直聘上或者其他招聘网站上都可以看到 Java 岗位众多,Java 岗位的招聘薪酬天差地别,人才要求也是五花八门。而很多 Java 工程师求职过程中,也是冷暖自知。很多时候技术有,但是面试的时候就是过不了! 为了帮…

4.7 x64dbg 应用层的钩子扫描

所谓的应用层钩子(Application-level hooks)是一种编程技术,它允许应用程序通过在特定事件发生时执行特定代码来自定义或扩展其行为。这些事件可以是用户交互,系统事件,或者其他应用程序内部的事件。应用层钩子是在应用…

【Zabbix 监控 Windows 系统,Java应用,SNMP】

目录 一、Zabbix 监控 Windows 系统1、下载 Windows 客户端 Zabbix agent 22、安装客户端,配置3、在服务端 Web 页面添加主机,关联模板 二、Zabbix 监控 java 应用1、客户端开启 java jmxremote 远程监控功能1、配置 java jmxremote 远程监控功能2、启动…

【ARM Coresight 系列文章 3.1 - ARM Coresight DP 对 AP 的访问 2】

文章目录 图 1-1 如上图1-1 所示,DAP上可以集成多个MEM-AP,上图是集成了3个MEM-AP,它们可能是AXI-AP, AHB-AP, APB-AP。 那么AP的类型是如何区分的呢? 不同的组件会使用不同MEM-AP接口,如Cortex-A/Coretex-R 系列的c…

ai绘画工具有哪些?这几款好用的ai绘画工具免费分享给你

上周,我去了一家现代艺术画廊,墙上挂满了令人叹为观止的绘画作品。我被其中一幅细腻而充满情感的油画所深深吸引,想要了解背后的创作过程。这时,一位热情的艺术导师走到我身边。她微笑着说:“这幅作品实际上是由ai绘画…

解决“_mkdir无法识别空格目录“问题

在C编程里,有时候需要创建一个文件夹,通常使用库函数_mkdir(const char* dirname)来新建一个文件夹,该库函数每次只能创建一个文件夹,不能级联创建。若要级联创建文件,则请用递归方式或者for循环方式调用_mkdir()。 #…

7月6日华为云盘古气象大模型登上《Nature》杂志:相比传统数值预报快10000倍

7月6日,国际顶级学术期刊《自然》(Nature)杂志正刊发表了华为云盘古大模型研发团队的最新研究成果——《三维神经网络用于精准中期全球天气预报》(《Accurate medium-range global weather forecasting with 3D neural networks》…

CrossKD 原理与代码解析

paper:CrossKD: Cross-Head Knowledge Distillation for Dense Object Detection official implementation: https://github.com/jbwang1997/CrossKD 前言 蒸馏可以分为预测蒸馏prediction mimicking和特征蒸馏feature imitation两种,201…

【LeetCode】HOT 100(26)

题单介绍: 精选 100 道力扣(LeetCode)上最热门的题目,适合初识算法与数据结构的新手和想要在短时间内高效提升的人,熟练掌握这 100 道题,你就已经具备了在代码世界通行的基本能力。 目录 题单介绍&#…

什么是 AOP?对于 Spring IoC 和 AOP 的理解?Spring AOP 和 AspectJ AOP 有什么区别?

什么是 AOP? AOP(Aspect-Oriented Programming),即 面向切面编程, 它与OOP( ObjectOriented Programming, 面向对象编程) 相辅相成,提供了与OOP 不同的抽象软件结构的视角 在 OOP 中, 我们以类(class)作为我们的基本单元 而 A…

Zynq 多个UDP客户端组网启动问题(Auto negotiation error)PS:附UDP客户端初始化代码

最近正在进行一个Zynq项目,根据设计需求,需要将上位机作为UDP服务器,而FPGA则充当UDP客户端。同时,服务器需要能够接收和控制多个UDP客户端。 开发过程中,我是基于lwip UDP Perf Client 官方模版开发的。我遇到了以下几…

重磅!高通大客户「跳单」,智能座舱SoC赛道进入「混战期」

哪家车企是高通座舱芯片的最大客户?答案不是蔚来、理想、小鹏等智能化布局领先的新势力,而是比亚迪。 高工智能汽车研究院监测数据显示,2022年中国市场(不含进出口)乘用车智能座舱前装标配高通芯片交付367.45万辆&…

DALL·E2(unCLIP)、Stable Diffusion、IS、FID要点总结

DALLE 1 DALLE 1可以看成是VQ-VAE和文本经过BPE编码得到的embedding AE(Auto Encoder) encoder decoder结构,AE在生成任务时只会模仿不会创造,所有有了后面的VAE VAE(Variational AutoEncoder) 不再学习固定的bottleneck特征…