Debezium包
想在代码中自定义监控Oralce,以及其它很多数据库的变更,可以通过Debezium的API。
在我们的项目中,需要引入debezium-api、debezium-embeded这两个包。
比如maven项目:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-api</artifactId>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
</dependency>
另外就是oralce的专用包debezium-connector-oralce:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-oracle</artifactId>
<version>${version.debezium}</version>
</dependency>
其中version.debezium需要自己做好选择,因为不同的版本,需要的JDK并不一样。
目前,最新的debezium稳定版本是3.0,需要的Java版本至少是17。更早的稳定版本是2.7,需要的Java版本是11。具体信息可以查看这里这张表
为了更久的支持,推荐选用更新的版本。
核心回调函数
使用Debezium的API监控数据库变更,非常简单,核心是使用DebeziumEngine。
DebeziumEngine是一个Runable,使用构建者模式的Builder类来完成构造之后,把它提交给一个Executor即可。
这个DebeziumEngine支持两种回调函数,一个是```
java.util.function.Consumer<R>
,即每次进入回调函数,处理一条记录。
还有一个回调函数,是
void handleBatch(List<R> records, RecordCommitter<R> committer) throws InterruptedException;
为了提高处理性能,最好使用这个handleBatch的“批处理”回调函数。
需要注意的是,这俩回调函数没有返回值。Debezium的处理方法是,只要回调函数抛出异常,就整个监听结束。
这个函数的第一个参数records,是一个R的列表,即如果使用Consumer<R>
的话那个记录,使用这个handleBatch的话则是记录列表。
第二个参数是一个RecordCommiter,这个是用来控制监控进度的,它有两个主要的方法:
- markProcessed(R record); 用来标记一条记录已经处理;
- makrBatchFinished(); 用来表示整个一批记录已经处理。
EmbededEngine的构造
现在最新的DebeziumEngine叫做AsyncDebeziumEngine,相比过去的老Engine,它多了很多功能,比如可以支持并发处理。
构造EmbededEngine的最简单示例为:
DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine
.create(KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class),
"io.debezium.embedded.async.ConvertingAsyncEngineBuilderFactory")
.using(props)
.notifying(record -> {
System.out.println(record);
}).build()
) {
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(engine);
其中,notifying就是注册的回调函数,这里示例用了一个匿名函数,还可以写一个handleBatch的,同样是使用notifying构建进去。
而.create()传进去的KeyValueHeaderChangeEventFormat.of(Json.class, Json.class, Json.class)则会在回调函数中,把变更记录的Key、Value还有Header用Json格式输出。
这里支持这几种格式,可以灵活选用:
Connect.class
- 输出是Kafka连接器的SourceRecord
Json.class
- 输出是一对JSON
字符串JsonByteArray.class
- 输出是使用UTF-8编码为byte数组的JSON
Avro.class
- 参见Avro SerializationCloudEvents.class
- 参见Cloud Events
Debezium参数配置
传入进去的Properties,有很多参数,下面通过一个示例来举例说明:
properties = new Properties();
// 任务名称
properties.setProperty("name", "test1");
// 数据库类型,这里是Oracle
properties.setProperty("connector.class", "io.debezium.connector.oracle.OracleConnector");
// 数据库地址、端口、用户名、密码
properties.setProperty("database.hostname", "192.168.1.2");
properties.setProperty("database.port", String.valueOf(1521));
properties.setProperty("database.user", "usr1");
properties.setProperty("database.password", "psw1");
// 数据库的DBId
properties.setProperty("database.dbid", String.valueOf(1111111111));
// 数据库名
properties.setProperty("database.dbname", "orcl");
// 快照模式
properties.setProperty("snapshot.mode", "initial");
// 主题
properties.setProperty("topic.prefix", "test1");
properties.setProperty("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory");
properties.setProperty("schema.history.internal.file.filename", "/data/history.dat");
// 每5秒刷一次偏移量
properties.setProperty("offset.flush.interval.ms", "5000");
properties.setProperty("offset.storage.file.filename", "/data/offsets.log");
// 监控的数据库列表
properties.setProperty("database.include.list", "orcl");
// 监控的schema列表
properties.setProperty("schema.include.list", "usr1");
// 监控的数据表列表
properties.setProperty("table.include.list", "usr1.userinfo,usr1.permission");
其中,监控的schema默认是用户名,而监控的数据表则是schema.table这种格式。
在使用过程中,如果遇到内存不够的问题,还可以调整这几个参数:
// 批量大小设置,这里三个数值是默认值。
properties.setProperty("log.mining.batch.size.min", String.valueOf(1000));
properties.setProperty("log.mining.batch.size.max", String.valueOf(100000));
properties.setProperty("log.mining.batch.size.default", String.valueOf(20000));
Oracle设置
要使用Debezium监控数据库变更,还需要Oracle的服务
- Oracle开启补充日志
- 打开归档日志模式
开启补充日志比较简单,以下一条命令就可以:
alter database add supplemental log data;
打开归档模式也很简单,但是需要注意需要用户登录为sysdba。
以下命令可以查询是否打开了归档模式:
select log_mode from v$database;
如果显示不是归档日志模式,以下命令可以打开。过程为先连接到sysdba,之后关闭数据库实例,之后用mount模式启动实例(即不打开数据库),然后更改为归档日志模式,然后再打开数据库。
conn system/oracle as sysdba;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;