flink cdc oceanbase(binlog模式)

news2025/1/7 18:42:54

接上文:一文说清flink从编码到部署上线
环境:①操作系统:阿里龙蜥 7.9(平替CentOS7.9);②CPU:x86;③用户:root。

预研初衷:现在很多项目有国产化的要求,操作系统、数据库需要国产化,然后就想着找既能开源免费,又能很好的兼容MySQL,还能很好支持flink。然后就在信创目录找到OceanBase数据库。

flink探索:flink CDC 找到这个文章Flink CDC 配置 OceanBase 实战指南,官网论坛感觉比较靠谱,然而发现按照说明引入依赖后,相关语法是不支持的。也在网上找了比较多的其它资料,中间比较坎坷,都未解决,不再赘述。最后转换思路:既然OceanBase支持MySQL binlog,那就把OceanBase当MySQL用,使用MySQL CDC是不是可以,最后问题得到解决。下面展开说明。

1.OceanBase部署

1.1 obd 部署

官方文档:oceanbase部署

注意:①这个地方最好选择obd 图形化部署,docker部署虽然简单,但是后续安装obbinlog会比较麻烦。②操作系统不要使用CentOS了,好多yum源不能用了。可以使用“阿里龙蜥 7.9”。

部署完,记得保存相关账号信息(供参考):

[
    {
        "component": "oceanbase-ce",
        "access_url": "10.86.97.168:2881",
        "user": "root",
        "password": "pwd",
        "connect_url": "obclient -h10.86.97.168 -P2881 -uroot -p'pwd' -Doceanbase -A"
    },
    {
        "component": "obproxy-ce",
        "access_url": "10.86.97.168:2883",
        "user": "root@proxysys",
        "password": "Y6.B4s)pt",
        "connect_url": "obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A \n"
    },
    {
        "component": "ocp-express",
        "access_url": "10.86.97.168:8180",
        "user": "admin",
        "password": "DSxF-{odkdX-bmL6fjrF2{3mLL",
        "connect_url": "http://10.86.97.168:8180"
    }
]

这个“ocp-express”是个监控页面,能看到集群信息,访问“http://10.86.97.168:8180”:
在这里插入图片描述

1.2 常用命令

启动:obd cluster start myoceanbase(改成具体集群名称)

常用命令:
# 查看集群列表
obd cluster list
# 查看集群状态,以部署名为 obtest 为例
obd cluster display obtest
# 停止运行中的集群,以部署名为 obtest 为例
obd cluster stop obtest
# 销毁已部署的集群,以部署名为 obtest 为例
obd cluster destroy obtest

2.obbinlog

2.1 部署

官方文档:obbinlog部署

部署过程中,会遇到这个错误:“https://mirrors.oceanbase.com/community/stable/el/7.9/x86_64/repodata/repomd.xml: [Errno 14] HTTPS Error 404 - Not Found”

解决方法:修改“/etc/yum.repos.d/OceanBase.repo”中,“$releasever”改为“7”。
在这里插入图片描述
解决完上面这个错误,其它地方就比较顺利了。

查看是否安装成功:

netstat -anp | grep 2983

2.2 创建租户

由于“不可以用root@sys创建binlog任务”,所以要创建租户。

1.查看所有的租户信息:
SELECT * FROM oceanbase.DBA_OB_TENANTS;

2.查看resource pool:
SELECT * FROM oceanbase.DBA_OB_RESOURCE_POOLS;

3.创建“资源规格(UNIT)”
CREATE RESOURCE UNIT S1_unit_flink_test
                MEMORY_SIZE = '2G',
                MAX_CPU = 1, MIN_CPU = 1,
                LOG_DISK_SIZE = '6G',
                MAX_IOPS = 10000, MIN_IOPS = 10000, IOPS_WEIGHT=1;

4.创建resource pool(仅 sys 租户的 root 用户(root@sys)可以创建资源池,其他租户不支持创建资源池)
-- sys_unit_config大概2GB内存。
CREATE RESOURCE POOL tenant_pool_flink_test UNIT='sys_unit_config', UNIT_NUM=1, ZONE_LIST=('zone1');

5.创建租户:创建一个名为  flink_test_tenant 的租户(默认为 MySQL 模式租户),副本数为1,资源池指定为 flink_test_tenant_pool_01,Primary Zone 为 zone1,允许所有 IP 连接数据库。
CREATE TENANT IF NOT EXISTS flink_test_tenant  PRIMARY_ZONE='zone1', RESOURCE_POOL_LIST=('tenant_pool_flink_test') set OB_TCP_INVITED_NODES='%';

