Debezium-Embedded 实时监控MySQL数据变更

news2024/11/15 18:04:11

1.Debezium-Embedded 简介

        Debezium连接器的操作通常是将它们部署到Kafka Connect服务,并配置一个或多个连接器来监控上游数据库,并为它们在上游数据库中看到的所有更改生成数据更改事件。这些数据更改事件被写入Kafka,在那里它们可以被许多不同的应用程序独立使用。Kafka Connect提供了出色的容错性和可扩展性,因为它作为分布式服务运行,并确保所有注册和配置的连接器始终在运行。例如,即使集群中的一个Kafka Connect端点出现故障,其余的Kafka连接端点也会重新启动以前在现已终止的端点上运行的任何连接器,从而最大限度地减少停机时间并消除管理活动。

        并不是每个应用程序都需要这种级别的容错和可靠性,他们可能不想依赖外部的Kafka代理和Kafka Connect服务集群。相反,一些应用程序更喜欢将Debezium连接器直接嵌入到应用程序空间中。他们仍然想要相同的数据更改事件,但更喜欢让连接器将它们直接发送到应用程序,而不是将它们保存在Kafka中。

        这个Debezium-Embedded模块定义了一个小型库,允许应用程序轻松配置和运行debezium连接器。

2.MySQL端配置

2.1 开启日志

        MySQL开启日志配置可参考MySQL 主从配置-CSDN博客实现。

show variables like 'log_%';

2.2 创建监控账号并授权

#创建账号
create user debezium@'%' identified with mysql_native_password by 'wsx-123';
#给账号授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';
#刷新权限
FLUSH PRIVILEGES;

3.应用端开发

3.1 maven 引用debezium-embedded

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${debezium-embedded.version}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-mysql</artifactId>
    <version>${debezium-embedded.version}</version>
</dependency>

3.2 代码开发

package com.dayesmart.dataplusjava.util;

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

@Slf4j
public class DebeziumTest {
    public static void main(String[] args) {
        Executor executor = Executors.newSingleThreadExecutor();
        Configuration config = Configuration.create()
                /* begin engine properties */
                .with("connector.class",
                        "io.debezium.connector.mysql.MySqlConnector")
                .with("offset.storage",
                        "org.apache.kafka.connect.storage.FileOffsetBackingStore")
                .with("offset.storage.file.filename",
                        "E:/tmp/debezium/offset.dat")
                .with("offset.flush.interval.ms", 60000)
                /* begin connector properties */
                .with("name", "my-sql-connector")
                .with("database.hostname", "127.0.0.1")
                .with("database.port", 3307)
                .with("database.user", "debezium")
                .with("database.password", "wsx-123")
                .with("database.connectionTimeZone", "Asia/Shanghai")
                .with("database.server.id", 85744)
                .with("database.include.list","test")
                .with("snapshot.mode","initial")
                .with("database.server.name","weisx")
                .with("database.history",
                        "io.debezium.relational.history.FileDatabaseHistory")
                .with("database.history.file.filename",
                        "E:/tmp/debezium/schemahistory.dat")
                .build();

// Create the engine with this configuration ...
        EmbeddedEngine engine = EmbeddedEngine.create()
                .using(config)
                .notifying(new EmbeddedEngine.ChangeConsumer(){
                    @Override
                    public void handleBatch(List<SourceRecord> list, DebeziumEngine.RecordCommitter<SourceRecord> recordCommitter) throws InterruptedException {
                        log.info("{}",list);
                    }
                })
                .using((success,message,error) ->{
                    log.info("success:{},message:{},error:{}",success,message,error);

                })
                .build();

// Run the engine asynchronously ...

        executor.execute(engine);

    }

}


 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1212051.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Wireshark抓包工具配置以及MQTT抓包分析

1、Wireshark抓包工具使用 打开Wireshark选择&#xff0c;需要抓取的物理网卡&#xff0c;添加过滤设置。 单击“捕获”&#xff0c;选择选项&#xff0c;输入需要捕获的IP地址和端口号。 如&#xff1a; ip host 10.60.4.45 and tcp port 1883 ip host 10.60.4.45 and http p…

JVM虚拟机:垃圾回收之三色标记

本文重点 在前面的课程中我们已经学习了垃圾回收器CMS和G1,其中CMS和G1中的mixedGC都存在四个过程,这四个过程中有一个过程叫做并发标记,也就是说程序一边运行,一边标记垃圾。这个过程最困难的是:如果在标记垃圾的时候,如果对象的引用关系发生了改变,此时应该如何处理?…

吴恩达《机器学习》8-5->8-6:特征与直观理解I、样本与值观理解II

8.5、特征与直观理解I 一、神经网络的学习特性 神经网络通过学习可以得出自身的一系列特征。相对于普通的逻辑回归&#xff0c;在使用原始特征 x1​,x2​,...,xn​ 时受到一定的限制。虽然可以使用一些二项式项来组合这些特征&#xff0c;但仍然受到原始特征的限制。在神经网…

mysql之搭建MHA架构实现高可用

1、定义 全称是masterhigh avaliabulity。基于主库的高可用环境下可以实现主从复制及故障切换&#xff08;基于主从复制才能故障切换&#xff09; MHA最少要求一主两从&#xff0c;半同步复制模式 2、作用 解决mysql的单点故障问题。一旦主库崩溃&#xff0c;MHA可以在0-30…

Postman工具简介

介绍 Postman是一个商业的接口测试工具。免费的版本也可以使用不少功能。 官网&#xff1a;https://www.postman.com/ 下载、安装、应用界面 下载 安装、安装成功以后的应用界面 双击下载下来的可执行文件进行安装&#xff0c;出现如下界面&#xff1a; 可以注册一个账…

