Debezium故障演练

news2024/10/6 12:34:00

1、搭建演练环境

postgresql及wal2json插件安装:https://blog.csdn.net/li281037846/article/details/128411222
kafka及kafka-connect安装,略

//添加debezium connector
curl -i -X POST -H "Content-Type:application/json" -H "Accepted:application/json" http://172.19.102.150:8083/connectors -d '{"name":"debezium-pg-connector","config":{"connector.class":"io.debezium.connector.postgresql.PostgresConnector","database.hostname":"172.19.103.5","database.port":"5432","database.user":"dev","database.password":"123456","database.dbname":"postgres","database.server.name":"debezium-pg-test","table.include.list":"public.table1_with_pk","slot.name":"debezium_pg_test","plugin.name":"wal2json","value.converter": "org.apache.kafka.connect.json.JsonConverter","value.converter.schemas.enable":"false","tombstones.on.delete": "true"}}'
 
//查看connector
curl 172.19.102.150:8083/connectors/debezium-pg-connector/status
 
//重启connnector
curl -i -X POST http://172.19.102.150:8083/connectors/debezium-pg-connector/restart?includeTasks=true&onlyFailed=true
 
 
//消费topic
bin/kafka-console-consumer.sh --bootstrap-server 172.19.102.150:9092 --topic debezium-pg-test.public.table1_with_pk --group group-tianzy-test

debezium connector会监听table1_with_pk表的wal日志,发到kafka

flink程序:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.tianzy.flink.test</groupId>
	<artifactId>flink-test</artifactId>
	<version>0.1</version>
	<packaging>jar</packaging>

	<name>Flink Walkthrough DataStream Java</name>
	<url>https://flink.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<flink.version>1.14.4</flink.version>
		<target.java.version>1.8</target.java.version>
		<scala.binary.version>2.11</scala.binary.version>
		<maven.compiler.source>${target.java.version}</maven.compiler.source>
		<maven.compiler.target>${target.java.version}</maven.compiler.target>
		<log4j.version>2.17.1</log4j.version>
	</properties>

	<repositories>

		<repository>
			<id>central</id>
			<url>https://maven.aliyun.com/repository/central</url>
			<releases>
				<enabled>true</enabled>
			</releases>
			<snapshots>
				<enabled>true</enabled>
			</snapshots>
		</repository>
	</repositories>

	<dependencies>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>



		<dependency>
			<groupId>org.postgresql</groupId>
			<artifactId>postgresql</artifactId>
			<version>42.3.4</version>
		</dependency>

		<!-- Add logging framework, to produce console output when running in the IDE. -->
		<!-- These dependencies are excluded from the application JAR by default. -->
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-api</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>${log4j.version}</version>
			<scope>runtime</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>

			<!-- Java Compiler -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.1</version>
				<configuration>
					<source>${target.java.version}</source>
					<target>${target.java.version}</target>
				</configuration>
			</plugin>

			<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
			<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>3.0.0</version>
				<executions>
					<!-- Run shade goal on package phase -->
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<exclude>org.slf4j:*</exclude>
									<exclude>org.apache.logging.log4j:*</exclude>
								</excludes>
							</artifactSet>
							<filters>
								<filter>
									<!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
									<artifact>*:*</artifact>
									<excludes>
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<mainClass>com.tianzy.test.DebeziumTaskSync</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class DebeziumTaskSync {

    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        tEnv.executeSql("CREATE TABLE topic_table1_with_pk (\n" +
                "  -- schema 与 MySQL 的 products 表完全相同\n" +
                "  a int NOT NULL,\n" +
                "  b STRING,\n" +
                "  c timestamp(6) NOT NULL\n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'debezium-pg-test.public.table1_with_pk',\n" +
                " 'properties.bootstrap.servers' = '172.19.102.150:9092',\n" +
                " 'properties.group.id' = 'group-tianzy-test',\n" +
                "  -- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息\n" +
                "  -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'\n" +
                " 'format' = 'debezium-json',  -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'\n" +
                " 'debezium-json.schema-include' = 'true'\n" +
                ")");
        
        tEnv.executeSql("CREATE TABLE sync_table1_with_pk (\n" +
                "   a int NOT NULL,\n" +
                "  b STRING,\n" +
                "  c timestamp(6) NOT NULL,\n" +
                "  PRIMARY KEY (a, c)  NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:postgresql://172.19.103.5:5432/postgres',\n" +
                "   'table-name' = 'sync_table1_with_pk2',\n" +
                "   'username' = 'xxx',\n" +
                "   'password' = 'xxx'\n" +
                ")");
        Table transactions = tEnv.from("topic_table1_with_pk");
        transactions.executeInsert("sync_table1_with_pk");
    }
}