6.使用新创建的租户管理员登录:
用户名:root@flink_test_tenant
密码:默认为空(有需要可以自己设置密码)

7.创建用户( CREATE USER 权限较大,默认仅集群管理员和租户管理员拥有此系统权限):
CREATE USER 'test' IDENTIFIED BY 'pwd';
GRANT ALL ON *.* TO 'test';

8.常用命令
其它命令,删除用户:
drop user 'test';
删除“资源规格”:
DROP RESOURCE UNIT S1_unit_flink_test;
查询已有的“资源规格”信息:
SELECT * FROM oceanbase.DBA_OB_UNIT_CONFIGS;

2.3 创建数据库

账号:test@flink_test_tenant
密码:pwd。
使用上面账号登录oceanbase后创建数据库。

CREATE DATABASE IF NOT EXISTS `flink_test`;
USE `flink_test`;

SET FOREIGN_KEY_CHECKS=0;

-- ----------------------------
-- Table structure for rv_table
-- ----------------------------
DROP TABLE IF EXISTS `rv_table`;
CREATE TABLE `rv_table` (
  `dt` varchar(10) NOT NULL ,
  `uuid` varchar(30) DEFAULT NULL,
  `report_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`dt`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

2024-12-25	uid20241225	1735090201740
2024-12-26	uid20241226	1735090201741

2.4 创建binlog

进入binlog server服务:

cd /home/oceanbase-all-in-one/obclient/u01/obclient/bin
obclient -h127.0.0.1 -P2983

创建binlog:

CREATE BINLOG FOR TENANT `myoceanbase`.`flink_test_tenant` TO USER `root` PASSWORD `pwd` WITH CLUSTER URL `http://10.86.97.168:8080/services?Action=ObRootServiceInfo&ObCluster=myoceanbase`,REPLICATE NUM 2;

2.5 配置ODP

账号密码见安装完成保存的账号信息。
obclient -h10.86.97.168 -P2883 -uroot@proxysys -p'Y6.B4s)pt' -Doceanbase -A
ALTER proxyconfig SET binlog_service_ip='10.86.97.168:2983';

2.6 验证结果

obclient -h10.86.97.168 -P2883 -uroot@flink_test_tenant  -p -Doceanbase -A
默认密码为空(到输密码时直接回车就行)。

SHOW MASTER STATUS;

SHOW BINLOG EVENTS;

2.7 常见问题

问题描述:CREATE BINLOG 报 “ERROR 1236 (HY000): Internal error”

查看日志:“[error] mysql_connecton_wrapper.cpp(121): Failed to execute query, error: (conn=3221748588) Table ‘flink_test.instances_gtid_seq’ doesn’t exist”,提示没有binlog相关表。

日志路径:/home/ds/oblogproxy/log/logproxy.log

解决:重新执行“sudo sh env/deploy.sh -m deploy -f env/deploy.conf.json”
相关的数据表会重建。然后再执行“CREATE BINLOG”即可。

或者说:应该先创建数据库,再安装obbinlog组件。安装后会在数据库创建binlog相关数据库表,如下:
在这里插入图片描述

3. fink CDC

3.1 核心代码

package com.zl.oceanbase;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.zl.utils.EnvUtil;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
import java.util.List;

/**
 * 就当成MySQL使用就行。
 */
public class OceanBaseCDCLikeMySQLExample {
    public static void main(String[] args) throws Exception {

        List<String> SYNC_TABLES = Arrays.asList("flink_test.rv_table");
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("10.86.97.168")
                .port(2883)// oceanbase安装时obproxy-ce组件端口
                .databaseList("flink_test")
                .tableList(String.join(",", SYNC_TABLES))
                .username("root@flink_test_tenant")
                .password("")// 记得修改为实际密码
                .startupOptions(StartupOptions.initial())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

        // 配置运行环境,并行度1
        StreamExecutionEnvironment env = EnvUtil.setFlinkEnv(1);
        // 程序间隔离,每个程序单独设置
        env.getCheckpointConfig().setCheckpointStorage("hdfs://10.86.97.191:9000/flinktest/OceanBaseCDCLikeMySQLExample");

        // 如果不能正常读取mysql的binlog:①可能是mysql没有打开binlog或者mysql版本不支持(当前在mysql5.7.20环境下,功能正常);
        // ②可能是数据库ip、port、账号、密码错误。
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1).print();

        env.execute("Print MySQL Snapshot + Binlog");

    }
}

