Canal实时监控案例

news2025/1/11 19:45:16

Canal实时监控案例

文章目录

  • Canal实时监控案例
    • 0. 写在前面
    • 1. TCP 模式测试
      • 1.1 IDEA创建项目canal-module
    • 1.2 通用监视类——CanalClient
      • 1.2.1 Canal 封装的数据结构
      • 1.2.2 在 canal-module 模块下创建 cn.canal 包,并在该包下创建 CanalClient.java文件
    • 2. Kafka 模式测试


0. 写在前面

  • Canal版本:Canal-1.1.5
  • Kafka版本:Kafka-2.4.1
  • Zookeeper版本:Zookeeper-3.5.7

解压安装canal的tar.gz包之前,提前创建一个目录canal-x.x.x作为canal的安装目录,因为canal解压后是分散

1. TCP 模式测试

1.1 IDEA创建项目canal-module

编辑pom.xml文件:添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.2</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.protocol</artifactId>
        <version>1.1.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

V1.1.5版本需要多添加canal.protocol这个依赖,如果是V1.1.2就不需要

1.2 通用监视类——CanalClient

1.2.1 Canal 封装的数据结构

Message:一次canal从日志中抓取的信息,一个message可以包含多个sql执行的结果

在这里插入图片描述

1.2.2 在 canal-module 模块下创建 cn.canal 包,并在该包下创建 CanalClient.java文件

代码如下:

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;


public class CanalClient {

