基于Canal+kafka监听数据库变化的最佳实践

news2025/1/11 2:45:21

1、前言

        工作中,我们很多时候需要根据某些状态的变化更新另一个业务的逻辑,比如订单的生成,成交等,需要更新或者通知其他的业务。我们通常的操作通过业务埋点、接口的调用或者中间件完成。

        但是状态变化的入口比较多的时候,就很容易漏掉某些地方。代码维护起来也比较麻烦。今天介绍阿里出品的 【canal】中间件完成数据库字段的监听。

2、canal的简单介绍

        canal详见介绍件官网:https://github.com/alibaba/canal

 

2.1 家族成员:

【canal.adapter】:客户端落地的适配以及功能

      

 【canal.admin】:提供WebUI的管理界面

 【canal.deployer】:canal服务

 【canal.example】:客户端提供的demo

2.2 工作原理

 3、 实践目标

        使用canal监控mysql数据的变化,将变化的数据推送到kafka,并使用canal-admin动态管理需要监控的数据库表。

 4、工具准备

其中kafka是依赖zookeeper的,所以也需要zookeeper。

5、配置并启动kafka

Kafka QuickStart

5.1 修改配置

vim config/server.properties

换成自己的IP

替换成自己zookeeper的地址

 5.2 启动server

  • 启动zookeeper脚本

# bin/zkServer.sh start

  • 启动kafka脚本

# bin/kafka-server-start.sh -daemon config/server.properties &

  •  查看是否启动成功脚本

# jps -ml

 

此时kafka启动成功。

5.3 注意事项

值得注意的是官方文档中查看topic的命令,

# bin/kafka-topics.sh --list --zookeeper 192.168.1.110:2181

在心的kafka版本中已经改变,可移步kafka官方文档: Apache Kafka

新版本中使用bootstrap-server,如下

# bin/kafka-topics.sh --list --bootstrap-server localhost:9092 

6、启动canal-admin

6.1 修改配置

改成对应的ip

 6.2 执行 conf/canal.manage.sql

         该脚本是canal-admin的管理脚本。

 6.3 启动canal-admin

sh bin/startup.sh

 6.4 查看启动状态

 6.5 访问页面

此时代表canal-admin已经启动成功,可以通过 http://127.0.0.1:8089/ 访问,

默认密码:admin/123456

7、启动canal-server

7.1 修改配置脚本

# vim conf/canal_local.properties

换成canal-admin的IP

7.2 启动服务 指定local

# sh bin/startup.sh local

7.3 查询启动状态

8、管理平台配置

8.1 查看canal服务的状态

 8.2 配置实例

 修改监听的数据库信息:

canal.instance.master.address=192.168.88.111:3306

canal.instance.dbUsername=***
canal.instance.dbPassword=***

#默认监听全库

canal.instance.filter.regex=test.test_user

#配置不可访问的库表

canal.instance.filter.black.regex=

#配置mq的主题/路由

canal.mq.topic=example

 保存即可。

8.3 启动实例

9、编写客户端监听kafka的客户端

@Test
public void test01(){
	// 修改打印日志的级别,不然会不停的打印debug日志,影响阅读
	LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
	Logger root = loggerContext.getLogger("root");
	root.setLevel(Level.INFO);

	//设置消费者属性
	Properties properties = new Properties();
	properties.put("bootstrap.servers","192.168.88.111:9092");
	//反序列化器,与生产者的序列化器相对应
	properties.put("key.deserializer", StringDeserializer.class);
	properties.put("value.deserializer", StringDeserializer.class);
	//设置消费者的消费者群组
	properties.put(ConsumerConfig.GROUP_ID_CONFIG,"example");
	// properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest
	KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
	try {
		//消费者订阅主题(可以多个,支持正则表达式,进行模糊匹配)
		consumer.subscribe(Collections.singletonList("example"));
		System.out.println("-------------------------消费端准备就绪,等待消息接受------------------------------------");
		//kafka消费者是通过拉取的方式获得服务端消息
		while(true){
			//循环调用poll方法,获取数据。
			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
			for(ConsumerRecord<String, String> record:records){
				String topic = record.topic();
				String value = record.value();
				if (StringUtils.isNotEmpty(value)) {
					System.out.println(String.format("topic:%s;" + "value:%s", topic,value));
				}
			}
		}
	} finally {
		consumer.close();
	}
}

 10、验证

