2024.07纪念一 debezium : spring-boot结合debezium

news2025/1/12 12:21:31

使用前提:

一、mysql开启了logibin

在mysql的安装路径下的my.ini中 

【mysqlid】下

添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

参考gitee的项目,即拉即用。参考地址

zhanghl/spring-boot-debeziumicon-default.png?t=N7T8https://gitee.com/zhl001/spring-boot-debezium项目中一个三个文件,pom和两个类需要参考。

pom.xml

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.13.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.felord</groupId>
    <artifactId>spring-boot-debezium</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-debezium</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <debezium.version>1.5.2.Final</debezium.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-api</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-connector-mysql</artifactId>
            <version>${debezium.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--springboot与mybatis的整合包-->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.3.0</version>
        </dependency>

        <!--mysql驱动包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!--springboot与JDBC整合包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!--sqlserver数据源-->
        <dependency>
            <groupId>com.microsoft.sqlserver</groupId>
            <artifactId>sqljdbc4</artifactId>
            <version>4.0</version>
        </dependency>
    </dependencies>

两个java类

DebeziumConfiguration.java
package cn.felord.debezium.debezium;

import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;
import java.util.Map;

import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;


/**
 * The type Debezium configuration.
 *
 * @author n1
 * @since 2021 /6/1 17:01
 */
@Configuration
public class DebeziumConfiguration {


    /**
     * Debezium 配置.
     *
     * @return configuration
     */
    @Bean
    io.debezium.config.Configuration debeziumConfig() {
        return io.debezium.config.Configuration.create()
//            连接器的Java类名称
                .with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值
                .with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
//                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。
                .with("offset.storage.file.filename", "/tmp/offsets.dat")
//                捕获偏移量的周期
                .with("offset.flush.interval.ms", "1")
//               连接器的唯一名称
                .with("name", "mysql-connector")
//                数据库的hostname
                .with("database.hostname", "10.1.1.1")
//                端口
                .with("database.port", "3306")
//                用户名
                .with("database.user", "canal")
//                密码
                .with("database.password", "canal")
//                 包含的数据库列表,你的数据库
                .with("database.include.list", "md_test")
//                是否包含数据库表结构层面的变更,建议使用默认值true
                .with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id
                .with("database.server.id", 1)
//                	MySQL 服务器或集群的逻辑名称
                .with("database.server.name", "customer-mysql-db-server")
//                历史变更记录
                .with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
//                历史变更记录存储位置,存储DDL
                .with("database.history.file.filename", "/tmp/dbhistory.dat")
                .build();
    }

    /**
     * Debezium server bootstrap debezium server bootstrap.
     *
     * @param configuration the configuration
     * @return the debezium server bootstrap
     */
    @Bean
    DebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {
        DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();
        DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
                .using(configuration.asProperties())
                .notifying(this::handlePayload)
                .build();

        debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);
        return debeziumServerBootstrap;
    }


    private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {
        recordChangeEvents.forEach(r -> {
            SourceRecord sourceRecord = r.record();
            String topic = sourceRecord.topic();
            Struct sourceRecordChangeValue = (Struct) sourceRecord.value();
            if (sourceRecordChangeValue != null) {
                // 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置
                Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));
                if (operation != Envelope.Operation.READ) {
                    String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;
                    // 获取增删改对应的结构体数据
                    Struct struct = (Struct) sourceRecordChangeValue.get(record);
                    // 将变更的行封装为Map
                    Map<String, Object> payload = struct.schema().fields().stream()
                            .map(Field::name)
                            .filter(fieldName -> struct.get(fieldName) != null)
                            .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                            .collect(toMap(Pair::getKey, Pair::getValue));
                    // 这里简单打印一下
                    System.out.println("operation = " + operation);
                    System.out.println("data = " + payload);
                    if(operation.toString().equals("CREATE")){
                        System.out.println("新增记录一条");
                    }

                    //tabelName
                    if(topic.split("\\.").length > 2){
                        String tableName = topic.split("\\.")[2];
                        System.out.println("tabelName" + tableName);
                    }
                }
            }
        });
    }

}
DebeziumServerBootstrap.java
package cn.felord.debezium.debezium;

import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * @author n1
 * @since 2021/6/2 10:45
 */
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private DebeziumEngine<?> debeziumEngine;

    @Override
    public void start() {
        executor.execute(debeziumEngine);
    }

    @SneakyThrows
    @Override
    public void stop() {
        debeziumEngine.close();
    }

    @Override
    public boolean isRunning() {
        return false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(debeziumEngine, "debeziumEngine must not be null");
    }
}

 结语:

