优雅数据同步--canal实现mysql同步demo

news2025/1/12 12:00:39

当需要两张表数据同步的时候,我们会想到几种方案?
最简单的一种方式就是触发器的方式。例如A同步到B,可以通过下面的sql来添加触发器

create trigger tri_trade_update 
after UPDATE 
on `A`
for each row
begin 
update `B` 
set 
company_id = new.`company_id`,
supplier_id =new.`supplier_id`
WHERE id=old.`id`;   
end;

但是这种方式有一定的弊端和局限性,首先就是只局限同库,并且会增大数据库开销,以及无法实现一些自定义的逻辑。

canal是阿里推出的一个开源的中间件,它的原理类似于mysql主从的原理,它把自己伪装成mysql的从库,这样就可以从主库中获取binLog来复制数据

接下来我们自己搭建一个canal同步mysql的一个demo。我们都通过docker来安装mysql和canal,这样比较方便

一、首先需要安装mysql容器

1、首先在docker搜索mysql的官方镜像。我们选择最新的版本latest然后拉取下来
2、运行这个镜像
3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。

    docker cp mysql:/etc/mysql/my.cnf /Users/admin/WORK/docker/mysql/config
    docker cp mysql:/var/lib/mysql /Users/admin/WORK/docker/mysql/data

4、 删除运行的mysql容器
5、 重新启动mysql容器
-v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。
-p 将容器内mysql的端口映射到外部端口上
-e 设置环境变量

    docker run -d \
    --name mysql \
    -p 3306:3306 \
    -v /Users/admin/WORK/docker/mysql/config/my.cnf:/etc/mysql/my.cnf \
    -v /Users/admin/WORK/docker/mysql/data/mysql:/var/lib/mysql \
    -e MYSQL_ROOT_PASSWORD=123456 \
    mysql:latest

二、其次我们安装canal容器

1、首先在docker搜索canal-server的官方镜像。选择最新的版本latest然后拉取下来
2、运行这个镜像
3、使用命令把mysql容器内部存储数据文件拷贝到外部目录存储。

    docker cp canal:/home/admin/canal-server/conf/example/instance.properties /Users/admin/WORK/docker/cancal/conf

4、 删除运行的canal容器
5、 重新启动canal容器
-v 挂载容器文件到外部目录,这样我们就可以持久化mysql的数据不会随着容器关闭而消失。
-p 将容器内mysql的端口映射到外部端口上
-e 设置环境变量

docker run -d \
    --name canal \
    -p 11111:11111 \
    -v /Users/admin/WORK/docker/cancal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
    canal/canal-server:latest

三、同步canal和mysql

修改本地my.cnf文件。将canal同步需要的配置添加上

log-bin=mysql-bin #开启binlog
binlog-format=ROW #row格式
server_id=1 #主从标识

创建canal账号,并赋予同步权限


# 创建账号
CREATE USER canal IDENTIFIED BY 'canal'; 
# 授予权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新并应用
FLUSH PRIVILEGES;
# 建库
create database canal;
# 查看设置binlog是否生效
show variables like 'log_bin';

修改instance.properties文件

修改canal.instance.master.address配置,ip改成mysql容器的内部ip。注意容器跟容器访问不能使用127.0.0.1

重启mysql容器,重启canal容器

进入canal容器中,观察日志more canal-server/logs/example/example.log 查看是否报错

四、java编写运行canal客户端代码

package logistics.canal;


import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Canal测试
 *
 * @author admin
 * @date 2022/12/23
 */
public class CanalTest {

    public static void main(String[] args) {
        String ip = "127.0.0.1";
        String destination = "example";
        //创建连接对象
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(ip, 11111), destination, "", ""
        );

        //进行连接
        canalConnector.connect();
        //进行订阅
        canalConnector.subscribe();

