JAVA通过debezium实时采集mysql数据

news2024/9/24 7:26:17

前期准备

需要提前安装mysql并且开启binlog,需要准备kafka和zookeeper环境

示例采用debezium1.9.0版本

Maven配置

<version.debezium>1.9.0.Final</version.debezium>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-api</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-embedded</artifactId>

    <version>${version.debezium}</version>

</dependency>

<dependency>

    <groupId>io.debezium</groupId>

    <artifactId>debezium-connector-mysql</artifactId>

    <version>${version.debezium}</version>

</dependency>

Java代码

import io.debezium.engine.ChangeEvent;

import io.debezium.engine.DebeziumEngine;

import io.debezium.engine.format.Json;

import java.io.IOException;

import java.util.Properties;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

public class DebeziumTest {

    private static DebeziumEngine<ChangeEvent<String, String>> engine;

    public static void main(String[] args) throws Exception {

        final Properties props = new Properties();

        props.setProperty("name", "dbz-engine");

        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");

        //offset config begin - 使用文件来存储已处理的binlog偏移量

        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");

//        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.KafkaOffsetBackingStore");

        props.setProperty("offset.storage.file.filename", "D:/tmp/dbz/storage/mysql_offsets.dat");

        props.setProperty("offset.flush.interval.ms", "0");

        //offset config end

        props.setProperty("database.server.name", "mysql-connector");

        props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");

        props.setProperty("database.history.file.filename", "D:/tmp/dbz/storage/mysql_dbhistory.txt");

        props.setProperty("database.server.id", "122110"); //随机设置

        props.setProperty("database.hostname", "localhost");

        props.setProperty("database.port", "3306");

        props.setProperty("database.user", "root");

        props.setProperty("database.password", "123456");

        props.setProperty("database.include.list", "test");//要捕获的数据库名

        props.setProperty("topic.prefix", "mysql-");

//        props.setProperty("table.include.list", "inventory.a");//要捕获的数据表

        props.setProperty("snapshot.mode", "initial");//全量+增量

        props.setProperty("bootstrap.servers", "localhost:9092");

        props.setProperty("topic", "test");

//        props.setProperty("offset.storage.topic", "log-t");

//        props.setProperty("offset.storage.partitions", "1");

//        props.setProperty("offset.storage.replication.factor", "1");

//        KafkaProducerTest test = new KafkaProducerTest("test1841");

        // 使用上述配置创建Debezium引擎,输出样式为Json字符串格式

        engine = DebeziumEngine.create(Json.class)

                .using(props)

                .notifying(record -> {

                    //test.sendMsg(record.value());

                    System.out.println(record.value());//输出到控制台

                })

                .using((success, message, error) -> {

                    if (error != null) {

                        error.printStackTrace();

                        // 报错回调

                        System.out.println("------------error, message:" + message);

                        System.out.println( "exception:" + error);

                    }

                    closeEngine(engine);

                })

                .build();

        ExecutorService executor = Executors.newSingleThreadExecutor();

        executor.execute(engine);

        addShutdownHook(engine);

        awaitTermination(executor);

        System.out.println("------------main finished.");

    }

    private static void closeEngine(DebeziumEngine<ChangeEvent<String, String>> engine) {

        try {

            engine.close();

        } catch (IOException ignored) {

        }

    }

    private static void addShutdownHook(DebeziumEngine<ChangeEvent<String, String>> engine) {

        Runtime.getRuntime().addShutdownHook(new Thread(() -> closeEngine(engine)));

    }

    private static void awaitTermination(ExecutorService executor) {

        if (executor != null) {

            try {

                executor.shutdown();

                while (!executor.awaitTermination(5, TimeUnit.SECONDS)) {

                }

            } catch (InterruptedException e) {

                Thread.currentThread().interrupt();

            }

        }

    }

}

运行效果

随意修改一条mysql表中的数据

修改后