修改数据库字段,可以接收到修改的信息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/107984.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

位图详解.

1.位图概念 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数是否在这40亿个数中&#xff1f; 思考&#xff1a; 1.用哈希表&#xff1f;遍历一遍&#xff1f;时间复杂度O(N) 40亿个不重复无符号整数占多大内存&#xff1…

springcloud配置中心nacos动态线程池Dynamic-tp配置接入实战

最近业务上需要把项目组几个微服务接入动态线程池框架Dynamic-tp监控&#xff0c;同时支持webhook机器人接受变更通知、容量报警、活性报警、拒绝报警、任务超时报警等通知&#xff0c;方便项目组同事企业微信群实时监控线程池状态&#xff0c;动态调整线程参数。 接手开始上手…

【C++学习】对二维数组进行排序

1 问题描述 今天在学习 CUDA-NMS 算法时&#xff0c;遇到一个小问题&#xff0c;就是希望对一个二维数组进行排序&#xff0c;具体是这样的&#xff1a; 需要对存储目标框的二维数组进行排序&#xff0c;排序的方式是按照目标框的得分score从高往低排序&#xff0c;也就是结果…

微服务系列 - Zookeeper上篇:入门到精通

一、前言 本内容仅用于个人学习笔记&#xff0c;如有侵扰&#xff0c;联系删除 视频教程&#xff1a;【尚硅谷】大数据技术之Zookeeper 3.5.7版本教程 源码学习&#xff1a;微服务系列 - Zookeeper下篇&#xff1a;源码解析 二、Zookeeper入门 1、概述 Zookeeper是一个开…

消息队列(MQ)

文章目录什么是消息队列定义消息队列的特性&#xff1a;1、解耦&#xff1a;2、削峰&#xff1a;3、异步&#xff1a;使用场景传统串行化设计串行化思想&#xff1a;并行处理调优高可用&#xff1a;高并发高性能MQ的两种流派MQ常见的问题1&#xff0c;mq如何避免消息堆积问题。…

如何解决甲乙双方需求理解巨大偏差的问题?

1、对双方进行专业培训 需求分析人员对需求方应用领域不熟悉&#xff0c;而需求方对软件开发也不熟悉&#xff0c;通过对双方进行专业知识培训&#xff0c;让双方对用户专业领域和软件开发领域都有较为清晰的认识。在此基础上&#xff0c;需求方提出的需求更精准&#xff0c;双…

Java中的多线程——线程安全问题

作者&#xff1a;~小明学编程 文章专栏&#xff1a;JavaEE 格言&#xff1a;热爱编程的&#xff0c;终将被编程所厚爱。 目录 多线程所带来的不安全问题 什么是线程安全 线程不安全的原因 修改共享数据 修改操作不是原子的 内存可见性对线程的影响 指令重排序 解决线程…

一个高性能、无侵入的Java性能监控和统计工具,有点东西!

背景 随着所在公司的发展&#xff0c;应用服务的规模不断扩大&#xff0c;原有的垂直应用架构已无法满足产品的发展&#xff0c;几十个工程师在一个项目里并行开发不同的功能&#xff0c;开发效率不断降低。 于是公司开始全面推进服务化进程&#xff0c;把团队内的大部分工程…

你造Python中的上下文管理器是啥吗

上下文管理器(Context managers)让我们在需要的时候可以准确地分配或释放资源 Python中最常用的上下文管理例子就是with语句了&#xff0c;一般是在操作文件的时候&#xff0c;比如&#xff1a; 有PY基础的小伙伴都知道&#xff0c;上面的代码等价于&#xff1a; 对比两个例子…

python机器学习及深度学习在空间模拟与时间预测领域中的应用

了解机器学习的发展历史、计算原理、基本定义&#xff0c;熟悉机器学习方法的分类&#xff0c;常用机器学习方法&#xff0c;以及模型的评估与选择&#xff1b;熟悉数据预处理的流程&#xff0c;掌握python程序包的使用&#xff1b;理解机器学习在生态水文中的应用&#xff0c;…

