使用canal增量同步ES索引库数据

news2024/11/13 12:46:08

Canal增量数据同步利器

Canal介绍

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

github地址:https://github.com/alibaba/canal

版本下载地址:https://github.com/alibaba/canal/releases

文档地址:https://github.com/alibaba/canal/wiki/Docker-QuickStart

Canal应用场景

1.电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
2.价格、库存发生变更实时同步到redis;
3.数据库异地备份、数据同步;
4.代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。
在这里插入图片描述

MySQL主从复制原理

1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)
在这里插入图片描述

Canal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

MySQL Bin-log开启

1)MySQL开启bin-log

a.进入mysql容器

docker exec -it -u root mysql /bin/bash

b.开启mysql的binlog

cd /etc/mysql/mysql.conf.d

在mysqld.cnf最下面添加如下配置
# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345

c.创建账号并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

d.重启mysql

docker restart mysql

开启bin-log后,我们可以用sql语句查看下:

show variables like '%log_bin%'

效果如下:
在这里插入图片描述

Canal安装

1)拉取镜像

docker pull canal/canal-server:v1.1.1

2)安装容器

a.安装canal-server容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

b.配置canal-server

修改/home/admin/canal-server/conf/canal.properties,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:
在这里插入图片描述
c.修改/home/admin/canal-server/conf/example/instance.properties,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:
在这里插入图片描述
在这里插入图片描述
指定监听数据库表的配置如下canal.instance.filter.regex:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) 
常见例子:
1.  所有表:.*   or  .*\\..*
2.  canal schema下所有表: canal\\..*
3.  canal下的以canal打头的表:canal\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

重启canal:

docker restart canal
Canal微服务

​ 我们搭建一个微服务,用于读取canal监听到的变更日志,微服务名字叫seckill-canal。该项目我们需要引入canal-spring-boot-autoconfigure包,并且需要实现EntryHandler接口,该接口中有3个方法,分别为insert、update、delete,这三个方法用于监听数据增删改变化。

参考地址:https://github.com/NormanGyllenhaal/canal-client

1)pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>seckill-service</artifactId>
        <groupId>com.seckill</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>seckill-canal</artifactId>

    <dependencies>
        <!--web-->
        <dependency>
            <groupId>com.seckill</groupId>
            <artifactId>seckill-web</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!--esAPI-->
        <dependency>
            <groupId>com.seckill</groupId>
            <artifactId>seckill-search-api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!--goodsAPI-->
        <dependency>
            <groupId>com.seckill</groupId>
            <artifactId>seckill-goods-api</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>

        <!--canal-->
        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-autoconfigure</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- 指定该Main Class为全局的唯一入口 -->
                    <mainClass>com.seckill.CanalApplication</mainClass>
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal><!--可以把依赖的包都打包到生成的Jar包中-->
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

bootstrap.yml配置

server:
  port: 18088
spring:
  application:
    name: seckill-canal
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: nacos-server:8848
      discovery:
        #Nacos的注册地址
        server-addr: nacos-server:8848
#超时配置
ribbon:
  ReadTimeout: 3000000
#Canal配置
canal:
  server: canal-server:11111
  destination: example
#日志
logging:
  level:
    root: error

2)创建com.seckill.handler.SkuHandler实现EntryHandler接口,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {

    /***
     * 增加数据
     * @param sku
     */
    @Override
    public void insert(Sku sku) {
        System.out.println("===========insert:"+sku);
    }

    /***
     * 修改数据
     * @param before
     * @param after
     */
    @Override
    public void update(Sku before, Sku after) {
        System.out.println("===========update-before:"+before);
        System.out.println("===========update-after:"+after);
    }

    /***
     * 删除数据
     * @param sku
     */
    @Override
    public void delete(Sku sku) {
        System.out.println("===========delete:"+sku);
    }
}

3)创建启动类

@SpringBootApplication
public class CanalApplication {

    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class,args);
    }
}

程序启动后,修改tb_sku数据,可以看到控制会打印修改前后的数据:
在这里插入图片描述

索引库同步

当tb_sku秒杀商品发生变化时,我们应该同时变更索引库中的索引数据,比如秒杀商品增加,则需要同步增加秒杀商品的索引,如果有秒杀商品删除,则需要同步移除秒杀商品。

修改seckill-canal中的com.seckill.handler.SkuHandler的增删改方法,代码如下:

@Component
@CanalTable(value = "tb_sku")
public class SkuHandler implements EntryHandler<Sku> {

    @Autowired
    private SkuInfoFeign skuInfoFeign;