        int batchSize = 5 * 1024;
        //使用死循环不断的获取canal信息
        while (true) {
            //获取Message对象
            Message message = canalConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();

            System.out.println("当前监控到的binLog消息数量是:" + size);

            //判断是否有数据
            if (id == -1 || size == 0) {
                //如果没有数据,等待1秒
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //如果有数据,进行数据解析
                List<Entry> entries = message.getEntries();

                //遍历获取到的Entry集合
                for (Entry entry : entries) {
                    System.out.println("----------------------------------------");
                    System.out.println("当前的二进制日志的条目(entry)类型是:" + entry.getEntryType());

                    //如果属于原始数据ROWDATA,进行打印内容
                    if (entry.getEntryType() == EntryType.ROWDATA) {
                        try {
                            //获取存储的内容
                            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

                            //打印事件的类型,增删改查哪种 eventType
                            System.out.println("事件类型是:" + rowChange.getEventType());

                            //打印改变的内容(增量数据)
                            for (RowData rowData : rowChange.getRowDatasList()) {
                                System.out.println("改变前的数据:" + rowData.getBeforeColumnsList());
                                System.out.println("改变后的数据:" + rowData.getAfterColumnsList());
                            }

                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }
                //消息确认已经处理了
                canalConnector.ack(id);
            }
        }
    }
}


测试

开启客户端,mysql执行sql语句。我们可以看到客户端日志变化,说明成功了
在这里插入图片描述

碰到的问题

1、mysql启动时报错。Different lower_case_table_names settings for server (‘1‘) and data dictionary (‘0‘)

解决方法: my.cnf里面添加lower_case_table_names = 1。同时删除data里面的内容,去掉data目录的挂载,重新启动容器再复制一份到data中。这样再次启动就不会报错了

2、canal日志报错MySQL8.0 caching_sha2_password Auth failed,无法连接mysql

解决方法: mysql执行下面语句

ALTER USER 'canal'@'%' IDENTIFIED BY 'canal' PASSWORD EXPIRE NEVER;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
FLUSH PRIVILEGES;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/110514.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据结构和算法学习——稀疏数组

目录 一、数据结构和算法的关系 二、数据结构的分类 (一)线性结构 (二)非线性结构 三、稀疏数组(sparsearray) (一)稀疏数组的基本介绍 (二)稀疏数组的处理方法 一、数据结构和算法的关系 数据data结构(structure)是一门研究组织数据方式的学科&#xff0c;有了编程语言…

Grafana 查询数据和转换数据

Grafana 系列文章&#xff0c;版本&#xff1a;OOS v9.3.1 Grafana 的介绍和安装Grafana监控大屏配置参数介绍&#xff08;一&#xff09;Grafana监控大屏配置参数介绍&#xff08;二&#xff09;Grafana监控大屏可视化图表Grafana 查询数据和转换数据 介绍 Grafana能够支持各…

微服务网关GateWay

在微服务架构下&#xff0c;网关的本质&#xff0c;其实就是对请求进行路由转发&#xff0c;在此基础上我们可以根据网关在整个微服务架构中的特殊位置&#xff0c;对请求进行前置和后置的处理。 请求转发和路由&#xff1a;网关类似于一个门面&#xff0c;微服务的组织细节对…

三维数字化开发管理中心

目录一、前言二、项目依赖2.1 后端2.2 前端三、快速运行3.1 启动后台服务3.2 启动前台页面四、使用手册4.1 登录4.2 首页4.3 资源中心4.4 在线预览4.5 三维开发4.6 信息管理4.6.1 用户信息4.6.2 模型信息4.7 个人中心五、数据库5.1 数据需求5.2 数据流图5.3 数据字典1&#xff…

马斯克辞任CEO,产品经理如何用项目协作软件武装自己?

自马斯克接管推特以来&#xff0c;已经发起了多轮裁员潮&#xff0c;仅第一波就裁掉了50%的员工。11月14日&#xff0c;马斯克开启第二波裁员。而IT之家12 月 21 日消息&#xff0c;埃隆・马斯克在推特发文对网友的投票结果做出回应&#xff0c;称会尽快找到一个足够傻的人来接…

蓝桥杯备赛Day3——基础数据结构(一维数组)

目录 数据结构 什么是数据结构? 《数据结构》教材一般包含 基础数据结构 最简单的数据结构——一维数组 一维数组的定义 一维变长数组 一维正向遍历 一维反向遍历 一维数组区间操作&#xff08;实际上就是切片操作&#xff09; 一维数组从a[1]开始赋值 一维数组的读…

热门项目披露:四川超声印制板有限公司100%股权转让

热门项目披露&#xff1a;四川超声印制板有限公司100%股权转让 项目推荐指数&#xff1a;&#xff1b;该项目由 北京产权交易所 发布&#xff0c;于2022年12月11日被塔米狗平台收录。 项目方 四川超声印制板有限公司&#xff0c; 成立于 1998年7月5日 &#xff0c; 注册资金 2…

Unity 3D Hierarchy 视图 || Unity 3D Project 视图

Unity 3D 的 Hierarchy 视图包含了每一个当前场景的所有游戏对象&#xff08; GameObject &#xff09;&#xff0c;如下图所示。 其中一些是资源文件的实例&#xff0c;如 3D 模型和其他预制物体&#xff08; Prefab &#xff09;的实例&#xff0c;可以在 Hierarchy 视图中选…

Mac 截图工具 iShot Pro - 软件介绍、下载安装详细教程

Mac 截图工具 iShot Pro -软件介绍、下载安装详细教程 iShot -优秀&#xff0c;功能齐全的区域截图&#xff0c;窗口截图&#xff0c;多窗口截图&#xff0c;长屏幕截图&#xff0c;shell截图&#xff0c;时间间隔截图&#xff0c;快速注释&#xff0c;纹理&#xff0c;颜色匹配…

虚拟专用网VPN(计算机网络-网络层)

目录 专用网络与专用地址 RFC 1918指明的专用地址 互连两个地点的专用网络 虚拟专用网VPN (Virtual Private Network) IP 隧道技术 VPN 的要点 专用网络与专用地址 世界上有很多机构有自己独立的网络&#xff0c;这些网络并不与因特网 互连&#xff0c;为该机构所专有&…

【LeetCode】有效的数独 [M](模拟)

36. 有效的数独 - 力扣&#xff08;LeetCode&#xff09; 一、题目 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔…

国产CAE的涅槃-岩土行业高性能离散元软件MatDEM

作者 | 刘春博士 一、导读 2019年9月11日&#xff0c;ANSYS公司公开宣称&#xff1a;“收购LSTC公司&#xff0c;一举获得其旗下拥有LS-DYNA&#xff08;结构&流体&电磁的多物理场求解器&#xff09;、LS-PrePost&#xff08;前后处理器&#xff09;、LS-OPT/LS-TASC…

基于Web的文件管理系统,支持Office、WPS预览/编辑、在线解压缩、文件分享、文件加密、远程存储、远程文件推送、秒传、断点

基于Web的文件管理系统&#xff0c;支持权限管理、历史版本管理、Office预览/编辑、WPS预览/编辑、在线解压缩、文件分享、文件加密、远程存储、远程文件推送、秒传、断点续传、智能搜索、文件备注、本地自动备份、异地自动备份、一键迁移、集群部署。 主要应用场景&#xff1…

14_视图

1. 常见的数据库对象 对象描述表(TABLE)表是存储数据的逻辑单元&#xff0c;以行和列的形式存在&#xff0c;列就是字段&#xff0c;行就是记录数据字典就是系统表&#xff0c;存放数据库相关信息的表。系统表的数据通常由数据库系统维护&#xff0c;程序员通常不应该修改&…

STM32正点原子图片——显示实验

目录 一、图片显示部分 GIF piclib.c介绍 图像显示实验main.c介绍 二、SD卡模块 1、SD卡基础知识 2、SD卡读操作 3、SD卡写操作 一、图片显示部分 GIF GIF(Graphics Interchange Format)的原义是“图像互换格式”,是CompuServe公司在1987年开发的图像文件格式。GI…

敏捷价值流管理

对团队或企业来说&#xff0c;敏捷能够通过快速迭代、改进来更好地为客户或终端用户交付价值。但有些团队在引入敏捷项目管理模式之后&#xff0c;团队管理层看了看埋头工作的团队&#xff0c;“唉&#xff1f;团队的效率好像并没有提升啊&#xff0c;这不和以前一样吗……”在…

怎样给黑白照片上色?2个技能教你如何给黑白照片上色

大家看过长辈的黑白照片吗&#xff1f;最近我的爷爷翻出了几十年前的老照片&#xff0c;给我细细道来每张照片背后的故事。可惜那个年代的技术水平有限&#xff0c;没办法拍出好看的彩色照片。如今照片修复技术层层递进&#xff0c;我想借助一些图片处理软件&#xff0c;将这些…

Qt QCustomPlot 添加多个坐标系区域

Qt QCustomPlot 添加多个坐标系区域 文章目录Qt QCustomPlot 添加多个坐标系区域摘要1 新建多个坐标系QCPAxisQCPAxisRectQCPLayoutGrid2 多个坐标轴如何更新数据添加数据3 遇到的问题最后关键字&#xff1a; Debian、 Linux、 QCustomPlot、 Qt、 QCPAxisRect内容背景&#xf…

vue3的中间值思维

在用vue框架的开发的时候&#xff0c;经常使用到的一种中间值思维&#xff0c;什么是中间值思维&#xff0c;就是通过一个间接的属性去改变需要渲染的值 我们在传值的时候&#xff0c;如果是用的mitt传值&#xff0c;那传过来的值就是在bus.on函数中&#xff0c;我们就得取出来…

消息中间件RocketMQ快速入门

目录前言消息中间件需要解决哪些问题&#xff1f;Publish/SubscribeMessage PriorityMessage FilterBroker端消息过滤Consumer端消息过滤Message Persistence消息可靠性低延迟消息回溯消费消息堆积定时消息消息重试RocketMQ 物理部署结构RocketMQ 逻辑部署结构RocketMQ 数据存储…