Canal同步数据

news2025/1/9 14:38:53

canal同步数据

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。

canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。

阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

canal工作原理

在这里插入图片描述

原理相对比较简单:

  1. canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  2. mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  3. canal解析binary log对象(原始为byte流)

加个canal slave 监听mysql数据变化日志,只适用mysql数据库。读取的数据可以同步到redis、其他数据库、ES等。

canal需要使用到mysql,我们需要先安装mysql, 安装mysql容器,但canal是基于mysql的主从模式实现的,所以必须先开启binlog.

开启binlog模式

binlog可以拿到mysql数据日志,canal再去获取日志信息。

先使用docker 创建mysql容器。

(1) 连接到mysql中,并修改/etc/mysql/mysql.conf.d/mysqld.cnf 需要开启主从模式,开启binlog模式。

执行如下命令,编辑mysql配置文件

在这里插入图片描述

命令行如下:

docker exec -it mysql /bin/bash
cd /etc/mysql/mysql.conf.d
vi mysqld.cnf

修改mysqld.cnf配置文件,添加如下配置:

在这里插入图片描述

上图配置如下:

二进制模式的日志目录

数据库唯一id

log-bin/var/lib/mysql/mysql-bin
server-id=12345

(2) 创建账号,用于测试使用

使用root账号创建用户并授予权限。账号canal,密码canal

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

(3)重启mysql容器

docker restart mysql

canal容器安装

下载镜像:

docker pull docker.io/canal/canal-server

容器安装

-p端口映射,-d后台运行

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

进入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步数据的数据库连接配置。

执行代码如下:

docker exec -it canal /bin/bash
cd canal-server/conf/
vi canal.properties

cd example/
vi instance.properties

修改canal.properties的id,不能和mysql的server-id重复,如下图:

在这里插入图片描述

修改instance.properties,配置数据库连接地址:

改需要监听的数据库ip地址

配置里面有username和password,都是canal

在这里插入图片描述

这里的canal.instance.filter.regex有多种配置,如下:

数据库正则表达式

.*表示所有数据库

\\..*表示所有表

.*\\..*表示所有数据库的所有表都被监听

可以参考地址如下:

https://github.com/alibaba/canal/wiki/AdminGuide
mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

配置完成后,设置开机启动,并记得重启canal。

exit
docker update --restart=always canal
docker restart canal

canal微服务搭建

当用户执行数据库的操作的时候,binlog 日志会被canal捕获到,并解析出数据。我们就可以将解析出来的数据进行同步到redis(也可以是另一个数据库、ES)中即可。

思路:创建一个独立的程序,并监控canal服务器,获取binlog日志,解析数据,将数据更新到redis中。这样数据就更新了。

在这里插入图片描述

github地址:https://github.com/wanwujiedao/spring-boot-starter-canal、

https://github.com/alibaba/canal

(1)安装辅助jar包

spring-boot-starter-canal-master中有一个工程starter-canal,它主要提供了SpringBoot环境下canal的支持,我们需要先安装该工程,在starter-canal目录下执行mvn install下载jar到本地,如下图:

在这里插入图片描述

(2)canal微服务工程搭建

创建service-canal工程,并引入相关配置。

pom.xml

        <!--canal依赖-->
        <dependency>
            <groupId>com.xpand</groupId>
            <artifactId>starter-canal</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

application.yml配置

server:
  port: 18082
spring:
  application:
    name: canal
#example实例,对应canal配置文件
canal:
  client:
    instances:
      example:
        host: 192.168.169.140
        port: 11111
        userName: canal
        password: canal
#springCloud
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
feign:
  hystrix:
    enabled: true
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE

(3)启动类创建

在包下创建启动类,代码如下:

@SpringBootApplication(exclude={DataSourceAutoConfiguration.class}) // 忽略数据库连接
@EnableEurekaClient // springCloud注册中心
@EnableCanalClient  // canal客户端
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class,args);
    }
}

(4)监听创建

创建一个CanalDataEventListener类,实现对表增删改操作的监听,代码如下:

package com.changgou.service.canal.listener;

import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.CanalEventListener;
import com.xpand.starter.canal.annotation.DeleteListenPoint;
import com.xpand.starter.canal.annotation.InsertListenPoint;
import com.xpand.starter.canal.annotation.ListenPoint;
import com.xpand.starter.canal.annotation.UpdateListenPoint;


/**
 * Title:实现对mysql数据库数据日志的监听
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2020/3/4
 */
@CanalEventListener
public class CanalDataEventListener {