太强了,比canal强10倍,非侵入性。对比可参考:

不想引入MQ?不妨试试 Debezium-CSDN博客

为什么是debezium

这么多技术框架,为什么选debezium?

看起来很多。但一一排除下来就debezium和canal。

sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的structs2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除。

flink cdc是大数据领域的框架,一般web项目的数据量属于大材小用了。

同时databus,maxwell相对比较冷门,用得比较少。

「最后不用canal的原因有以下几点:」

  • canal需要安装,这违背了“如非必要,勿增实体”的原则。

  • canal只能对MYSQL进行CDC监控。有很大的局限性。

  • 大数据领域非常流行的flink cdc(阿里团队主导)底层使用的也是debezium,而非同是阿里出品的canal。

  • debezium可借助kafka组件,将变动的数据发到kafka topic,后续的读取操作只需读取kafka,可有效减少数据库的读取压力。可保证一次语义,至少一次语义。

  • 同时,也可基于内嵌部署模式,无需我们手动部署kafka集群,可满足”如非必要,勿增实体“的原则。

  • 而且canal只支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

实时监视同步数据库变更,这个框架真是神器_Mysql

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1962027.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql超大分页问题处理~

大家好&#xff0c;我是程序媛雪儿&#xff0c;今天咱们聊mysql超大分页问题处理。 超大分页问题是什么&#xff1f; 数据量很大的时候&#xff0c;在查询中&#xff0c;越靠后&#xff0c;分页查询效率越低 例如 select * from tb_sku limit 0,10; select * from tb_sku lim…

专治408开始的晚!8月一定要完成这些事!

八月份才开始408&#xff0c;那到考试最多也只有4-5个月的时间 别担心&#xff0c;可以复习两轮&#xff01; 其实我一直建议大家408复习三轮&#xff0c;但是如果时间不够&#xff0c;那就要在复习质量上下功夫&#xff01; 考408有一个好处&#xff0c;就是不用先确定学校…

【错误总结】Ubuntu系统中执行 sudo apt-get update报错

Ubuntu系统中执行 sudo apt-get update报错 命令行描述升级sudo报错并解决错误描述错误解决原因1&#xff1a;系统网络问题 原因2&#xff1a;设置清华源后/etc/apt/sources.list不匹配原因3&#xff1a;ubuntu自带的源/etc/apt/sources.list有问题 apt-get update成功log参考 …

【Story】《程序员面试的“八股文”辩论:技术基础与实际能力的博弈》

目录 程序员面试中的“八股文”&#xff1a;助力还是阻力&#xff1f;1. “八股文”的背景与定义1.1 “八股文”的起源1.2 “八股文”的常见类型 2. “八股文”的作用分析2.1 理论基础的评价2.1.1 助力2.1.2 阻力 3. 实际工作能力的考察3.1 助力3.2 阻力 4. 面试中的背题能力4.…

利用代理IP助力社媒营销的指南来了!

文章目录 前言一、有效数据收集二、建立流量矩阵三、精准定制内容选择正确的代理类型定时监测和更新代理IP遵守平台政策 总结 前言 在当今数字化时代&#xff0c;社交媒体营销已成为企业推广品牌、增强用户互动不可或缺的一环。从本质上看&#xff0c;社媒营销是公共关系和客户…

借助 NGINX 对本地的 Kubernetes 服务进行自动化的 TCP 负载均衡

原文作者&#xff1a;Chris Akker - F5 技术解决方案架构师&#xff0c;Steve Wagner - F5 NGINX 解决方案架构师 原文链接&#xff1a;借助 NGINX 对本地的 Kubernetes 服务进行自动化的 TCP 负载均衡 转载来源&#xff1a;NGINX 中文官网 NGINX 唯一中文官方社区 &#xff0c…

Windows11安装MongoDB7.0.12详细教程

下载 地址&#xff1a;https://www.mongodb.com/try/download/community 我使用的是迅雷下载&#xff1a; 安装 选择自定义安装&#xff1a; 选择安装目录&#xff1a; 开始安装&#xff1a; 这个玩意会卡比较长的时间&#xff1a; 最后这一步如果没有科学上网&#…

虾皮笔试0620-选择题

虚拟存储的基础是程序局部性理论&#xff0c;它的基本含义是程序执行时对内存访问的不均匀性。这一理论具体体现在两个方面&#xff1a; 时间局部性&#xff1a;时间局部性是指如果程序中的某个数据项被访问&#xff0c;那么在不久的将来它可能再次被访问。这通常是因为程序存在…

thinkphp框架远程代码执行

