Flink 通过 Chunjun Oracle LogMiner 实时读取 Oracle 变更日志并写入 Doris 的方案

news2025/3/21 20:44:19

文章目录

  • 一、 技术背景
  • 二、 关键技术
    • 1、 Oracle LogMiner
    • 2、 Chunjun 的 LogMiner 关键流程
    • 3、修复 Chunjun Oracle LogMiner 问题

一、 技术背景

在大数据实时同步场景中,需要将 Oracle 数据库的变更数据(CDC) 采集并写入 Apache Doris,以支持 数据分析、BI 报表、实时数据仓库 等应用。

本方案基于 Flink + Chunjun,通过 Oracle LogMiner 解析 Redo Log,实现 低延迟 写入Doris。

 

二、 关键技术

1、 Oracle LogMiner

LogMiner 是 Oracle 提供的 redo log 解析工具,用于跟踪 INSERTUPDATEDELETE 操作。

使用LogMiner需要现在Oracle中开启,具体开启操作见:Oracle配置LogMiner

 

2、 Chunjun 的 LogMiner 关键流程

Chunjun(原 FlinkX)是 Flink 生态的数据同步框架,支持多种数据源连接器(如 Oracle、MySQL、PostgreSQL、Doris)。
其中 Chunjun Oracle LogMiner Source 用于解析 Oracle Redo Log 并转换为 Flink 数据流

如下整个流程架构:

在这里插入图片描述

Flink任务启动后

  1. 通过Chunjun的oracle logMiner连接器, 建立 Oracle 连接,启动 LogMiner 解析 Redo Log。
  2. 实时监听 V$LOGMNR_CONTENTS,解析变更数据并转换为 Flink 事件流。具体地会将Oracle不同的操作日志解析为如下数据类型即重放数据操作,
  3. Flink 任务处理数据,完成转换、清洗等操作。
  4. Flink Sink 组件(Chunjun Doris Sink)将数据写入 Doris
操作类型before(旧数据)after(新数据)Flink 处理逻辑
INSERT{新数据}直接插入
UPDATE{旧数据}{新数据}先删除旧数据,再插入新数据
DELETE{旧数据}删除数据

最后如下示例flink sql:


CREATE TABLE source  
(  
    ID             int,  
    NAME          string  
) WITH (  
      'connector' = 'oraclelogminer-x'  
      ,'url' = 'jdbc:oracle:thin:@//xxx:1521/ORCL'  
      ,'username' = 'system'  
      ,'password' = 'xxx'  
      ,'cat' = 'insert,delete,update'  
      ,'table' = 'TEST.TEST_USER'  
      ,'timestamp-format.standard' = 'SQL'  
      );  
  
  
CREATE TABLE sink  
(  
     k4             int,  
     k3          string  
) WITH (  
'connector' = 'doris-x',  
'schema'='demo',  
      'password' = 'xxx',  
      'table-name' = 'mytable',  
      'url' = 'jdbc:mysql://xxx:9030',  
      'username' = 'root',  
      'sink.parallelism' = '1',  
      'lookup.error-limit' = '100',  
      'lookup.cache-type' = 'LRU',  
      'lookup.parallelism' = '1',  
      'lookup.cache.ttl' = '60000',  
      'lookup.cache.max-rows' = '10000',  
      'writeMode'='UPSERT'  
      );  
  
  
insert into sink  
select ID as k4, NAME as k3  
from source;  
  

 

3、修复 Chunjun Oracle LogMiner 问题

在实际使用中,Chunjun Oracle LogMiner 会遇到以下问题:

  1. 关于全量增量读数据的问题
//LogMinerConfig,没有全量同步的外部配置,默认是增量读取数据
private boolean enableFetchAll = true;

  1. 无法获取监听的表
//LogMinerListener 中的LogMinerConfig没有set table的地方,
//即无法获取被监听的表,改成直接获取
logMinerConfig.getListenerTables(); 

  1. PavingData和Split 不能同时开启,默认都开启,将PavingData关闭

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2319164.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WordPress系统获取webshell的攻略

一.后台修改模板拿WebShell 1.进入Vulhub靶场并执⾏以下命令开启靶场;在浏览器中访问并安装好 #执⾏命令 cd /vulhub/wordpress/pwnscriptum docker-compose up -d 2. 修改其WP的模板,登陆WP后点击 【外 观】 --》 【编辑】 --》 404.php 3.插入一句话木…

蓝桥杯2023年第十四届省赛真题-子矩阵

题目来自DOTCPP: 暴力思路(两个测试点超时): 题目要求我们求出子矩阵的最大值和最小值的乘积,我们可以枚举矩阵中的所有点,以这个点为其子矩阵的左上顶点,然后判断一下能不能构成子矩阵。如果可…

如何在 Node.js 中使用 .env 文件管理环境变量 ?

Node.js 应用程序通常依赖于环境变量来管理敏感信息或配置设置。.env 文件已经成为一种流行的本地管理这些变量的方法,而无需在代码存储库中公开它们。本文将探讨 .env 文件为什么重要,以及如何在 Node.js 应用程序中有效的使用它。 为什么使用 .env 文…

Redis BitMap 用户签到

Redis Bitmap Bitmap(位图)是 Redis 提供的一种用于处理二进制位(bit)的特殊数据结构,它基于 String 类型,每个 bit 代表一个布尔值(0 或 1),可以用于存储大规模的二值状…

未来办公与生活的新范式——智慧园区