    /***
     * 增加数据监听
     * @param eventType 当前操作的类型 增加数据
     * @param rowData 发生变更的数据-->>增加的数据
     */
    @InsertListenPoint
    public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("增加数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    }

    // rowData.getAfterColumnsList() 之后的数据,适用于增加、修改
    // rowData.getBeforeColumnsList() 之前的数据,适用于删除

    /**
     * 修改数据监听
     * @param eventType
     * @param rowData 发生变更的数据-->>修改的数据
     */
    @UpdateListenPoint
    public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("修改前的数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
        System.out.println("修改后的数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    }

    /***
     * 删除数据监听
     * @param eventType
     */
    @DeleteListenPoint
    public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("删除数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    }

    /***
     * 自定义数据修改监听,指定监听的库,表
     * @param eventType
     * @param rowData
     */
    @ListenPoint(destination = "example", // 实例配置
            schema = "changgou_content", // 库
            table = {"tb_content_category", "tb_content"}, // 表
            eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.DELETE}) // 监听类型,修改数据,删除数据
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        System.out.println("自定义修改前的数据:");
        rowData.getBeforeColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
        System.out.println("自定义修改后的数据:");
        rowData.getAfterColumnsList().forEach((c) -> System.out.println("列名: " + c.getName() + "---数据: " + c.getValue()));
    }
}

(5)测试

启动canal微服务,然后修改任意数据库的表数据,canal微服务后台输出如下:

在这里插入图片描述

同步redis缓存例子

监听修改的数据并同步到redis缓存(Mysql、ES)。一般缓存的数据是静态数据,防止高并发。

广告图片缓存同步:

在这里插入图片描述

如上图,每次执行广告操作的时候,会记录操作日志到,然后将操作日志发送给canal,canal将操作记录发送给canal微服务,canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,canal微服务再将所有广告存入到Redis缓存。

content微服务

service-content微服务是广告微服务的增删改查,这里不用写出来。canal微服务根据修改的分类ID调用content微服务查询分类对应的所有广告,这里要使用到fegin,微服务之间的调用。

 /***
     * 根据categoryId查询广告集合
     */
    @GetMapping(value = "/list/category/{id}")
    public ResponseResult<List<Content>> findByCategory(@PathVariable Long id){
        //根据分类ID查询广告集合
        List<Content> contents = contentService.findByCategory(id);
        return new ResponseResult<List<Content>>(true,StatusCode.OK,"查询成功!",contents);
    }
#springCloud配置
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
  instance:
    prefer-ip-address: true
    
feign:
  hystrix:
    enabled: true
    
#hystrix 配置
hystrix:
  command:
    default:
      execution:
        timeout:
        #如果enabled设置为false,则请求超时交给ribbon控制
          enabled: true
        isolation:
          strategy: SEMAPHORE
<!-- Spring Cloud -->
<spring-cloud.version>Greenwich.SR2</spring-cloud.version>

<!-- Spring Cloud -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>${spring-cloud.version}</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

<!-- eureka注册中心,只有eureka-server微服务用到 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

<!-- eureka-client客户端 -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<!-- openfeign -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

<!-- redis 使用-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

另外在另一个工程中加入fegin,方法和地址跟微服务controller一样,service-content-fegin中直接加,不用配置文件,不用启动类,放个接口就行:

@FeignClient(name="content")
@RequestMapping(value = "/content")
public interface ContentFeign {

    /***
     * 根据分类ID查询所有广告
     */
    @GetMapping(value = "/list/category/{id}")
    ResponseResult<List<Content>> findByCategory(@PathVariable Long id);
}

同步实现

在canal微服务中修改如下:

(1)配置redis

修改application.yml配置文件,添加redis配置,如下代码:

redis有设置密码则添加password

在这里插入图片描述

(2)启动类中开启feign

修改CanalApplication,添加@EnableFeignClients注解,扫描fegin包,可调用content微服务controller方法。代码如下:

在这里插入图片描述

(3)同步实现

修改监听类CanalDataEventListener,实现监听广告的增删改,并根据增删改的数据使用feign查询对应分类的所有广告,将广告存入到Redis中,代码如下:

上图代码如下:

/**
 * Title:实现对mysql数据库数据日志的监听
 * Description:
 * @author WZQ
 * @version 1.0.0
 * @date 2020/3/4
 */
@CanalEventListener
public class CanalDataEventListener {

    @Resource
    private ContentFeign contentFeign;

    //字符串
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //自定义数据库的 操作来监听
    //destination = "example"

    /**
     * 自定义数据库的 操作来监听
     * @param eventType 数据库修改数据类型,增改删
     * @param rowData 数据
     */
    @ListenPoint(destination = "example",
            schema = "changgou_content",
            table = {"tb_content", "tb_content_category"},
            eventType = {
                    CanalEntry.EventType.UPDATE,
                    CanalEntry.EventType.DELETE,
                    CanalEntry.EventType.INSERT})
    public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        //1.获取列名 为category_id的值
        String categoryId = getColumnValue(eventType, rowData);
        //2.调用feign 获取该分类下的所有的广告集合
        ResponseResult<List<Content>> categoryresut = contentFeign.findByCategory(Long.valueOf(categoryId));
        List<Content> data = categoryresut.getData();
        //3.使用redisTemplate存储到redis中,存json值
        stringRedisTemplate.boundValueOps("content_" + categoryId).set(JSON.toJSONString(data));
    }

    private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        String categoryId = "";
        //判断 如果是删除  则获取beforlist
        if (eventType == CanalEntry.EventType.DELETE) {
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                // 列名为category_id
                if (column.getName().equalsIgnoreCase("category_id")) {
                    categoryId = column.getValue();
                    return categoryId;
                }
            }
        } else {
            //判断 如果是添加 或者是更新 获取afterlist
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                if (column.getName().equalsIgnoreCase("category_id")) {
                    categoryId = column.getValue();
                    return categoryId;
                }
            }
        }
        return categoryId;
    }
}