启动flink程序,会消费debezium发到kafka的消息,并将数据同步到sync_table1_with_pk2表

2、故障演练

正常情况,增删改source表数据,sink表数据会实时同步数据

1.部署单台kafka-connect,直接kill进程

此时修改pg表source表数据,sink表不会正常同步数据,kafka对应topic也不会产生消息

重新启动kafka-conenct进程,connector task也会自动启动继续开始工作。sink表会同步刚才所做的修改

2.通过rest接口暂停connector

暂停:curl -X PUT 172.19.102.150:8083/connectors/debezium-pg-connector/pause
此时修改pg表source表数据,sink表不会正常同步数据,kafka对应topic也不会产生消息
恢复:curl -X PUT 172.19.102.150:8083/connectors/debezium-pg-connector/resume
恢复后sink表立马同步数据

3.通过rest接口删除connector

删除:curl -X DELETE 172.19.102.150:8083/connectors/debezium-pg-connector
此时修改pg表source表数据,sink表不会正常同步数据,kafka对应topic也不会产生消息
重新添加:curl -i -X POST -H “Content-Type:application/json” -H “Accepted:application/json” http://172.19.102.150:8083/connectors -d ‘{“name”:“debezium-pg-connector”,“config”:{“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“database.hostname”:“172.19.103.5”,“database.port”:“5432”,“database.user”:“dev”,“database.password”:“123456”,“database.dbname”:“postgres”,“database.server.name”:“debezium-pg-test”,“table.include.list”:“public.table1_with_pk”,“slot.name”:“debezium_pg_test”,“plugin.name”:“wal2json”,“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,“value.converter.schemas.enable”:“false”,“tombstones.on.delete”: “true”}}’

重新添加后sink表立马同步数据

4.部署多台kafka-connect,kill其中一个

在172.19.103.5上部署多一个kafka-conenct(直接从172.19.102.150复制kafka目录,然后执行kafka-connect启动脚本即可)

查看connector状态:curl 172.19.103.5:8083/connectors/debezium-pg-connector/status
在这里插入图片描述
可以看到此时task运行在172.19.102.150上
kill 172.19.102.150这台机器的kafka-connect进程
再查看connector状态:
在这里插入图片描述
可以看到任务自动转移到了172.19.103.5这台机器

修改pg表source表数据,sink表立马同步数据

3、结论

kafkfa-connect进程被kill,或者debezium connector task终止再恢复,并不会影响debezium数据同步的最终一致性

如果部署了多台kafka-conenct,debezium任务会自动故障转移,其中一台挂掉,不会影响服务的可用性

4、问题记录&注意事项

1、kafka消息不要开启scheme

connector 配置:

"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",

flink kafka ddl配置:

'debezium-json.schema-include' = 'false'

开启scheme,除了消息中会有大量的冗余数据
还会导致flink无法识别kafka墓碑消息(不开启schema,墓碑消息为null;开启后为{“schema”:null,“payload”:null}),导致空指针异常

2、debezium发到kafka的消息中,before中的字段值为null问题

pg执行ALTER TABLE “public”.“table1_with_pk” REPLICA IDENTITY FULL;
在这里插入图片描述
默认情况下, REPLICA IDENTITY 为DEFAULT,修改前的值只有主键,没有其他字段
如下图,可以通过对比REPLICA IDENTITY为DEFAULT和FULL的情况,上面的oldKeys只有id,下面的oldKeys除了id还有name
在这里插入图片描述
flink ddl中设置’debezium-json.ignore-parse-errors’='true’表面可以解决报错问题,但是其实只是掩盖问题,不推荐这样做

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/124200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt样式(qss)使用小结(软件换肤,比如暗黑模式)