代码日志

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":true,"field":"name"},{"type":"int32","optional":true,"field":"age"},{"type":"string","optional":true,"field":"phone"},{"type":"string","optional":true,"field":"address"},{"type":"string","optional":true,"name":"io.debezium.time.ZonedTimestamp","version":1,"field":"create_time"}],"optional":true,"name":"mysql_connector.di_test.t_user.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false,incremental"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"sequence"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"mysql_connector.di_test.t_user.Envelope"},"payload":{"before":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀1","create_time":"2023-12-28T06:47:07Z"},"after":{"id":2,"name":"lisi2","age":19,"phone":"18444444","address":"北京海淀","create_time":"2023-12-28T06:47:07Z"},"source":{"version":"1.9.0.Final","connector":"mysql","name":"mysql-connector","ts_ms":1722423711000,"snapshot":"false","db":"di_test","sequence":null,"table":"t_user","server_id":1,"gtid":null,"file":"binlog.000008","pos":3134,"row":0,"thread":17,"query":null},"op":"u","ts_ms":1722423711663,"transaction":null}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1964013.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【大模型系列篇】本地问答系统-部署Ollama、Open WebUI

部署本地大模型&#xff0c;结合Ollama、Open WebUI以及本地RAG&#xff08;Retrieval-Augmented Generation&#xff09;可以为用户提供一个强大的本地知识库和交互式对话系统。以下是详细的部署步骤和功能介绍&#xff1a; 一、部署Ollama 访问Ollama官网&#xff1a;首先&…

【3】Blazor链接数据库

【3】Blazor链接数据 一、引入Nuget包二、添加链接字符串三、创建DbContext四、注入SqlServer数据库五、执行数据库迁移六、创建用户信息页面七、结果展示 一、引入Nuget包 Microsoft.EntityFrameworkCore Microsoft.EntityFrameworkCore.SqlServer Microsoft.EntityFramework…

Kafka的搭建及使用

Kafka搭建及使用 Kafka搭建 1、上传解压修改环境变量 # 解压 tar -zxvf kafka_2.11-1.0.0.tgz -C /usr/local/soft mv kafka_2.11-1.0.0 kafka-1.0.0tar -xvf 是一个在Unix和类Unix操作系统&#xff08;如Linux和macOS&#xff09;中用于解压缩或解包.tar文件的命令。 tar -…

java调用WebService接口

案例&#xff1a; 接口&#xff1a; http://xxxxx:8080/GetSPService.asmx 调用方法&#xff1a;GetSPByStnCodeToJsonStr 参数1&#xff1a;begin 开始时间 格式 yyyymmdd hh:mi &#xff08;日和小时之间有空格&#xff09; 例如&#xff1a;20230718 06:00 参数2: end …

IO模型思维导图

背景 &#xff1a; 并发服务器模型可以在同一时刻响应多个用户请求 多路复用IO&#xff1a; 4.多路复用IO 1.select 2.poll 3.epoll 1.select 缺点: 1.select监听文件描述符最大个数为1024 &#xff08;数组&#xff…

【CN】Argo 持续集成和交付(二)

7.25.通知 概述 Argo CD 通知持续监控 Argo CD 应用程序&#xff0c;并提供一种灵活的方式来通知用户应用程序状态的重要变化。使用灵活的触发器和模板机制&#xff0c;可以配置何时发送通知以及通知内容。Argo CD 通知包含有用的触发器和模板目录。因此&#xff0c;可以直接…

什么是住宅IP代理?有何用途?

在大数据时代&#xff0c;互联网成为我们生活与工作中不可或缺的一部分。然而&#xff0c;随着网络环境的日益复杂&#xff0c;隐私保护、网络访问限制等问题也逐渐凸显&#xff1b;以及跨境业务蓬勃发展。在这样的背景下&#xff0c;住宅IP代理作为一种技术解决方案&#xff0…

android studio 无法识别androidTest模块Test模块

android studio 无法识别androidTest模块Test模块 问题案例解决方法 androidTest是UI测试&#xff0c;可以运行在设备或虚拟设备上&#xff0c;需要编译打包为APK在设备上运行 版本依赖 android studio 2023.2.1 gradle kts架构 问题案例 下面无法识别到androidTest&#xff…

供应链|OR论文解读:具有内生随机交货期的双源库存最优策略

编者按 本次解读的文章发表于 Operations Research&#xff0c;原文信息&#xff1a;Song, J. S., Xiao, L., Zhang, H., & Zipkin, P. (2017). Optimal policies for a dual-sourcing inventory problem with endogenous stochastic lead times. Operations Research, 65…

11. 统计(均值、方差、正态分布)和聚类(接近kmeans的聚类)分类(python和c++代码)

以下代码的每个函数功能都做了注释&#xff0c;分别用python和c代码做了具体的实现&#xff0c;不是最终效果&#xff0c;后续会继续优化。以下代码中&#xff0c;python代码在每个步骤处理完数据后都画了散点图显示了处理后的数据效果&#xff0c;c代码是从python代码翻译过来…

