Springboot集成Debezium监听postgresql变更

news2025/3/29 14:40:55

在这里插入图片描述

1.创建springboot项目引入pom

<dependencies>
   <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>1.4.2.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-postgres</artifactId>
        <version>1.4.2.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-api</artifactId>
        <version>1.4.2.Final</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>


    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.fastjson2</groupId>
        <artifactId>fastjson2</artifactId>
        <version>2.0.43</version>
    </dependency>

</dependencies>

2.application.properties配置

# Debezium Configuration
debezium.name=my-postgres-connector
debezium.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
debezium.offset.storage.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\offsets_0.dat
debezium.offset.flush.interval.ms=60000
debezium.database.hostname=192.168.159.103
debezium.database.port=15432
debezium.database.user=postgres
debezium.database.password=123456
debezium.database.dbname=db_test
debezium.database.server.id=12345
debezium.database.server.name=customer-postgres-db-server
debezium.database.history=io.debezium.relational.history.FileDatabaseHistory
debezium.database.history.file.filename=C:\\Users\\Linging\\Desktop\\debezinum\\history_0.dat
debezium.table.include.list=public.user
debezium.column.include.list=public.user.id,public.user.name
debezium.publication.autocreate.mode=filtered
debezium.plugin.name=pgoutput
debezium.slot.name=dbz_customerdb_listener

3.配置类:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import java.io.IOException;

@Configuration
public class DebeziumConnectorConfig {

    @Bean
    public io.debezium.config.Configuration customerConnector(Environment env) throws IOException {
        return io.debezium.config.Configuration.create()
            .with("name", env.getProperty("debezium.name"))
            .with("connector.class", env.getProperty("debezium.connector.class"))
            .with("offset.storage", env.getProperty("debezium.offset.storage"))
            .with("offset.storage.file.filename", env.getProperty("debezium.offset.storage.file.filename"))
            .with("offset.flush.interval.ms", env.getProperty("debezium.offset.flush.interval.ms"))
            .with("database.hostname", env.getProperty("debezium.database.hostname"))
            .with("database.port", env.getProperty("debezium.database.port"))
            .with("database.user", env.getProperty("debezium.database.user"))
            .with("database.password", env.getProperty("debezium.database.password"))
            .with("database.dbname", env.getProperty("debezium.database.dbname"))
            .with("database.server.id", env.getProperty("debezium.database.server.id"))
            .with("database.server.name", env.getProperty("debezium.database.server.name"))
            //.with("database.history", "io.debezium.relational.history.MemoryDatabaseHistory")
            .with("database.history", env.getProperty("debezium.database.history"))
            .with("database.history.file.filename", env.getProperty("debezium.database.history.file.filename"))
            .with("table.include.list", env.getProperty("debezium.table.include.list")) //表名
            .with("column.include.list", env.getProperty("debezium.column.include.list")) // 表中得哪些字段
            .with("publication.autocreate.mode", env.getProperty("debezium.publication.autocreate.mode"))
            .with("plugin.name", env.getProperty("debezium.plugin.name"))
            .with("slot.name", env.getProperty("debezium.slot.name"))
            .build();
    }
}

4.注册监听

import io.debezium.config.Configuration;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;

@Slf4j
@Component
public class DebeziumListener {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine;