1.背景&#xff1a; Qt style sheet&#xff08;qss&#xff09;跟前端技术一样&#xff0c;就是为了美化界面。关键是&#xff0c;太好用了。之前还为此写过一篇博客。 Qt样式&#xff08;qss&#xff09;手册小结_大橘的博客-CSDN博客 其中主要是记录如何获取手册细节。 …

6、GPIO输入按键检测(轮询检测)

目录 0x01、简介 0x02、硬件设计 0x03、编写函数 0x001、按键初始化 0x002、按键检测 0x003、按键led翻转 0x04、源程序下载地址 0x01、简介 本次实验主要实现按键控制LED灯。 由于机械按键在按下和抬起的时候会产生按键抖动&#xff0c;所以在设计的时候需要考虑如何消除抖…

Pytorch可视化特征图(代码 亲测可用)

2013年Zeiler和Fergus发表的《Visualizing and Understanding Convolutional Networks》 早期LeCun 1998年的文章《Gradient-Based Learning Applied to Document Recognition》中的一张图也非常精彩&#xff0c;个人觉得比Zeiler 2013年的文章更能给人以启发。从下图的F6特征&…

会议OA项目-首页

目录一、Flex布局简介什么是flex布局&#xff1f;flex属性学习地址&#xff1a;案例演示二、轮播图组件及mockjs三、会议OA小程序首页布局一、Flex布局简介 布局的传统解决方案&#xff0c;基于盒状模型&#xff0c;依赖 display属性 position属性 float属性 什么是flex布局…

简单有效的Mac内存清理方法,不用收藏也能记住

Mac电脑使用的时间越久&#xff0c;系统的运行就会变的越卡顿&#xff0c;这是Mac os会出现的正常现象&#xff0c;卡顿的原因主要是系统缓存文件占用了较多的磁盘空间&#xff0c;或者Mac的内存空间已满。如果你的Mac运行速度变慢&#xff0c;很有可能是因为磁盘内存被过度占用…

如何理解并记忆DataFrame中的Axis参数

当我们遇到有axis参数的方法时&#xff0c;脑子里的第一反应应该是&#xff1a;这个方法一定是沿着某一方向进行某种“聚合”或者“过滤”操作。在此场景下&#xff0c;Axis参数就是用来设定操作方向的&#xff1a;是垂直方向还是水平方向&#xff1f; axis0: 一行一行推进&…

【微服务架构实战】第1篇之API网关概述

1.网关概述 采用分布式、微服务的架构模式开发系统时&#xff0c;API 网关是整个系统中必不可少的一环。 1.1 没有网关会有什么问题&#xff1f; 在微服务架构模式下&#xff0c;1个系统会被拆分成多个微服务&#xff0c;如果每个微服务都直接暴露给调用方&#xff0c;会有以…

MySQL主键和唯一键的区别

主键和唯一键基本知识参考这篇文章 MySQL表的约束 &#xff0c;本篇文章主要是谈一谈主键和唯一键的区别从而更好的理解唯一键和主键。 在上篇文章中已经提到 主键&#xff1a; primary key 用来唯一的约束该字段里面的数据&#xff0c;不能重复&#xff0c;不能为空&#x…

vue父页面调用子页面及方法及传参,鼠标光标定位

项目场景&#xff1a; vue父页面调用子页面及方法 问题描述 vue中父界面调用子界面及方法时界面可以调用&#xff0c;但是调用方法的时候第一次报错&#xff0c;但是关掉界面再次重新打开就没问题了 原因分析&#xff1a; 在我之前添加鼠标指针定位的时候&#xff0c;如果在…

记录scoped属性的使用和引发的问题

背景 在对表格数据进行样式处理时&#xff0c;通过业务逻辑判断&#xff0c;进行对符合要求的表格填充背景色&#xff0c;没有符合预期的效果。反复排查校验代码和判断逻辑&#xff0c;都没有什么问题&#xff0c;可能还是样式上出现问题。再通过F12 选取元素对表格设置背景色时…