测试:

修改数据库数据,可以看到Redis中的缓存跟着一起变化

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/154389.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(9)Qt中信号与槽重载的解决方案

信号与槽重载的解决方案 一、通过函数指针解决 //信号 void (Me::*funchungury)() &Me::hungury; void (Me::*funchungury_QString)(QString) &Me::hungury; //槽 void (Me::*funceat)() &Me::eat; void (Me::*funceat_QString)(QString) &Me::eat;//有参…

Oracle与MySQL语法转换

前言 Oracle与MySQL语法转换 场景&#xff1a;系统改造&#xff0c;需要由Oracle切换为MySQL&#xff0c;因而要对代码中的Oracle语法的sql调整为MySQL语法 博客地址&#xff1a;芒果橙的个人博客 【http://mangocheng.com】 sysdate–当前日期 Oracle 使用sysdate select s…

hdl_graph_slam代码解析

hdl SLAM和定位的关系&#xff1a;HDL和cartographer一样&#xff0c;是离线建图的 整个SLAM系统的架构 包含四个节点&#xff1a; 预处理、 帧匹配、hdl_slam、地面检测 输入点云首先经过预处理进行降采样&#xff0c;然后传给下一个节点。帧匹配通过迭代获取两帧之间运动变化…

【SpringCloud01】微服务架构入门

1.微服务架构理论入门 SpringCloud微服务 2.Boot和Cloud版本选型 上篇&#xff1a;SpringBoot2.X版和SpringCloud H版 下篇&#xff1a;SpringCloud Alibaba 官网强烈推荐SpringBoot2.0以上的版本 Cloud与Boot之间的版本关系 技术选型相关的网站使用在线解析json字符串 由于…

第2章 马尔可夫决策过程

2.1 马尔可夫决策过程&#xff08;上&#xff09; Markov Decision Process&#xff08;MDP&#xff09; Markov Decision Process can model a lot of real-world problem. It formally describes the framework of reinforcement learningUnder MDP, the environment is ful…

Promise 实现 (从简易版到符合Promise A+规范)

前言 手写 Promise 是面试的时候大家都逃避的送命题&#xff0c;在学些了解后发现通过实现源码更能将新一代的异步方案理解的通透&#xff0c;知其然知其所以然的运用。 如果直接将源码贴到此处势必不能有更大的收获&#xff0c;下面就按实现版本来看做简要分析。 回顾 Prom…

SpringBoot测试类编写

前置要求: a.测试类上需要的注解 SpringBootTest AutoConfigureMockMvc Slf4j b.引入MockMvc类 Autowired private MockMvc mockMvc; c.如果需要前置条件可以用before注解 1.get/delete请求 // 查询Testvoid testQuery() throws Exception {String content mockMvc.perfor…

Django(15):身份和权限认证

目录1.Django中的身份认证模块1.1 用户模型1.2 认证模块1.3 项目搭建演示2.权限管理架构2.1 权限相关数据模型2.2 权限相关功能函数2.3 权限分配函数2.4 权限设置3.资源访问管理1.Django中的身份认证模块 1.1 用户模型 Django中有内建的用户模块django.contrib.auth.models.U…

2022 CNCC 中国计算机大会参会总结