    /***
     * 增加数据
     * @param sku
     */
    @Override
    public void insert(Sku sku) {
        //将Sku转换成SkuInfo
        SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);
        //同步索引
        skuInfoFeign.modify(1,skuInfo);
    }

    /***
     * 修改数据
     * @param before
     * @param after
     */
    @Override
    public void update(Sku before, Sku after) {
        int type=2;
        //将Sku转换成SkuInfo
        SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(after) ,SkuInfo.class);
        if(skuInfo.getStatus()==1 || after.getSeckillNum()<=0){
            //商品变成了普通商品,或者商品库存为0,则需要删除索引数据
            type=3;
        }
        //同步索引
        skuInfoFeign.modify(type,skuInfo);
    }

    /***
     * 删除数据
     * @param sku
     */
    @Override
    public void delete(Sku sku) {
        //将Sku转换成SkuInfo
        SkuInfo skuInfo = JSON.parseObject( JSON.toJSONString(sku) ,SkuInfo.class);
        //同步索引
        skuInfoFeign.modify(3,skuInfo);
    }
}

开启Feign功能:@EnableFeignClients(basePackages = {“com.seckill.search.feign”})
在这里插入图片描述
此时对数据库中tb_sku表进行增删改的时候,会同步到索引库中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2080535.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jmeter的聚合报告生成测试报告的方法(生成.HTML模式)

1、找到所要【生成的测试报告地址】 2、新建一个空文件&#xff08;记住地址&#xff09; 3、在jmeter的bin目录下输入cmd,回车 4、输入 jmeter -n -t 【huacei.jmx】 -l 【11.jmx】 -e -o 【D:\egd-download\apache-jmeter-5.4.1\bin\report】 &#xff08;1&#xff09;其中…

NLP从零开始------14.文本中阶序列处理之语言模型(2)

3.2 长短期记忆 梯度消失问题的一个解决方案是使用循环神经网络的变体——长短期记忆( long short- term memory, LSTM)。 长短期记忆的原理是&#xff0c; 在每一步t&#xff0c; 都保存一个隐状态和一个单元状态( cell state) , 通过单元状态来存储长距离信息&#xff0c; 长…

Datawhale X 李宏毅苹果书 AI夏令营 入门 Task1-机器学习

目录 机器学习基础案例分析-视频的点击次数预测机器学习流程相关公式 机器学习基础 机器学习&#xff1a;机器具备有学习的能力/让机器具备找一个函数的能力。比如语音识别、图像识别、 机器学习有不同的类别。 1&#xff09;回归&#xff1a;假设要找的函数的输出是一个数值…

密码学(二)---DES、SM、RSA

在使用本博客提供的学习笔记及相关内容时&#xff0c;请注意以下免责声明&#xff1a;信息准确性&#xff1a;本博客的内容是基于作者的个人理解和经验&#xff0c;尽力确保信息的准确性和时效性&#xff0c;但不保证所有信息都完全正确或最新。非专业建议&#xff1a;博客中的…

【网络安全】服务基础第一阶段——第四节:Windows系统管理基础---- NTFS安全权限与SMB文件共享服务器

目录 一、NTFS安全权限 1.1 文件系统 1.2 格式化磁盘中的文件系统 1.FAT32 2.NTFS 3.EXT 4.XFS 应用场景&#xff1a; 1.3 文件操作权限 1.4 权限管理系统 1.5 特殊权限 1.6 NTFS权限类型 二、权限管理实践 三、SMB文件共享服务器 3.1 文件共享服务器 3.2 常用的…

excel规划求解结合vba宏笔记

目录 概念与配置 规划求解定义 excel设置规划求解 宏的基本操作 excel批量进行规划求解案例 加载规划求解模块 宏的设置 宏录制vba 其他案例 概念与配置 规划求解定义 运用“规划求解”定义并求解问题 - Microsoft 支持 excel设置规划求解 EXCEL规划求解的简明教程…

OpenAI的GPT-4模型详细介绍:研发能力、应用场景、开发的合作、持续投入

Open AI GPT-4的详细介绍 OpenAI的GPT-4模型展现出了强大的研发能力&#xff1a; 这主要体现在以下几个方面&#xff1a; 1. 庞大的模型规模和参数数量 GPT-4拥有超过1万亿个参数&#xff0c;这是其前代模型GPT-3的显著扩展。如此庞大的模型规模使得GPT-4能够处理更为复杂…

如何从人机环境系统中捕捉语义

从人机环境系统中捕捉语义主要涉及将系统中的数据和信息转化为具有实际意义的内容&#xff0c;以便更好地理解和响应用户的需求。以下是几种常见的方法来捕捉语义&#xff1a; 1. 自然语言处理 (NLP) 方法&#xff1a;使用自然语言处理技术来分析和理解用户输入的文本或语音。N…

8.27-dockerfile的应用+私有仓库的创建