获取树形结构中,父节点下所有子/孙节点(递归方式)

获取树形结构中&#xff0c;父节点下所有子/孙节点&#xff08;递归方式&#xff09;1 树形结构&#xff08;TreeItem类&#xff09;2 测试代码&#xff08;main函数&#xff09;3 运行效果1 树形结构&#xff08;TreeItem类&#xff09; 这里通用型树形结构为TreeItem类&…

初学Java web(七)RequestResponse

Request&Response Request:获取请求数据 Response:设置响应数据 一.Request对象 1.Request继承体系 Tomcat需要解析请求数据&#xff0c;封装为requestx对象并且创建requestx对象传递到service方法中 使用request对象&#xff0c;查阅JavaEE API文档的HttpServletReque…

rocketMq架构原理精华分析(一)

rocketMq架构原理精华分析是我们这篇文章的核心&#xff0c;从消息中间件的对比、架构模型、消息模型、常见问题等逐一分析&#xff1a; 一、中间件对比&#xff1a; RabbitMq 集群效果不太好&#xff0c;底层不是java 语言&#xff0c;研究原理比较困难&#xff1b; Kafka是…

前端面试题之计算机网络篇 OSI七层网络参考模型

互联网数据传输原理 &#xff5c;OSI七层网络参考模型 OSI七层网络参考模型 应用层&#xff1a;产生网络流量的程序表示层&#xff1a;传输之前是否进行加密或者压缩处理会话层&#xff1a;查看会话&#xff0c;查木马 netstat-n传输层&#xff1a;可靠传输、流量控制、不可…

亿级流量的互联网项目如何快速构建?手把手教你构建思路

一. 大流量的互联网项目 1.项目背景 索尔老师之前负责的一个项目&#xff0c;业务背景是这样的。城市的基础设施建设是每个城市和地区都会涉及到的&#xff0c;如何在基建工地中实现人性化管理&#xff0c;是当前项目的主要诉求。该项目要实现如下目标&#xff1a; 工地工人的…

C语言实现http下载器(附代码)

C语言实现http的下载器。 例&#xff1a;做OTA升级功能时&#xff0c;我们能直接拿到的往往只是升级包的链接&#xff0c;需要我们自己去下载&#xff0c;这时候就需要用到http下载器。 这里分享一个&#xff1a; 功能&#xff1a; 1、支持chunked方式传输的下载 2、被重定…

Apollo开放平台8.0发布:多维升级“为开发者而生”

Apollo开放平台8.0重磅发布&#xff1a;多维升级“为开发者而生” Apollo开放平台迎来8.0版本&#xff0c;百度自动驾驶开放平台迈向易用性时代 百度Apollo EDU计划进展公布&#xff1a;已覆盖自动驾驶技术人才33.5万、700多所院校 Apollo Studio学习实践社区上线&#xff0c;新…

剑指offer----C语言版----第一天

目录 1. 数组中重复的数字Ⅰ 1.1 题目描述 1.2 思路一 1.3 思路二 1.4 思路三&#xff08;最优解&#xff09; 1. 数组中重复的数字Ⅰ 原题&#xff1a;剑指 Offer 03. 数组中重复的数字 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/shu-zu-zhong-…

Python语言快速入门上

目录 1、前言 2、变量和常量 1&#xff09;Python对象模型 2&#xff09;Python变量 二、运算符和表达式 【运算符和表达式】 【位运算符】 【逻辑运算符】 【成员运算符】 【身份运算符】 【常用内置函数】 【基本输入输出】 【模块导入与使用】 【Python代码编…

【PCB专题】Allegro导出3D文件

在PCB布局时,已经决定了大部分器件要放置的位置。如接口、主要的芯片、模块等。因为放置好器件后可能与结构干涉,如果没有发现,那么不得不在Layout的后期调整器件位置,增加工作量。所以前期布局基本确定后就需要导出3D文件给结构工程师,由他查看是否有器件与结构、螺丝孔等…