前言 第 19 届 CNCC 于2022年12月8-10日召开&#xff0c;本届大会为期三天&#xff0c;首次采取全线上举办形式&#xff0c;主题为“算力、数据、生态”&#xff0c;重点在保持多样性、聚焦热点前沿话题、平衡学术界和产业界参与等维度展开讨论。大会由CCF会士、中国科学院院士…

【SpringBoot】一文带你入门SpringBoot

✅作者简介&#xff1a;热爱Java后端开发的一名学习者&#xff0c;大家可以跟我一起讨论各种问题喔。 &#x1f34e;个人主页&#xff1a;Hhzzy99 &#x1f34a;个人信条&#xff1a;坚持就是胜利&#xff01; &#x1f49e;当前专栏&#xff1a;【Spring】 &#x1f96d;本文内…

【职场进阶】做好项目管理,先从明确职责开始

优秀的项目管理一定是高效协调各方资源、反馈及时、调整迅速的。 同时可以做到让参与各方在整个项目过程中张弛有序、愉快合作&#xff0c;最终实现产品项目的效益最大化。 那什么是项目呢&#xff1f; 项目是为向客户提供独特的产品或服务而进行的临时性任务&#xff0c;项目有…

TypeScript 对象key为number时的坑

首先在js的对象中有一个设定&#xff0c;就是对象的key可以是字符串&#xff0c;也可以是数字。 不论key是字符串还是数字&#xff0c;遍历对象key的时候&#xff0c;这个key会变成字符串 通过[] 操作符访问key对应值时候&#xff0c;不论是数字还是字符串都转成了 字符串的k…

Chromedriver安装教程

第一步 查看你当前Chrome浏览器的版本&#xff0c;如下图所示&#xff1a; 第二步 查看当前Chrome浏览器的版本号&#xff0c;如下图所示,版本 108.0.5359.125&#xff08;正式版本&#xff09; &#xff08;64 位&#xff09;中的&#xff0c;108就是我们的版本号。 第三…

VTK-PointPlacer

前言&#xff1a;本博文主要研究VTK中点转换到曲面上的应用&#xff0c;相关的接口为vtkPolygonalSurfacePointPlacer&#xff0c;为深入研究将基类vtkPointPlacer开始讲解。主要应用为在PolyData表面进行画线。 vtkPointPlacer 描述&#xff1a;将2D display位置转换为世界坐…

ospf知识点汇总

OSPF &#xff1a; 开放式最短路径优先协议使用范围&#xff1a;IGP 协议算法特点&#xff1a; 链路状态型路由协议&#xff0c;SPF算法协议是否传递网络掩码&#xff1a;传递网络掩码协议封装&#xff1a;基于IP协议封装&#xff0c;协议号为 89一.OSPF 特点1.OSPF 是一种典型…

基于javaweb(springboot+mybatis)网上酒类商城项目设计和实现以及文档报告

基于javaweb(springbootmybatis)网上酒类商城项目设计和实现以及文档报告 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏…

【Linux】Linux项目自动化构建工具—make/Makefile

目录一.什么是make/MakefileMakefilemake二.Makefile逻辑1.简单依赖2.复杂依赖三.make指令1.make的使用2.clean清理3.伪目标4.make如何确定是否编译访问时间的影响修改时间的影响一.什么是make/Makefile Makefile 在Windows下&#xff0c;我们使用VS、VS Code这些ide编写C/C程…

MySQL的客户端/服务器架构

以我们平时使用的微信为例&#xff0c;它其实是由两部分组成的&#xff0c;一部分是客户端程序&#xff0c;一部分是服务器程序。客户端可能有很多种形式&#xff0c;比如手机APP&#xff0c;电脑软件或者是网页版微信&#xff0c;每个客户端都有一个唯一的用户名&#xff0c;就…

赶紧收藏 | 50个超实用微信小程序,巨好用|||内含免费配音软件

现在App太多了&#xff0c;想用的功能都要下载&#xff0c;但是手机有258g内存不允许这么放肆呀&#xff0c;只能挖掘不占用存的方法了&#xff0c;小程序就解决了这个痛&#xff0c;节省内存&#xff0c;让手机不再卡顿&#xff0c;打游戏也舒服.给大家整理了50个很好用的小程…

【阶段三】Python机器学习11篇:机器学习项目实战:KNN(K近邻)回归模型

本篇的思维导图: 项目实战(KNN回归模型) K近邻算法回归模型则将离待预测样本点最近的K个训练样本点的平均值进行待预测样本点的回归预测。 项目背景 K近邻除了能进行分类分析,还能进行回归分析,即预测连续变量,此时的KNN称为K近邻回归模型。回归问题是一类…