学习大数据DAY28 python基础语法

目录 思维导图 数据类型 变量 输入 输出 运算符 bool 类型 常量变量拼接 流程控制 选择结构 if 循环结构 while 及 for 循环 字符串(String) 字符串的索引截取 索引 1.len() #获取字符串长度 2.str.find()字符查找 ,找到返回索引&#xff0c;没找到返回-1 3.st…

[Linux安全运维] iptables包过滤

前言 防火墙是网络安全中非常重要的设备&#xff0c;是一种将内部网络和外部网络隔离开的技术。简单来说&#xff0c;防火墙技术就是访问控制技术&#xff0c;由规则和动作组成。 目录 前言1. Linux 包过滤防火墙1 .1 概述1 .2 四表五链结构1 . 2 .1 规则表1 . 2 .2 规则链1 .…

扩散模型系列ControlNet: Adding Conditional Control to Text-to-Image Diffusion Models

向文本到图像扩散模型添加条件控制 摘要解读&#xff1a; 我对摘要英文的理解&#xff1a; 我们提出了一个神经网络架构ControlNet&#xff0c;可以向大规模的预训练好的文本到图像的扩散模型中添加空间条件控制。ControlNet锁住了准备生产的大规模扩散模型&#xff0c;并且重…

SLAM特征提取新变革:神经符号学结合自适应优化,实现环境适应性大飞跃!

论文标题&#xff1a; A Neurosymbolic Approach to Adaptive Feature Extraction in SLAM 论文作者&#xff1a; Yasra Chandio, Momin A. Khan, Khotso Selialia, Luis Garcia, Joseph DeGol, Fatima M. Anwar 导读&#xff1a; 本研究提出了一种创新的神经符号学方法&a…

Mybatis进阶提升-(一)Mybatis入门

前言 Mybatis是Java 项目开发使用率非常高的一款持久层框架&#xff0c;它支持自定义 SQL、存储过程以及高级映射。MyBatis 免除了几乎所有的 JDBC 代码以及设置参数和获取结果集的工作。MyBatis 可以通过简单的 XML 或注解来配置和映射原始类型、接口和 Java POJO&#xff08…

python+selenium+unittest自动化测试框架

前言 关于自动化测试的介绍&#xff0c;网上已有很多资料&#xff0c;这里不再赘述&#xff0c;UI自动化测试是自动化测试的一种&#xff0c;也是测试金字塔最上面的一层&#xff0c;selenium是应用于web的自动化测试工具&#xff0c;支持多平台、多浏览器、多语言来实现自动化…

【实战】SpringBoot整合ffmpeg实现动态拉流转推

SpringBoot整合ffmpeg实现动态拉流转推 在最近的开发中&#xff0c;遇到一个 rtsp 协议的视频流&#xff0c;前端vue并不能直接播放&#xff0c;因此需要对流进行处理。在网上查阅后&#xff0c;ffmpeg和webrtc是最多的解决方案&#xff0c;但是使用webrtc的时候没成功&#x…

交通 | 不确定条件下旅行者路径选择的K阶均值偏差模型

摘要 现实世界中的交通网络通常具有随机特性&#xff0c;旅行时间可靠性自然成为影响旅行者路线选择的关键因素。在这种情况下&#xff0c;仅凭平均路径旅行时间可能无法充分代表路径对旅行者的吸引力&#xff0c;本研究引入了 k k k 阶均值偏差模型&#xff0c;用于优化大型…

紫辉创投开启Destiny of Gods首轮投资,伯乐与千里马的故事仍在继续

近日&#xff0c;上海紫辉创业投资有限公司&#xff08;以下简称“紫辉创投”&#xff09;宣布开启GameFi链游聚合平台Destiny of Gods首轮投资500,000美金&#xff0c;并与其达成全面战略及业务层合作&#xff0c;双方将协同布局链上生态&#xff0c;共同推动链游行业健康发展…

研究人员可以采用什么策略来批判性地评估和综合其领域的不同文献

VersaBot Literature Review 一键生成文献综述 研究人员可以采用各种策略来批判性地评估和综合其领域内的不同文献&#xff1b; 评估策略 审查方法论&#xff1a; 分析每个来源中使用的研究设计、样本选择、数据收集和分析方法。考虑每种方法的潜在偏见、局限性和优势。评估…