黑马程序员微服务第四天课程 分布式搜索引擎1

分布式搜索引擎01 – elasticsearch基础 0.学习目标 1.初识elasticsearch 1.1.了解ES 1.1.1.elasticsearch的作用 elasticsearch是一款非常强大的开源搜索引擎&#xff0c;具备非常多强大功能&#xff0c;可以帮助我们从海量数据中快速找到需要的内容 例如&#xff1a; …

新一轮SocialFi浪潮来袭,Atem Network 再次打响注意力争夺战

火爆如潮的 Atem Network 再次从 CyberConnect 以及 Friend.tech 手中接过 SocialFi 赛道的热度大棒&#xff0c;同时这也表明&#xff0c;协议层仍将是 Web3 社交领域的主要叙事。 前不久&#xff0c;Web3社交协议Atem Network 在白皮书中披露了ATEM的代币经济模型&#xff0c…

【论文阅读】(VAE)Auto-Encoding Variational Bayes

论文地址&#xff1a;[1312.6114] Auto-Encoding Variational Bayes (arxiv.org) 【前言】&#xff1a;VAE模型是Kingma(也是Adam的作者)大神在2014年发表的文章&#xff0c;是一篇非常非常经典&#xff0c;且实现非常优雅的生成模型&#xff0c;同时它还为bayes概率图模型难以…

WorkPlus AI助理知识问答机器人,助力企业级私有化AI构建

ChatGPT以及其他大语言模型展现了令人惊叹的广博知识、语义理解能力与创造能力。它们能够在会话中承认自身错误并进行改正&#xff0c;还能进行一定程度的逻辑推理&#xff0c;具备多语种翻译与多语言编程等"超能力"&#xff0c;可胜任多种自然语言处理任务。 然而&…

JavaEE进阶学习:Spring 的创建和使用

Spring 就是⼀个包含了众多工具方法的 IoC 容器。既然是容器那么它就具备两个最基本的功能&#xff1a; 将对象存储到容器&#xff08;Spring&#xff09;中从容器中将对象取出来 接下来使用 Maven 方式来创建一个 Spring 项目&#xff0c;创建 Spring 项目和 Servlet 类似&a…

RequestContextHolder详解

最近遇到的问题是在service获取request和response,正常来说在service层是没有request的,然而直接从controlller传过来的话解决方法太粗暴,后来发现了SpringMVC提供的RequestContextHolder遂去分析一番,并借此对SpringMVC的结构深入了解一下,后面会再发文章详细分析源码 1.Reque…

vite环境变量相关

环境变量&#xff1a;根据环境的不同&#xff0c;灵活的自动读取相应的变量。避免了手动修改。 import path from path import postCssPxToRem from postcss-pxtorem import { defineConfig, loadEnv } from vite import createVitePlugins from ./vite/plugins import copy f…

初识VBA代码及应用VBA代码第四节:如何录制宏

《VBA之Excel应用》&#xff08;10178983&#xff09;是非常经典的&#xff0c;是我推出的第七套教程&#xff0c;定位于初级&#xff0c;目前是第一版修订。这套教程从简单的录制宏开始讲解&#xff0c;一直到窗体的搭建&#xff0c;内容丰富&#xff0c;实例众多。大家可以非…

早安心语早读:熬出来的人生,有你预见不了的美好

1、熬出来的岁月&#xff0c;有你想象不到的精彩&#xff1b;熬出来的人生&#xff0c;有你预见不了的美好&#xff01;来这个世界一次&#xff0c;谁没有办法活第二次&#xff0c;一次的生命&#xff0c;一定要无悔才行&#xff01;那就让我们&#xff0c;拼一个岁月静好&…

C++ Qt 学习(八):Qt 绘图技术与图形视图

1. 常见 18 种 Qt 绘图技术 1.1 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <memory> #include <QTreeView> #include "CPaintWidget.h"using namespace std;class Widget : public QWidget {Q_OBJECTpublic:Widget…

Oneid方案

一、前文 用户画像的前提是标识出用户&#xff0c;存在以下场景&#xff1a;不同业务系统对同一个人的标识&#xff0c;匿名用户行为的行为归因&#xff1b;本文提供多种解决方案&#xff0c;提供大家思考。 二、方案矩阵 三、其他 相关连接&#xff1a; 如何通过图算法能力获…

【经验记录】Ubuntu系统安装xxxxx.tar.gz报错ImportError: No module named setuptools

最近在Anaconda环境下需要离线状态&#xff08;不能联网的情况&#xff09;下安装一个xxxxx.tar.gz格式的包&#xff0c;将对应格式的包解压后&#xff0c;按照如下命令进行安装 sudo python setup.py build # 编译 sudo python setup.py install # 安装总是报错如下信息&am…

金蝶云星空单据体启用块粘贴

文章目录 金蝶云星空单据体启用块粘贴 金蝶云星空单据体启用块粘贴

浅谈JavaScript闭包,小白的JS学习之路!

前言 在JavaScript中&#xff0c;闭包是一种强大而灵活的特性&#xff0c;它不仅允许变量私有化&#xff0c;而且提供了一种在函数执行完毕后仍然保持对外部作用域变量引用的机制。本文将深入讨论JavaScript闭包的概念、优点、缺点以及如何避免潜在的内存泄漏问题。 调用栈与…

【Ubuntu·系统·的Linux环境变量配置方法最全】

文章目录 概要读取环境变量的方法小技巧 概要 在Linux环境中&#xff0c;配置环境变量是一种常见的操作&#xff0c;用于指定系统或用户环境中可执行程序的搜索路径。 读取环境变量的方法 在Linux中&#xff0c;可以使用以下两个命令来读取环境变量&#xff1a; export 命令…