在信息化与智能化技术飞速发展的今天,智慧园区作为一种新兴的城市发展形态,正逐步成为推动产业升级、提升城市管理效率、改善居民生活质量的重要力量。智慧园区不仅融合了先进的信息技术,还深刻体现了可持续发展的理念,为园区内的…

Hugging Face预训练GPT微调ChatGPT(微调入门!新手友好!)

Hugging Face预训练GPT微调ChatGPT(微调入门!新手友好!) 在实战中,⼤多数情况下都不需要从0开始训练模型,⽽是使⽤“⼤⼚”或者其他研究者开源的已经训练好的⼤模型。 在各种⼤模型开源库中,最…

【CSS3】化神篇

目录 平面转换平移旋转改变旋转原点多重转换缩放倾斜 渐变线性渐变径向渐变 空间转换平移视距旋转立体呈现缩放 动画使现步骤animation 复合属性animation 属性拆分逐帧动画多组动画 平面转换 作用:为元素添加动态效果,一般与过渡配合使用 概念&#x…

Unity音频混合器如何暴露参数

音频混合器是Unity推荐管理音效混音的工具,那么如何使用代码对它进行管理呢? 首先我在AudioMixer的Master组中创建了BGM和SFX的分组,你也可以直接用Master没有问题。 这里我以BGM为例,如果要在代码中进行使用就需要将参数暴露出去…

如何理解分布式光纤传感器?

关键词:OFDR、分布式光纤传感、光纤传感器 分布式光纤传感器是近年来备受关注的前沿技术,其核心在于将光纤本身作为传感介质和信号传输介质,通过解析光信号在光纤中的散射效应,实现对温度、应变、振动等物理量的连续、无盲区、高…

PMP-项目运行环境

你好!我是 Lydia-穎穎 ♥感谢你的陪伴与支持 ~~~ 欢迎一起探索未知的知识和未来,现在lets go go go!!! 1. 影响项目的要素 项目存在在不同的环境下,环境对于项目的交付产生不同的影响。需了解环境对于项目的影响,采取相应措施应对…

shell 脚本搭建apache

#!/bin/bash # Set Apache version to install ## author: yuan# 检查外网连接 echo "检查外网连接..." ping www.baidu.com -c 3 > /dev/null 2>&1 if [ $? -eq 0 ]; thenecho "外网通讯良好!" elseecho "网络连接失败&#x…

Huawei 鲲鹏(ARM/Aarch64)服务器安装KVM虚拟机(非桌面视图)

提出问题 因需要进行ARM架构适配,需要在Huawei Taishan 200k(CPU: Kunpeng 920 5231K)上,创建几台虚拟机做为开发测试环境。 无奈好久没搞了,看了一下自己多年前写的文章:Huawei 鲲鹏&#xf…

《Python实战进阶》No28: 使用 Paramiko 实现远程服务器管理

No28: 使用 Paramiko 实现远程服务器管理 摘要 在现代开发与运维中,远程服务器管理是必不可少的一环。通过 SSH 协议,我们可以安全地连接到远程服务器并执行各种操作。Python 的 Paramiko 模块是一个强大的工具,能够帮助我们实现自动化任务&…

【Kafka】深入了解Kafka

集群的成员关系 Kafka使用Zookeeper维护集群的成员信息。 每一个broker都有一个唯一的标识,这个标识可以在配置文件中指定,也可以自动生成。当broker在启动时通过创建Zookeeper的临时节点把自己的ID注册到Zookeeper中。broker、控制器和其他一些动态系…

C++特性——RAII、智能指针

RAII 就像new一个需要delete,fopen之后需要fclose,但这样会有隐形问题(忘记释放)。RAII即用对象把这个过程给包起来,对象构造的时候,new或者fopen,析构的时候delete. 为什么需要智能指针 对于…

CentOS系类普通挂载磁盘挂载命令

检查磁盘是否有分区 lsblk如果 vdb 下面没有分区(比如 vdb1),你需要先创建分区。 创建分区(如果需要) fdisk /dev/vdb然后在 fdisk 交互界面: 输入 n 创建新分区 选择 p 创建主分区 默认分区号和大小 输…

强化学习(赵世钰版)-学习笔记(9.策略梯度法)

本章是课程的导数第二章,旨在讲解策略的函数化形式。 之前的方法,描述一个策略都是用表格的形式,每一行代表一个状态,每一列代表一个行为,表格中的元素对应相关状态下执行相关行为的概率。 函数化的策略表征形式是指&a…

【c++】【STL】unordered_set 底层实现(简略版)

【c】【STL】unordered_set 底层实现&#xff08;简略版&#xff09; ps:这个是我自己看的不保证正确&#xff0c;觉得太长的后面会总结整个调用逻辑 unordered_set 内部实现 template <class _Kty, class _Hasher hash<_Kty>, class _Keyeq equal_to<_Kty>…

网络安全设备配置与管理-实验4-防火墙AAA服务配置

实验4-p118防火墙AAA服务配置 从这个实验开始&#xff0c;每一个实验都是长篇大论&#x1f613; 不过有好兄弟会替我出手 注意&#xff1a;1. gns3.exe必须以管理员身份打开&#xff0c;否则ping不通虚拟机。 win10虚拟机无法做本次实验&#xff0c;必须用学校给的虚拟机。首…

【论文阅读】Contrastive Clustering Learning for Multi-Behavior Recommendation

论文地址&#xff1a;Contrastive Clustering Learning for Multi-Behavior Recommendation | ACM Transactions on Information Systems 摘要 近年来&#xff0c;多行为推荐模型取得了显著成功。然而&#xff0c;许多模型未充分考虑不同行为之间的共性与差异性&#xff0c;以…