一、环境 vulfocus网上自行下载 启动命令&#xff1a; docker run -d --privileged -p 8081:80 -v /var/run/docker.sock:/var/run/docker.sock -e VUL_IP192.168.131.144 8e55f85571c8 一定添加--privileged不然只能拉取环境首页不显示 二、thinkphp远程代码执行 首页&a…

【吊打面试官系列-Dubbo面试题】Dubbo SPI 和 Java SPI 区别?

大家好&#xff0c;我是锋哥。今天分享关于 【Dubbo SPI 和 Java SPI 区别&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; Dubbo SPI 和 Java SPI 区别&#xff1f; JDK SPI JDK 标准的 SPI 会一次性加载所有的扩展实现&#xff0c;如果有的扩展吃实话很耗时&…

Javascript前端面试基础(九)

浏览器缓存 浏览器缓存分为强缓存和协商缓存。当客户端请求某个资源时&#xff0c;获取缓存的流程如下 先根据这个资源的一些http header判断它是否命中强缓存&#xff0c;如果命中则直接从本地获取缓存资源&#xff0c;不会发请求到服务器;当强缓存没有命中时&#xff0c;客户…

通过进程协作显示图像-C#

前言 如果一个软件比较复杂或者某些情况下需要拆解&#xff0c;可以考试将软件分解成两个或多个进程&#xff0c;但常规的消息传递又不能完全够用&#xff0c;使用消息共享内存&#xff0c;实现图像传递&#xff0c;当然性能这个方面我并没有测试&#xff0c;仅是一种解决思路…

Anaconda配置记录-linux环境

Anaconda Distribution 是一个 Python/R 数据科学分发&#xff0c;其中包含&#xff1a; conda - 用于命令行界面的包和环境管理器 Anaconda Navigator - 基于 conda 构建的桌面应用程序&#xff0c;具有从托管环境中启动其他开发应用程序的选项 超过 300 个自动安装的软件包…

记录一次Dump文件分析之旅

背景 在生产环境中&#xff0c;服务运行一段时间后&#xff0c;我们遇到了JVM内存使用率超过90%的告警。考虑到我们的服务正常情况下每周都会进行重启&#xff0c;通常不应该出现如此高的内存使用率问题。 前置操作 在检查JVM相关配置时&#xff0c;我们使用Jinfo命令发现当…

Covalent 启动面向 CXT 质押者的生态伙伴空投计划

Covalent Network&#xff08;CXT&#xff09;是模块化人工智能数据基础设施&#xff0c;其宣布了合作伙伴生态系统空投计划的首个项目&#xff1a;TAIKO。此举旨在为 CXT 代币质押者提供来自其庞大生态系统的空投机会。首次空投将于 2024 年 8 月 1 日进行&#xff0c;向 CXT …

疯狂交互学习的BM3推荐算法(论文复现)

疯狂交互学习的BM3推荐算法&#xff08;论文复现&#xff09; 本文所涉及所有资源均在传知代码平台可获取 文章目录 疯狂交互学习的BM3推荐算法&#xff08;论文复现&#xff09;多模态推荐系统优点 示例对比学习什么是对比学习&#xff1f;关键思想优点 自监督学习什么是自监督…

【只出现一次的数字 III】python刷题记录

R2-位运算专题. 目录 哈希表 位运算 ps: 一眼哈希表啊 哈希表 class Solution:def singleNumber(self, nums: List[int]) -> List[int]:dictdefaultdict(int)ret[]for num in nums:dict[num]1for key in dict.keys():if dict[key]1:ret.append(key)return ret怎么用位…

支持AI的好用的编辑器aieditor

一、工具概述 AiEditor 是一个面向 AI 的下一代富文本编辑器&#xff0c;她基于 Web Component&#xff0c;因此支持 Layui、Vue、React、Angular 等几乎任何前端框架。她适配了 PC Web 端和手机端&#xff0c;并提供了 亮色 和 暗色 两个主题。除此之外&#xff0c;她还提供了…

Hive3:库操作常用语句

1、创建库 create database if not exists myhive;2、选择库 use myhive;3、查看当前选择的库 SELECT current_database();4、查看库详细信息 desc database myhive;可以查看数据文件在hdfs集群中的存储位置 5、创建库时制定hdfs的存储位置 create database myhive2 …

机器学习课程学习周报六

机器学习课程学习周报六 文章目录 机器学习课程学习周报六摘要Abstract一、机器学习部分1.1 循环神经网络概述1.2 循环神经网络架构1.2.1 深层循环神经网络1.2.2 Elman网络和Jordan网络1.2.3 双向循环神经网络 1.3 长短期记忆网络1.4 LSTM原理1.5 RNN的学习方式1.6 RNN中的梯度…