    public DebeziumListener(Configuration customerConnectorConfiguration) {
        this.debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class))
            .using(customerConnectorConfiguration.asProperties())
            .notifying(this::handleChangeEvent)
            .build();
    }

    private void handleChangeEvent(RecordChangeEvent<SourceRecord> sourceRecordRecordChangeEvent) {
        SourceRecord sourceRecord = sourceRecordRecordChangeEvent.record();

        log.info("Key = {}, Value = {}", sourceRecord.key(), sourceRecord.value());
        Struct sourceRecordChangeValue= (Struct) sourceRecord.value();
        //log.info("SourceRecordChangeValue = '{}'",sourceRecordRecordChangeEvent);
         if (sourceRecordChangeValue != null) {
             Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));

             // 处理非读操作
             if(operation != Envelope.Operation.READ) {
                 String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;

                 Struct struct = (Struct) sourceRecordChangeValue.get(record);
                 Map<String, Object> payload = struct.schema().fields().stream()
                     .map(Field::name)
                     .filter(fieldName -> struct.get(fieldName) != null)
                     .map(fieldName -> Pair.of(fieldName, struct.get(fieldName)))
                     .collect(toMap(Pair::getKey, Pair::getValue));

                 // this.customerService.replicateData(payload, operation);
                 log.info("Updated Data: {} with Operation: {}", payload, operation.name());
             }
         }
    }

    @PostConstruct
    private void start() {
        this.executor.execute(debeziumEngine);
    }

    @PreDestroy
    private void stop() throws IOException {
        if (Objects.nonNull(this.debeziumEngine)) {
            this.debeziumEngine.close();
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2322353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ubuntu22.04搭建freeradius操作说明

Ubuntu22.04搭建freeradius操作说明 更新依赖库 sudo apt update sudo apt install build-essential sudo apt install libtalloc-dev sudo apt install libssl-dev 按照freeradius sudo apt install freeradius 修改freeradius配置 文件路径如下 /etc/freeradius/3.…

【重装系统】全流程记录,在 MacOS 的电脑上烧录 Ubuntu 启动盘

背景 Ubuntu 无法联网&#xff0c;排查下来应该是网卡驱动的问题&#xff0c;安装驱动的过程中又缺失各种包需要网络&#xff0c;陷入死循环。 全流程以及相关资料 整体流程参考&#xff1a;【史上最全】重装ubuntu20.04系统基本环境配置 烧录启动盘启动盘插入需要重装的服务…

去中心化金融

什么是去中心化金融 去中心化金融&#xff08;Decentralized Finance&#xff0c;简称 DeFi&#xff09;是一种基于区块链技术构建的金融系统&#xff0c;旨在通过去除传统金融机构&#xff08;如银行、证券公司等&#xff09;作为中介&#xff0c;提供各种金融服务。这些服务…

centos 7 部署FTP 服务用shell 脚本搭建

#!/bin/bash# 检查是否以root身份运行脚本 if [ "$EUID" -ne 0 ]; thenecho "请以root身份运行此脚本。"exit 1 fi# 安装vsftpd yum install -y vsftpd# 启动vsftpd服务并设置开机自启 systemctl start vsftpd systemctl enable vsftpd# 配置防火墙以允许F…

VMware启动虚拟机报“另一个程序已锁定文件的一部分,进程无法访问”

解决方案&#xff1a; 1&#xff09;定位到虚拟机磁盘目录&#xff0c;我这里是“E\VM_Disk\CactiEZ\”这个目录&#xff0c;每个人目录不一样&#xff0c;详见上图报错位置 2&#xff09;在这个目录中找到后缀名以“.lck”结尾的目录&#xff0c;将所有以 .lck 结尾的目录删…

CPU架构和微架构

CPU架构&#xff08;CPU Architecture&#xff09; CPU架构是指处理器的整体设计框架&#xff0c;定义了处理器的指令集、寄存器、内存管理方式等。它是处理器设计的顶层规范&#xff0c;决定了软件如何与硬件交互。 主要特点&#xff1a; 指令集架构&#xff08;ISA, Instr…

帕金森病致生活艰难,如何缓解心理负担?

你是否留意到身边有人手部不由自主地颤抖&#xff0c;且肢体变得僵硬&#xff0c;行动也愈发迟缓&#xff1f;这很可能是帕金森病的症状。帕金森病是一种常见的神经系统退行性疾病&#xff0c;多发生于中老年人。​ 静止性震颤往往是帕金森病的首发症状&#xff0c;患者在安静状…

[Windows] Edge浏览器_134.0.3124.83绿色便携增强版-集成官方Deepseek侧边栏

微软Edge浏览器 绿色便携增强版 长期更新 链接&#xff1a;https://pan.xunlei.com/s/VOMA-aVC_GPJiv-MzRS89lsVA1?pwdemxj# Edge浏览器_134.0.3124.83绿色便携增强版-集成官方Deepseek侧边栏

从零构建大语言模型全栈开发指南:第二部分:模型架构设计与实现-2.2.3实战案例:在笔记本电脑上运行轻量级LLM

👉 点击关注不迷路 👉 点击关注不迷路 👉 点击关注不迷路 文章大纲 实战案例:在笔记本电脑上运行轻量级LLM2.2.3 模型架构设计与实现1. 环境与工具准备1.1 硬件要求1.2 软件栈选择2. 轻量级模型架构设计2.1 模型参数配置2.2 关键技术优化3. 实战流程3.1 数据准备流程3.2…

CAN基础知识学习二

一、控制器局域网总线&#xff08;CAN&#xff0c;Controller Area Network&#xff09;&#xff1b; 二、CAN FD 是CAN with Flexible Data rate的缩写&#xff0c;翻译为【可变速率的 CAN】 CAN-FD 采用了两种位速率&#xff1a;从控制场中的 BRS 位到 ACK 场之前&#xff08…

新能源行业:卓越 UE/UI 设计,引领业务腾飞的新引擎

在全球积极推动可持续发展的大背景下&#xff0c;新能源行业蓬勃兴起&#xff0c;成为经济发展的新引擎。在这个充满机遇与挑战的赛道上&#xff0c;优秀的用户体验&#xff08;UE&#xff09;和用户界面&#xff08;UI&#xff09;设计正扮演着愈发关键的角色&#xff0c;它不…

Docker镜像相关命令(Day2)

文章目录 前言一、问题描述二、相关命令1.查看镜像2.搜索镜像3.拉取镜像4.删除镜像5.镜像的详细信息6.标记镜像 三、验证与总结 前言 Docker 是一个开源的容器化平台&#xff0c;它让开发者能够将应用及其依赖打包到一个标准化的单元&#xff08;容器&#xff09;中运行。在 D…

LangChain4J开源开发框架简介

目录 1.1、前言1.2、集成方式简单1.3、核心功能与优势1.4、两种调用方式1.5、链式调用示例代码1.6、AI服务调用示例代码1.7、典型使用场景1.8、总结 1.1、前言 LangChain4J 是一个专为 Java 开发者设计的开源框架&#xff0c;旨在简化大型语言模型&#xff08;LLMs&#xff09;…

SpringBoot集成Elasticsearch 7.x spring-boot-starter-data-elasticsearch 方式

SpringBoot集成Elasticsearch 7.x | spring-boot-starter-data-elasticsearch 方式 前言添加maven依赖配置application.properties测试实体类 方式一&#xff1a;继承 ElasticsearchRepository&#xff08;适合简单查询&#xff09; 直接使用想自定义自己的Repository接口 方式…

STM32蜂鸣器播放音乐

STM32蜂鸣器播放音乐 STM32蜂鸣器播放音乐 Do, Re, Mi, Fa, 1. 功能概述 本系统基于STM32F7系列微控制器&#xff0c;实现了以下功能&#xff1a; 通过7个按键控制蜂鸣器发声&#xff0c;按键对应不同的音符。每个按键对应一个音符&#xff08;Do, Re, Mi, Fa, Sol, La, Si&a…

解码未来:DeepSeek开源FlashMLA,推理加速核心技术,引领AI变革

前言&#xff1a; DeepSeek 兑现了自己的诺言&#xff0c;开源了一款用于 Hopper GPU 的高效型 MLA 解码核&#xff1a;FlashMLA。 项目地址&#xff1a;https://github.com/deepseek-ai/FlashMLA 1:FlashMLA 是什么呀&#xff1f; MLA是DeepSeek大模型的重要技术创新点&…

leetcode:136. 只出现一次的数字(python3解法)

难度&#xff1a;简单 给你一个 非空 整数数组 nums &#xff0c;除了某个元素只出现一次以外&#xff0c;其余每个元素均出现两次。找出那个只出现了一次的元素。 你必须设计并实现线性时间复杂度的算法来解决此问题&#xff0c;且该算法只使用常量额外空间。 示例 1 &#xf…

Isaac Sim与Isaac Lab初使用

目录 基于Omiverse下载Isaacsim安装Isaac Lab配置isaacsim环境测试克隆仓库配置python环境强化学习训练的测试 IsaacLab模板配置vscode环境ros接口安装 作为nvidia出品的仿真软件&#xff0c;很多机器人、机器狗【具身智能】都可以有很不错的效果&#xff0c;所以会使用isaac s…

Spring AI Alibaba 工具(Function Calling)使用

一、工具(Function Calling)简介 Spring AI Alibaba工具(Function Calling)&#xff1a;https://java2ai.com/docs/1.0.0-M6.1/tutorials/function-calling/ 1、工具(Function Calling) “工具&#xff08;Tool&#xff09;”或“功能调用&#xff08;Function Calling&#xf…

Touch Diver:Weart为XR和机器人遥操作专属设计的触觉反馈动捕手套

在虚拟现实&#xff08;VR&#xff09;和扩展现实&#xff08;XR&#xff09;领域&#xff0c;触觉反馈技术正逐渐成为提升沉浸感和交互体验的重要因素。Weart作为这一领域的创新者&#xff0c;凭借其TouchDIVER Pro和TouchDIVER G1触觉手套&#xff0c;为用户带来了高度逼真的…