一、dockerfile应用 通过dockerfile创建⼀个在启动容器时&#xff0c;就可以启动httpd服务的镜像 1.步骤 : 1.创建⼀个⽬录&#xff0c;⽤于存储Docker file所使⽤的⽂件2.在此⽬录中创建Docker file⽂件&#xff0c;以及镜像制作所使⽤的⽂件3.使⽤docker build创建镜像4.使…

MySQL集群技术3——MySQL高可用之组复制

MySQL高可用之组复制 MySQL Group Replication(简称 MGR )是 MySQL 官方于 2016 年 12 月推出的一个全新的高可用与高扩 展的解决方案 组复制是 MySQL 5.7.17 版本出现的新特性&#xff0c;它提供了高可用、高扩展、高可靠的 MySQL 集群服务 MySQL 组复制分单主模式和多主模式…

大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; 目前已经更新到了&#xff1a; Hadoop&#xff08;已更完&#xff09;HDFS&#xff08;已更完&#xff09;MapReduce&#xff08;已更完&am…

IngsollRang拧紧Insight IC-D控制器维修 系统参数设置

——设置菜单 Setup&#xff08;设置&#xff09;菜单及其子菜单用于编写拧紧策略并设置许多重要的系统参数。 在Setup&#xff08;设置&#xff09;菜单中&#xff0c;创建基本拧紧策略。 除策略外&#xff0c;您可以使用Setup&#xff08;设置&#xff09;菜单来设置时间、显…

堆和栈的概念和区别

文章目录 堆和栈的概念和区别栈 (Stack)堆 (Heap)详细描述补充说明逃逸分析 (Escape Analysis)栈上分配 (Stack Allocation)堆碎片化 (Heap Fragmentation) 堆和栈的概念和区别 堆和栈的概念和区别【改编自博客】 在说堆和栈之前&#xff0c;我们先说一下JVM&#xff08;虚拟…

家里两个路由器IP地址一样吗?‌IP地址冲突怎么办?‌

在家庭网络环境中&#xff0c;‌随着智能设备的不断增多和网络需求的日益提升&#xff0c;‌很多家庭选择使用两个或更多的路由器来扩展网络覆盖、‌提高网络性能。‌然而&#xff0c;‌在设置和使用多个路由器的过程中&#xff0c;‌一个常见且令人困惑的问题是&#xff1a;‌…

C++常见面试题(面试中总结)

文章目录 原文章链接1、回调函数的了解&#xff1f;2、递归算法解释&#xff1f;3、内存对齐解释&#xff1f;4、一种排序算法解释&#xff08;快速排序&#xff09;5、什么是多态&#xff1f;6、基类为什么需要虚析构函数&#xff1f;7、new和malloc的区别&#xff1f;8、指针…

ubuntu中安装Mysql以及使用Navicat远程连接的详细步骤【图文教程】

安装步骤 注意&#xff1a;建议大家都安装Ubuntu22.04的版本&#xff0c;在该版本下再安装MySQL8.0版本的数据库。 1查看当前是否安装了MySQL程序 $ dpkg -l |grep mysql 执行以上命令&#xff0c;如果执行后什么都没有&#xff0c;则进入到MySQL的安装步骤 2如果执行以上…

MATLAB进阶:应用微积分

今天我们继续学习matlab中的应用微积分 求导&#xff08;微分&#xff09; 1、数值微分 n维向量x(xi&#xff0c;x,… x)的差分定义为n-1维向量△x(X2-X1&#xff0c;X3-X2&#xff0c;…&#xff0c;Xn- Xn-1)。 diff(x) 如果x是向量&#xff0c;返回向量x的差分如果x是矩…

初识Linux · 有关gcc/g++

目录 前言&#xff1a; 1 gcc和g 2 翻译过程 2.1 预处理 2.2 编译 2.3 汇编 2.4 链接 前言&#xff1a; 继上文介绍了vim 和 yum&#xff0c;相当于介绍了 文本编译器&#xff0c;我们可以利用vim写代码&#xff0c;那么写代码的我们了解了&#xff0c;现在应该了解编译…

R语言统计分析——如何选择最佳回归模型

参考资料&#xff1a;R语言实战【第2版】 尝试获取一个回归方程时&#xff0c;实际上你就面对着从众多可能的模型中做选择的问题。是不是所有的变量都要包括&#xff1f;还是去掉那个对预测贡献不显著的变量&#xff1f;是否需要添加多项式项和/或交互项来提高拟合度&#xff1…

.NET WPF 抖动动画

.NET WPF 抖动动画 Demo Code <!-- 水平抖动 --> <Button Content"Hello World"><Button.RenderTransform><TranslateTransform x:Name"translateTransform" /></Button.RenderTransform><Button.Triggers><Even…