使用Python绘制圣诞树教程(附源代码)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

Qt Creator 运行LVGL模拟器

windows下用Qt Creator运行LVGL 8.2 背景 最近在学习LVGL&#xff0c;手头又没有硬件&#xff0c;好多网上资料介绍了如何在PC端模拟&#xff0c;纯C语言实现的库模拟的话也不会复杂&#xff0c;恰巧本人熟悉Qt Creator&#xff0c;那就用这个环境模拟吧&#xff0c;网上搜索…

为什么企业传统网络访问海外应用程序不稳定、速度慢?怎么解决?

外贸、游戏等行业经常会有跨网数据访问的需求&#xff0c;并且访问慢、卡是常见的问题。这其中主要的原因是网络访问速度与物理距离有直接关系。刨除距离原因&#xff0c;还有哪些因素影响了我们的访问呢?那么访问国站慢的影响因素有哪些? 在中国的国内用户&#xff0c;使用应…

下载安装npm,配置环境变量详细教程

要在本地运行项目&#xff0c;就需要安装npm&#xff0c;其次还需要配置项目依赖node-modules。今天我们就先安装npm npm下载安装一、安装1、下载2、勾选同意&#xff0c;下一步3、默认的存储位置即可&#xff0c;next4、不用勾选&#xff0c;next5、点击install6、等待安装完成…

通过 Mito 在 Python 中使用电子表格

在本文中,您将学习如何使用这个强大的库,该库能够自动执行某些 Pandas 任务并以非常快速的方式执行数据分析。 Mitosheet是什么? Mitosheet 是 Python 中可用的众多库之一,它实际上是 Python 和电子表格之间的融合,大大加快了数据分析过程。 安装过程 您可以安装在分析…

《Python多人游戏项目实战》第五节 断线重连

目录 5.1 模拟弱网状态 5.2 断线重连 5.3 优化玩家名称显示 5.4 完整代码下载地址 导致客户端和服务端断开连接的原因可能有以下三种&#xff1a; 服务端主动关闭连接。客户端窗口关闭&#xff0c;玩家退出游戏。客户端所在网络不给力&#xff08;也叫做弱网&#xff09;&…

Redis - Redis持久化:AOF和RDB

1. 为什么要持久化 Redis是内从数据库&#xff0c;宕机后数据会丢失&#xff1b;Redis重启后&#xff0c;为了快速恢复数据&#xff0c;提供了持久化机制&#xff1b;Redis有两种持久化方式&#xff1a;RDB和AOF&#xff0c;这也是Redis无畏宕机与快速恢复数据的杀手锏。 注意…

全球代表供应商!腾讯安全NDR再获Gartner认可

近日&#xff0c;国际研究机构Gartner发布了2022年《Market Guide for Network Detection and Response》&#xff08;《网络检测和响应&#xff08;NDR&#xff09;市场指南》&#xff09;&#xff08;以下简称《报告》&#xff09;&#xff0c;腾讯安全被Gartner列为全球NDR市…

非零基础自学Golang 第17章 HTTP编程(上) 17.3 爬虫框架gocolly 17.3.1 gocolly简介

非零基础自学Golang 文章目录非零基础自学Golang第17章 HTTP编程(上)17.3 爬虫框架gocolly17.3.1 gocolly简介第17章 HTTP编程(上) 17.3 爬虫框架gocolly 我们在之前学习了如何使用标准库实现HTTP爬虫【其实也不算&#xff0c;就实现了简单的请求&#xff0c;但是爬虫不就是这…

别乱用了,用新的。Go SliceHeader 和 StringHeader 将会被废弃!

大家好&#xff0c;我是煎鱼。Go 语言中有个很经典的 (Slice|String)Header&#xff0c;经常出现在大家视野中&#xff0c;为此我写了《Go SliceHeader 和 StringHeader&#xff0c;你知道吗&#xff1f;》给大家介绍&#xff0c;避免被面试官卷到。以重点来讲&#xff0c;Slic…