文章目录
- 一、 技术背景
- 二、 关键技术
- 1、 Oracle LogMiner
- 2、 Chunjun 的 LogMiner 关键流程
- 3、修复 Chunjun Oracle LogMiner 问题
一、 技术背景
在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。
本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。
二、 关键技术
1、 Oracle LogMiner
LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERT
、UPDATE
、DELETE
操作。
使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner
2、 Chunjun 的 LogMiner 关键流程
Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source
用于解析 Oracle Redo Log 并转换为 Flink 数据流。
如下整个流程架构:
Flink任务启动后
- 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
- 实时监听
V$LOGMNR_CONTENTS
,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作, - Flink 任务处理数据,完成转换、清洗等操作。
- Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris。
操作类型 | before(旧数据) | after(新数据) | Flink 处理逻辑 |
---|---|---|---|
INSERT | 无 | {新数据} | 直接插入 |
UPDATE | {旧数据} | {新数据} | 先删除旧数据,再插入新数据 |
DELETE | {旧数据} | 无 | 删除数据 |
最后如下示例flink sql:
CREATE TABLE source
(
ID int,
NAME string
) WITH (
'connector' = 'oraclelogminer-x'
,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL'
,'username' = 'system'
,'password' = 'xxx'
,'cat' = 'insert,delete,update'
,'table' = 'TEST.TEST_USER'
,'timestamp-format.standard' = 'SQL'
);
CREATE TABLE sink
(
k4 int,
k3 string
) WITH (
'connector' = 'doris-x',
'schema'='demo',
'password' = 'xxx',
'table-name' = 'mytable',
'url' = 'jdbc:mysql://xxx:9030',
'username' = 'root',
'sink.parallelism' = '1',
'lookup.error-limit' = '100',
'lookup.cache-type' = 'LRU',
'lookup.parallelism' = '1',
'lookup.cache.ttl' = '60000',
'lookup.cache.max-rows' = '10000',
'writeMode'='UPSERT'
);
insert into sink
select ID as k4, NAME as k3
from source;
3、修复 Chunjun Oracle LogMiner 问题
在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:
- 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;
- 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables();
- PavingData和Split 不能同时开启,默认都开启,将PavingData关闭