3.2 flink web

在这里插入图片描述

3.3 控制台日志

在这里插入图片描述

3.4 完整代码

完成代码见:flink-cdc-mysql

4.扩展

本文主要基于oceanbase oblogproxy的binlog模式。
其实oblogproxy还支持CDC模式,详见官网文档:CDC模式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2272089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

和为0的四元组-蛮力枚举(C语言实现)

目录 一、问题描述 二、蛮力枚举思路 1.初始化&#xff1a; 2.遍历所有可能的四元组&#xff1a; 3.检查和&#xff1a; 4.避免重复&#xff1a; 5.更新计数器&#xff1a; 三、代码实现 四、运行结果 五、 算法复杂度分析 一、问题描述 给定一个整数数组 nums&…

某xx到家app逆向

去官网下载app即可 https://www.dongjiaotn.com/#/home查壳 360的壳子 直接脱壳即可 抓包 请求地址 https://api.gateway.znjztfn.cn/server/user/index 请求参数 {"lng": "xxxx","lat": "xxxx","city_id": "1376&…

docker搭建gitlab和jenkins

搭建gitlab 搭建gitlab首先需要一个gitlab的镜像 其次最好为他设置一个单独的目录 然后编写一个docker-compose文件 version: 3.1 services:gitlab:image: gitlab_zh:latest //此处为你的镜像名称container_name: gitlab //容器名称restart: always …

嵌入式linux中socket控制与实现

一、概述 1、首先网络,一看到这个词,我们就会想到IP地址和端口号,那IP地址和端口各有什么作用呢? (1)IP地址如身份证一样,是标识的电脑的,一台电脑只有一个IP地址。 (2)端口提供了一种访问通道,服务器一般都是通过知名端口号来识别某个服务。例如,对于每个TCP/IP实…

推荐系统重排:MMR 多样性算法

和谐共存&#xff1a;相关性与多样性在MMR中共舞 推荐系统【多样性算法】系列文章&#xff08;置顶&#xff09; 1.推荐系统重排&#xff1a;MMR 多样性算法 2.推荐系统重排&#xff1a;DPP 多样性算法 引言 在信息检索和推荐系统中&#xff0c;提供既与用户查询高度相关的文…

概述(讲讲python基本语法和第三方库)

我是北子&#xff0c;这是我自己写的python教程&#xff0c;主要是记录自己的学习成果方便自己日后复习&#xff0c; 我先学了C/C&#xff0c;所以这套教程中可能会将很多概念和C/C去对比&#xff0c;所以该教程大概不适合零基础的人。 it seems that python nowadays 只在人工…

【python因果库实战16】双重稳健模型1

这里写目录标题 双重稳健模型数据简单双重稳健模型双重稳健 IP 特征模型 双重稳健模型 基本上&#xff0c;这些是利用加权模型增强结果模型的不同集合模型。 本笔记展示了不同的结果模型和倾向性模型组合方式&#xff0c; 但由于可能的组合非常多&#xff0c;本笔记并不打算展…

如何恢复已删除的 Telegram 消息 [iOSamp;Android]

Telegram 是一款功能强大的消息应用程序&#xff0c;因其易用性、隐私保护和众多炫酷功能而深受用户喜爱。然而&#xff0c;有时我们会不小心删除重要的消息。在这种情况下你应该做什么&#xff1f; 本文将为您提供简单有效的解决方案来恢复 Telegram 上已删除的消息&#xff…

第431场周赛:最长乘积等价子数组、计算字符串的镜像分数、收集连续 K 个袋子可以获得的最多硬币数量、不重叠区间的最大得分

Q1、最长乘积等价子数组 1、题目描述 给你一个由 正整数 组成的数组 nums。 如果一个数组 arr 满足 prod(arr) lcm(arr) * gcd(arr)&#xff0c;则称其为 乘积等价数组 &#xff0c;其中&#xff1a; prod(arr) 表示 arr 中所有元素的乘积。gcd(arr) 表示 arr 中所有元素的…

【微服务】2、网关