    public static void main(String[] args) throws InvalidProtocolBufferException, InterruptedException {
        // TODO 获取连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("node01", 11111),"example", "", "");

        while (true) {
            // TODO 连接
            canalConnector.connect();
            // TODO 订阅数据库test_canal
            canalConnector.subscribe("test_canal.*");
            // TODO 获取指定数量的数据
            Message message = canalConnector.get(100);
            // TODO 获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            //TODO 判断集合是否为空,如果为空,则等待一会继续拉取数据
            if (entries.size() <= 0) {
                System.out.println("当次抓取没有数据,休息一会----------------");
                Thread.sleep(1000);
            } else {
                // TODO 遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();
                    // 2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    / /3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();

                    //4.判断当前entryType类型是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        // 6.获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        // 7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();

                        // 8.遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            // 之前的数据
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            // 之后的数据
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            // 数据打印(控制台|Kafka)
                            System.out.println("Table:" + tableName +
                                    ",EventType:" + eventType +
                                    ",Before:" + beforeData +
                                    ",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

开启canal,运行CanalClient查程序,对订阅的数据库canal_test下的表进行增删改操作,同时观察控制台的输出情况

  • 增加数据

单词插入一条数据

insert into user_info values('1001', 'zss', 'male');

在这里插入图片描述

一条sql影响多行

insert into user_info values('1002', 'lisi', 'female'),('1001', 'zss', 'male');

在这里插入图片描述

  • 修改数据

在这里插入图片描述

  • 删除数据

在这里插入图片描述

2. Kafka 模式测试

  • 修改 canal.properties 中 canal 的输出 model,默认 tcp,改为输出到kafka
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = kafka
  • 修改 Kafka 集群的地址
##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
  • 修改 instance.properties 输出到 Kafka 的主题(canal_test)以及分区数
# mq config
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱 binlog 的顺序, 如果要提高并行度, 首先设置 kafka 的分区数>1, 然后设置 canal.mq.partitionHash 属性

  • 启动Canal
[zhangsan@node01 example]$ cd /opt/module/canal/ 
[zhangsan@node01 example]$  bin/startup.sh
  • 看到 CanalLauncher 你表示启动成功,同时会创建 canal_test 主题
[zhangsan@node01 example]$ jps 
2269 Jps
2253 CanalLauncher
  • 启动 Kafka 消费客户端测试,查看消费情况
	[zhangsan@node01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic canal_test
  • 向 MySQL 中插入|修改|删除数据后查看消费者控制台

Kafka 消费者控制台

  • 增加数据

单词插入一条数据

insert into user_info values('1001', 'zss', 'male');

在这里插入图片描述

一条sql影响多行

insert into user_info values('1002', 'lisi', 'female'),('1001', 'zss', 'male');
update user_info 

在这里插入图片描述

  • 修改数据

tp

  • 删除数据

在这里插入图片描述

结束!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/388459.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前后端身份验证

1、web 开发模式 【】基于服务端渲染的传统 Web 开发模式 【】基于前后端分离的新型 Web 开发模式&#xff1a;依赖于 Ajax 技术的广泛应用。后端只负责提供 API 接口&#xff0c;前端使用 Ajax 调用接口的开发模式 2、身份认证 【】服务端渲染推荐使用 Session 认证机制 【】…

《堆的应用》TOP-K问题

TOP-K问题:即求数据中前k个最大的元素或者最小的元素&#xff0c;一般情况下&#xff0c;这些数据量是非常大的。 比如:专业前10名、世界500强、世界富豪榜、游戏中前100名等这些排名都是TOP-K问题。 来源于《财富》世界500强排行榜。 对于TOP-k问题&#xff0c;能想到的最简…

【XXL-JOB】XXL-JOB定时处理视频转码

【XXL-JOB】XXL-JOB定时处理视频转码 文章目录【XXL-JOB】XXL-JOB定时处理视频转码1. 准备工作1.1 高级配置1.2 分片广播2. 需求分析2.1 作业分片方案2.2 保证任务不重复执行2.2.1 保证幂等性3. 视频处理业务流程3.1 添加待处理任务3.2 查询待处理任务3.3 更新任务状态3.4 工具…

考研还是工作?两战失败老道有话说

老道入职第一周自我介绍谈谈考研谈谈工作新的启程自我介绍 大家好&#xff01;在下是一枚考研失败两次的自认为聪明能干的有点小帅的实则超级垃圾的三非名校毕业的自动化渣男。大一下就加入实验室&#xff0c;在实验室焊板子、画板子、培训、打比赛外加摸鱼&#xff1b;参加过…

Swagger扩展 - 同一个接口生成多份Swagger API文档

为同一个ApiOperation生成多份不同Swagger API文档。 0. 目录1. 背景2. 效果展示3. 实现3.1 关键逻辑 - 让接口自解释3.2 关键逻辑 - 如何生成相应的ApiDescription3.3 关键逻辑 - 如何为生成的ApiDescription 赋值3.4 关键逻辑 - 如何动态生成Docket4. 继续优化5. 参考1. 背景…

【Spark分布式内存计算框架——Structured Streaming】3. Structured Streaming —— 入门案例:WordCount

1.3 入门案例&#xff1a;WordCount 入门案例与SparkStreaming的入门案例基本一致&#xff1a;实时从TCP Socket读取数据&#xff08;采用nc&#xff09;实时进行词频统计WordCount&#xff0c;并将结果输出到控制台Console。 文档&#xff1a;http://spark.apache.org/docs/2…

一个Bug让人类科技倒退几十年?

大家好&#xff0c;我是良许。 前几天在直播的时候&#xff0c;问了直播间的小伙伴有没人知道「千年虫」这种神奇的「生物」的&#xff0c;居然没有一人能够答得上来的。 所以&#xff0c;今天就跟大家科普一下这个人类历史上最大的 Bug 。 1. 全世界的恐慌 一个Bug会让人类…

Java中的自动类型提升与强制类型转换

一、自动类型提升 自动类型提升是指在程序运行时因为某种情况需要&#xff0c;JVM将较小的数据类型自动转换为较大的数据类型&#xff0c;以保证精度和正确性。在Java中&#xff0c;需要进行类型提升的情况有以下几种&#xff1a; 1. byte、short和char提升为int类型 当运算…

spark sql(五)sparksql支持查询哪些数据源,查询hive与查询mysql的区别

1、数据源介绍 sparksql默认查询的数据源是hive数据库&#xff0c;除此之外&#xff0c;它还支持其它类型的数据源查询&#xff0c;具体的到源码中看一下&#xff1a; 可以看到sparksql支持查询的数据源有CSV、parquet、json、orc、txt、jdbc。这些数据源中前面五个我还能理解&…

【Python】RPA批量生成word文件/重命名及批量删除

批量生成word文件 场景&#xff1a;需要新建多个类似文件名 比如&#xff1a;今天的事例是新建12个文件名为&#xff1a; ​ 保安员考试试卷1及答案.docx ​ 保安员考试试卷2及答案.docx ​ … ​ 保安员考试试卷12及答案.docx 痛点&#xff1a; ​ 手动操作重复性高&a…

目标检测中回归损失函数(L1Loss,L2Loss,Smooth L1Loss,IOU,GIOU,DIOU,CIOU,EIOU,αIOU ,SIOU)

文章目录L-norm Loss 系列L1 LossL2 LossSmooth L1 LossIOU系列IOU &#xff08;2016&#xff09;GIOU &#xff08;2019&#xff09;DIOU &#xff08;2020&#xff09;CIOU &#xff08;2020&#xff09;EIOU &#xff08;2022&#xff09;αIOU (2021)SIOU &#xff08;2022…

【SpringCloud】SpringCloud详解之Eureka实战

目录前言SpringCloud Eureka 注册中心一.服务提供者和服务消费者二.需求三.搭建Eureka-Server四.搭建Eureka-Client(在服务提供者配置:用户订单)前言 微服务中多个服务&#xff0c;想要调用&#xff0c;怎么找到对应的服务呢&#xff1f; 这里有组件的讲解 → SpringCloud组件…

深圳大学《计算机论题》作业:大数据与人工智能技术对人类生活的影响

说明 本作业为小组作业&#xff0c;要求基于一场报告完成&#xff08;即观后感&#xff09;。共分4个小题&#xff0c;讨论人工智能时代的伦理思考。由于版权原因&#xff0c;不提供报告的具体内容&#xff0c;只展示答题内容。 第一题 &#xff08;1&#xff09; 你如何看待…

winform控件PropertyGrid的应用(使运行中的程序能像vistual studio那样设置控件属性)

上周在看别人写的上位机demo代码时&#xff0c;发现创建的项目模板是"Windows 窗体控件库"(如下图) 生成的项目结构像自定义控件库&#xff0c;没有程序入口方法Main&#xff0c;但却很神奇能调试&#xff0c;最后发现原来Vistual Studio启动了一个外挂程序UserContr…

LSM(日志结构合并树)_笔记

WAL&#xff1a;Write Ahead Log 写前日志&#xff0c;顺序日志文件 1 LSM tree的定义 LSM tree&#xff1a; Log-Structured-Merge-Tree&#xff0c;日志结构合并树。 Log-Structured Merge-tree (LSM-tree) is a disk-based data structure designed to provide low-cost …

Linux操作系统学习(了解文件系统动静态库)

文章目录浅谈文件系统了解EXT系列文件系统目录与inode的关系软硬链接动静态库浅谈文件系统 当我们创建一个文件时由两部分组成&#xff1a;文件内容文件属性&#xff0c;即使是空文件也有文件属性 一个文件没有被打开是存储在磁盘中的&#xff0c;而磁盘是计算机中的一个机械…

你想赚的钱不一定属于你

昨天一个同行跟我说&#xff0c;最近有个五十多万的订单&#xff0c;客户是拿着别人家的设计来找的他&#xff0c;跟了也有大半个月了&#xff0c;自己明明报的价格比原设计的公司要低&#xff0c;客户一直说会尽快下的&#xff0c;他原本想着能够从这个订单里赚到几万块&#…

王道计算机组成原理课代表 - 考研计算机 第六章 总线 究极精华总结笔记

本篇博客是考研期间学习王道课程 传送门 的笔记&#xff0c;以及一整年里对 计算机组成 知识点的理解的总结。希望对新一届的计算机考研人提供帮助&#xff01;&#xff01;&#xff01; 关于对 “总线” 章节知识点总结的十分全面&#xff0c;涵括了《计算机组成原理》课程里的…

软件测试用例(3)

按照测试对象划分: 一)界面测试: 1)软件只是一种工具&#xff0c;软件和人的信息交流是通过界面来进行的&#xff0c;界面是软件和用户交流的最直接的一层&#xff0c;界面的设计决定了用户对于我们设计软件的第一映像&#xff0c;界面如同人的面孔&#xff0c;具有最吸引用户的…

Java中String详解(从原理理解经典面试题)

本篇文章我先通过经典面试题&#xff0c;筛选需要观看本篇文章的朋友&#xff0c;然后咱们介绍String的基本特性&#xff0c;通过基本特性就可以找到面试题的答案。最后咱们再深入每个面试题&#xff0c;通过字节码、编译原理、基本特性深入剖析所有的面试题&#xff0c;让大家…