使用Debezium的API实时获取Oracle的变更

news2024/11/30 12:50:23

Debezium包

想在代码中自定义监控Oralce,以及其它很多数据库的变更,可以通过Debezium的API。

在我们的项目中,需要引入debezium-api、debezium-embeded这两个包。

比如maven项目:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-api</artifactId>
    <version>${version.debezium}</version>
</dependency>
<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-embedded</artifactId>
    <version>${version.debezium}</version>
</dependency>

另外就是oralce的专用包debezium-connector-oralce:

<dependency>
    <groupId>io.debezium</groupId>
    <artifactId>debezium-connector-oracle</artifactId>
    <version>${version.debezium}</version>
</dependency>

其中version.debezium需要自己做好选择,因为不同的版本,需要的JDK并不一样。

目前,最新的debezium稳定版本是3.0,需要的Java版本至少是17。更早的稳定版本是2.7,需要的Java版本是11。具体信息可以查看这里这张表

为了更久的支持,推荐选用更新的版本。

核心回调函数

使用Debezium的API监控数据库变更,非常简单,核心是使用DebeziumEngine。

DebeziumEngine是一个Runable,使用构建者模式的Builder类来完成构造之后,把它提交给一个Executor即可。

这个DebeziumEngine支持两种回调函数,一个是```

java.util.function.Consumer<R>

,即每次进入回调函数,处理一条记录。

还有一个回调函数,是

 void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;

为了提高处理性能,最好使用这个handleBatch的“批处理”回调函数。

需要注意的是,这俩回调函数没有返回值。Debezium的处理方法是,只要回调函数抛出异常,就整个监听结束。

这个函数的第一个参数records,是一个R的列表,即如果使用Consumer<R>的话那个记录,使用这个handleBatch的话则是记录列表。

第二个参数是一个RecordCommiter,这个是用来控制监控进度的,它有两个主要的方法:

  • markProcessed(R record); 用来标记一条记录已经处理;
  • makrBatchFinished(); 用来表示整个一批记录已经处理。

EmbededEngine的构造

现在最新的DebeziumEngine叫做AsyncDebeziumEngine,相比过去的老Engine,它多了很多功能,比如可以支持并发处理。

构造EmbededEngine的最简单示例为:

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
        .create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
                "io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")
        .using(props)
        .notifying(record -> {
            System.out.println(record);
        }).build()
    ) {

    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);

其中,notifying就是注册的回调函数,这里示例用了一个匿名函数,还可以写一个handleBatch的,同样是使用notifying构建进去。

而.create()传进去的KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class)则会在回调函数中,把变更记录的Key、Value还有Header用Json格式输出。

这里支持这几种格式,可以灵活选用:

  • Connect.class - 输出是Kafka连接器的SourceRecord
  • Json.class - 输出是一对JSON字符串
  • JsonByteArray.class - 输出是使用UTF-8编码为byte数组的JSON
  • Avro.class - 参见Avro Serialization
  • CloudEvents.class - 参见Cloud Events

Debezium参数配置

传入进去的Properties,有很多参数,下面通过一个示例来举例说明:

        properties = new Properties();

        // 任务名称
        properties.setProperty("name", "test1");

        // 数据库类型,这里是Oracle
        properties.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");

        // 数据库地址、端口、用户名、密码
        properties.setProperty("database.hostname", "192.168.1.2");
        properties.setProperty("database.port", String.valueOf(1521));
        properties.setProperty("database.user", "usr1");
        properties.setProperty("database.password", "psw1");
        
        // 数据库的DBId
        properties.setProperty("database.dbid", String.valueOf(1111111111));
        // 数据库名
        properties.setProperty("database.dbname", "orcl");

        // 快照模式
        properties.setProperty("snapshot.mode", "initial");

        // 主题
        properties.setProperty("topic.prefix", "test1");

        properties.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
                 properties.setProperty("schema.history.internal.file.filename", "/data/history.dat");

        // 每5秒刷一次偏移量
        properties.setProperty("offset.flush.interval.ms", "5000");
        properties.setProperty("offset.storage.file.filename", "/data/offsets.log");

        // 监控的数据库列表
        properties.setProperty("database.include.list", "orcl");
        // 监控的schema列表
        properties.setProperty("schema.include.list", "usr1");

        // 监控的数据表列表
        properties.setProperty("table.include.list", "usr1.userinfo,usr1.permission");

其中,监控的schema默认是用户名,而监控的数据表则是schema.table这种格式。

在使用过程中,如果遇到内存不够的问题,还可以调整这几个参数:

        // 批量大小设置,这里三个数值是默认值。
        properties.setProperty("log.mining.batch.size.min", String.valueOf(1000));
        properties.setProperty("log.mining.batch.size.max", String.valueOf(100000));
        properties.setProperty("log.mining.batch.size.default", String.valueOf(20000));

Oracle设置

要使用Debezium监控数据库变更,还需要Oracle的服务

  1. Oracle开启补充日志
  2. 打开归档日志模式

开启补充日志比较简单,以下一条命令就可以:

alter database add supplemental log data;

打开归档模式也很简单,但是需要注意需要用户登录为sysdba。

以下命令可以查询是否打开了归档模式:

select log_mode from v$database;

如果显示不是归档日志模式,以下命令可以打开。过程为先连接到sysdba,之后关闭数据库实例,之后用mount模式启动实例(即不打开数据库),然后更改为归档日志模式,然后再打开数据库。

conn system/oracle as sysdba; 
shutdown immediate; 
startup mount; 
alter database archivelog;
alter database open;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2250432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

.net XSSFWorkbook 读取/写入 指定单元格的内容

方法如下&#xff1a; using NPOI.SS.Formula.Functions;using NPOI.SS.UserModel;using OfficeOpenXml.FormulaParsing.Excel.Functions.DateTime;using OfficeOpenXml.FormulaParsing.Excel.Functions.Numeric;/// <summary>/// 读取Excel指定单元格内容/// </summa…

10个Word自动化办公脚本

在日常工作和学习中&#xff0c;我们常常需要处理Word文档&#xff08;.docx&#xff09;。 Python提供了强大的库&#xff0c;如python-docx&#xff0c;使我们能够轻松地进行文档创建、编辑和格式化等操作。本文将分享10个使用Python编写的Word自动化脚本&#xff0c;帮助新…

红日靶场-5

环境搭建 这个靶场相对于前几个靶场来说较为简单&#xff0c;只有两台靶机&#xff0c;其中一台主机是win7&#xff0c;作为我们的DMZ区域的入口机&#xff0c;另外一台是windows2008&#xff0c;作为我们的域控主机&#xff0c;所以我们只需要给我们的win7配置两张网卡&#…

[Java]微服务之分布式事务

介绍 下单业务&#xff0c;前端请求首先进入订单服务&#xff0c;创建订单并写入数据库。然后订单服务调用购物车服务和库存服务: 购物车服务负责清理购物车信息库存服务负责扣减商品库存 问题分析: 下单过程中, 订单服务创建订单, 插入自己的数据库, 执行成功购物车服务, 清…

存储结构及关系(一)

学习目标 描述数据库的逻辑结构列出段类型及其用途列出控制块空间使用的关键字获取存储结构信息 段的类型 段是数据库中占用空间的对象。它们使用数据库数据文件中的空间。介绍不同类型的段。 表 表是在数据库中存储数据的最常用方法。表段用于存储既没有集群也没有分区的表…

cesium 3dtile ClippingPlanes 多边形挖洞ClippingPlaneCollection

原理就是3dtiles里面的属性clippingPlanes 采用ClippingPlaneCollection&#xff0c;构成多边形来挖洞。 其次就是xyz法向量挖洞 clippingPlanes: new this.ffCesium.Cesium.ClippingPlaneCollection({unionClippingRegions: true, // true 表示多个切割面能合并为一个有效的…

AMD的AI芯片Instinct系列介绍

AMD最强AI芯片发布&#xff01; 在旧金山举行的Advancing AI 2024大会上&#xff0c;AMD推出Instinct MI325X AI加速器&#xff08;以下简称MI325X&#xff09;&#xff0c;直接与英伟达的Blackwell芯片正面交锋。 现场展示的数据显示&#xff0c;与英伟达H200的集成平台H200 …

【大数据学习 | Spark调优篇】Spark之内存调优

1. 内存的花费 1&#xff09;每个Java对象&#xff0c;都有一个对象头&#xff0c;会占用16个字节&#xff0c;主要是包括了一些对象的元信息&#xff0c;比如指向它的类的指针。如果一个对象本身很小&#xff0c;比如就包括了一个int类型的field&#xff0c;那么它的对象头实…

基于深度学习的卷积神经网络十二生肖图像识别系统(PyQt5界面+数据集+训练代码)

本研究提出了一种基于深度学习的十二生肖图像识别系统&#xff0c;旨在利用卷积神经网络&#xff08;CNN&#xff09;进行图像分类&#xff0c;特别是十二生肖图像的自动识别。系统的核心采用了两种经典的深度学习模型&#xff1a;ResNet50和VGG16&#xff0c;进行图像的特征提…

kali linux 装 virtual box 增强工具 Guest Addition

kali linux 装 virtual box 增强工具 Guest Addition install Virtual Box Guest Addition in kali linux 搞了一下午&#xff0c;最终发现是白折腾。 kali linux 自带 virtual box 的增强工具。 kali linux 2021.3 之后的版本都是自带virtual box 增强工具 解决方法 直接…

vue3请求接口报错:Cannot read properties of undefined (reading ‘data‘)

文章目录 报错内容解决方案 报错内容 Cannot read properties of undefined (reading ‘data’) 解决方案 响应未按预期返回 确保服务器返回的数据结构符合预期。例如&#xff0c;服务器可能返回了一个错误响应&#xff0c;而不是预期的 JSON 数据。 检查响应 在 response 拦…

RocketMQ rocketmq-tools管理主题

RocketMQ rocketmq-tools管理主题 环境和软件版本增删改查 环境和软件版本 Win10、IDEA、Jdk1.8、rocketmq 5.1.3、rocketmq-tools 5.1.3 引入依赖 <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-tools</artifactId&g…

《datawhale2411组队学习 模型压缩技术7:NNI剪枝》

文章目录 一、NNI简介二、 NNI剪枝快速入门2.1 加载并训练模型2.2 模型剪枝2.3 模型加速&#xff08;剪枝永久化&#xff09;2.4 微调压缩模型2.5 Slim Pruner测试 三、 使用NNI3.0进行Bert压缩&#xff08;剪枝、蒸馏)3.1 数据预处理3.2 训练模型3.3 设置模型蒸馏函数3.4 修剪…

C#学写了一个程序记录日志的方法(Log类)

1.错误和警告信息单独生产文本进行记录&#xff1b; 2.日志到一定内存阈值可以打包压缩&#xff0c;单独存储起来&#xff0c;修改字段MaxLogFileSizeForCompress的值即可&#xff1b; 3.Log类调用举例&#xff1a;Log.Txt(JB.信息,“日志记录内容”,"通道1"); usi…

Java设计模式——职责链模式:解锁高效灵活的请求处理之道

嘿&#xff0c;各位 Java 编程大神和爱好者们&#xff01;今天咱们要一同深入探索一种超厉害的设计模式——职责链模式。它就像一条神奇的“处理链”&#xff0c;能让请求在多个对象之间有条不紊地传递&#xff0c;直到找到最合适的“处理者”。准备好跟我一起揭开它神秘的面纱…

安装SQL Server 2022提示需要Microsoft .NET Framework 4.7.2 或更高版本

安装SQL Server 2022提示需要Microsoft .NET Framework 4.7.2 或更高版本。 原因是&#xff1a;当前操作系统版本为Windows Server 2016 Standard版本&#xff0c;其自带的Microsoft .NET Framework 版本为4.6太低&#xff0c;不满足要求。 根据报错的提示&#xff0c;点击链接…

高德地图 Readme GT 定制版 10.25.0.3249 | 极致简洁

这款定制版高德地图去除了广告&#xff0c;运行速度更快。虽然没有车道级导航、打车功能和红绿灯倒计时等功能&#xff0c;但支持正常登录和收藏功能。检测更新始终为最新版本。 大小&#xff1a;82.5M 下载地址&#xff1a; 百度网盘&#xff1a;https://pan.baidu.com/s/1Y…

Admin.NET框架使用宝塔面板部署步骤

文章目录 Admin.NET框架使用宝塔面板部署步骤&#x1f381;框架介绍部署步骤1.Centos7 部署宝塔面板2.部署Admin.NET后端3.部署前端Web4.访问前端页面 Admin.NET框架使用宝塔面板部署步骤 &#x1f381;框架介绍 Admin.NET 是基于 .NET6 (Furion/SqlSugar) 实现的通用权限开发…

Excel中根据某列内容拆分为工作簿

简介&#xff1a;根据A列的内容进行筛选&#xff0c;将筛选出来的数据生成一个新的工作簿(可以放到指定文件夹下)&#xff0c;且工作簿名为筛选内容。 举例&#xff1a; 将上面的内容使用VBA会在当前test1下生成5个工作簿&#xff0c;工作簿名分别为TEST1.xls TEST2.xls TEST3…

JavaWeb实战(1)(重点:分页查询、jstl标签与jsp、EL表达式、Bootstrap组件搭建页面、jdbc)

目录 一、jstl标签。 &#xff08;1&#xff09;基本概念。 &#xff08;2&#xff09;使用前提。 &#xff08;3&#xff09;"<%...%>"与"<%%>"。 &#xff08;4&#xff09;使用jstl标签的步骤。 1、导入对应jar包。 2、引入核心标签库。&am…