Spring Cloud微服务网关技术介绍 单体项目拆分微服务后的问题 服务地址问题&#xff1a;单体项目端口固定&#xff08;如黑马商城为8080&#xff09;&#xff0c;拆分微服务后端口各异&#xff08;如购物车808、商品8081、支付8086等&#xff09;且可能变化&#xff0c;前端难…

使用JMeter玩转tidb压测

作者&#xff1a; du拉松 原文来源&#xff1a; https://tidb.net/blog/3f1ada39 一、前言 tidb是mysql协议的&#xff0c;所以在使用过程中使用tidb的相关工具连接即可。因为jmeter是java开发的相关工具&#xff0c;直接使用mysql的jdbc驱动包即可。 二、linux下安装jmet…

2024网络安全运营方案概述(附实践资料合集)

以下是网络安全运营方案的详细内容&#xff1a; 一、目标与原则 目标&#xff1a;建立一套安全高效、灵活性强的网络安全运营体系&#xff0c;实现对网络安全的全面监控、防护和应急响应。原则&#xff1a; 全员参与&#xff1a;网络安全是全员共同的责任&#xff0c;所有员工…

使用Python进行图像裁剪和直方图分析

一、简介 在数字图像处理领域&#xff0c;裁剪和分析图像的直方图是两个非常基本且重要的操作。本文将通过一个简单的Python项目&#xff0c;展示如何使用skimage和matplotlib库来裁剪图像并分析其RGB通道的直方图。 二、环境准备 在开始之前&#xff0c;请确保你已经安装了以…

vue3-dom-diff算法

vue3diff算法 什么是vue3diff算法 Vue3中的diff算法是一种用于比较虚拟DOM树之间差异的算法&#xff0c;其目的是为了高效地更新真实DOM&#xff0c;减少不必要的重渲染 主要过程 整个过程主要分为以下五步 前置预处理后置预处理仅处理新增仅处理后置处理包含新增、卸载、…

【U8+】用友U8软件中,出入库流水输出excel的时候提示报表输出引擎错误。

【问题现象】 通过天联高级版客户端登录拥有U8后&#xff0c; 将出入库流水输出excel的时候&#xff0c;提示报表输出引擎错误。 进行报表输出时出现错误&#xff0c;错误信息&#xff1a;找不到“fd6eea8b-fb40-4ce4-8ab4-cddbd9462981.htm”。 如果您正试图从最近使用的文件列…

[SMARTFORMS] 创建样式模板

通过事务码SMARTFORMS创建样式模板 选择样式&#xff0c;自定义样式模板名称ZST_DEMO_2025 点击"创建"按钮&#xff0c;跳转至样式模板详情页面&#xff0c;我们可以在该页面上设置SMARTFORMS表单相关的样式 在段落样式处&#xff0c;右键选择创建节点&#xff0c;输…

基于51单片机和DS3231时钟模块、LCD1602(I2C通信)模块的可调时钟+温度测量+计时+闹钟

目录 系列文章目录前言一、效果展示二、原理分析三、各模块代码1、延时函数2、定时器03、定时器14、独立按键5、DS3231时钟模块6、LCD1602模块&#xff08;PCF8574T驱动&#xff09; 四、主函数总结 系列文章目录 前言 之前做过一个类似的&#xff0c;用到了很多外设&#xff…

通义视觉推理大模型QVQ-72B-preview重磅上线

Qwen团队推出了新成员QVQ-72B-preview&#xff0c;这是一个专注于提升视觉推理能力的实验性研究模型。提升了视觉表示的效率和准确性。它在多模态评测集如MMMU、MathVista和MathVision上表现出色&#xff0c;尤其在数学推理任务中取得了显著进步。尽管如此&#xff0c;该模型仍…

企业级Nosql数据库和Redis集群

一、关系数据库和Nosql数据库 关系数据库 定义&#xff1a;关系数据库是建立在关系模型基础上的数据库。它使用表格&#xff08;关系&#xff09;来存储数据&#xff0c;通过行和列的形式组织信息。例如&#xff0c;一个简单的学生信息表可能有 “学号”“姓名”“年龄”“班级…

Ant Design中Flex布局、Grid布局和Layout布局详解

好的&#xff0c;我们来更详细地探讨 Ant Design 中的 Flex布局、Grid布局 和 Layout布局 的特点、用法、适用场景&#xff0c;以及如何灵活运用它们来构建页面。下面将从各个方面进行更深入的分析&#xff0c;并提供具体的实例。 VueFlex布局实现响应式布局 1. Flex布局 概念…