canal 数据同步组件

news2024/11/29 8:58:02

canal 数据异构组件

为啥要使用这个组件?
在更新DB的时候不同步更新到redis,es等数据库中,时间太久,而且可能会存在同步失败的问题,因此引入canal去拉取DB的数据,再去更新到redis,es等数据库中,有失败重试和回滚等功能。
canal原理?
canal 伪装成salve向mysql发送dump协议,拿到备份数据binlog,去更新数据到redis,es等数据库中或者通过组装数据之后更新。canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据

canal 组件的使用

1.下载canal组件

下载地址canal组件下载地址
在我的资源中也有canal组件包
在这里插入图片描述
解压启动(我是windows版,双击startup.bat)

在这里插入图片描述

2.数据库配置

1.开启MySQL , 需要先开启 Binlog 写入功能

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.授权 canal 作为mysql 的slave 的权限

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
3.项目引入jar包
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
4.写canal监听数据工具类
package com.next.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class SimpleCanalClientExample {
    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

5.简单例子使用测试

1.数据库更改user_id从0改为1,再从1改为0
2.查看canal监测的数据(canal可以拿到更新前的所有数据,更新后的所有数据,更新了哪些数据)

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.进一步完善canal监听数据工具类,用于应用例子

1.加入监听器,项目启动时启动
2.使用线程去监听数据
3.替换掉system.out.print(),里面有锁,会阻塞,使用日志打印
4.处理canal监测到的数据

package com.next.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.next.dao.TrainNumberDetailMapper;
import com.next.service.TrainNumberService;
import com.next.service.TrainSeatService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
import java.util.List;

/**
 * @desc 不要用system.out.print()里面有锁,会阻塞,用日志打印
 */
@Service
@Slf4j
public class CanalSubscribe implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private TrainSeatService trainSeatService;

    @Resource
    private TrainNumberService trainNumberService;

    //监听,启动的时候就开始调用此监听方法
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        canalSubscribe();
    }

    private void canalSubscribe() {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        //使用线程
        new Thread(() -> {
            try {
                log.info("canal subscribe");
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        //没有取到数据继续
                        safeSleep(100);
                        continue;
                    }
                    try {
                        log.info("new message,batchIds:{},size:{}", batchId, batchSize);
                        //打印日志
                        printEntry(message.getEntries());
                        // 提交确认
                        connector.ack(batchId);
                    } catch (Exception e2) {
                        log.error("canal data exception,batchIds:{}", batchId, e2);
                        // 处理失败, 回滚数据
                        connector.rollback(batchId);
                    }
                }
            } catch (Exception e3) {
                log.error("canal subscribe exception", e3);
                safeSleep(1000);
                canalSubscribe();
            }
        }).start();
    }

    private void printEntry(List<CanalEntry.Entry> entrys) throws Exception{
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("RowChange.parse Exception , data:" + entry, e);
            }

            //更新类型-更新,删除,新增
            CanalEntry.EventType eventType = rowChage.getEventType();
            //数据库名
            String schemaName = entry.getHeader().getSchemaName();
            //表名
            String tableName = entry.getHeader().getTableName();
            log.info("name:[{},{}],eventType:{}",schemaName,tableName,eventType);
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    handleColumn(rowData.getBeforeColumnsList(), eventType, schemaName, tableName);
                } else {
                    handleColumn(rowData.getAfterColumnsList(), eventType, schemaName, tableName);
                }
            }
        }
    }

    //处理canal监测到的数据
    private void handleColumn(List<CanalEntry.Column> columnsList, CanalEntry.EventType eventType, String schemaName, String tableName) throws Exception{
        if(schemaName.contains("12306_seat_")){
            //处理座位变更
            trainSeatService.handle(columnsList,eventType);
        }else if(tableName.equals("train_number")){
            //车次详情处理(实际上是车次信息变更之后才批量处理车次详情)
            trainNumberService.handle(columnsList,eventType);
        }else{
            log.info("drop data,no need care");
        }


    }



    private void safeSleep(int millis) {
        try {
            Thread.sleep(100);
        } catch (Exception e1) {

        }
    }

}

处理canal监测到的数据(拿到改变的数据,放到实体类中,存到redis中)

package com.next.service;


import com.alibaba.otter.canal.protocol.CanalEntry;
import com.next.dao.TrainNumberMapper;
import com.next.model.TrainNumber;
import com.next.model.TrainSeat;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;

@Service
@Slf4j
public class TrainSeatService {

    @Resource
    private TrainNumberMapper trainNumberMapper;

    @Resource
    private TrainCacheService trainCacheService;

    //处理座位,canal通过监听座位库,拿到改变的数据,放到实体类中
    public void handle(List<CanalEntry.Column> columns, CanalEntry.EventType eventType) {
        if (eventType != CanalEntry.EventType.UPDATE) {
            log.info("not update,no need care");
            return;
        }
        TrainSeat trainSeat = new TrainSeat();
        boolean isStatusUpdated = false;
        for (CanalEntry.Column column : columns) {
            //票的状态改变了才做下面的操作
            if (column.getName().equals("status")) {
                trainSeat.setStatus(Integer.parseInt(column.getValue()));
                if (column.getUpdated()) {
                    isStatusUpdated = true;
                } else {
                    break;
                }
            } else if (column.getName().equals("id")) {
                trainSeat.setId(Long.parseLong(column.getValue()));
            } else if (column.getName().equals("carriage_number")) {
                trainSeat.setCarriageNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("row_number")) {
                trainSeat.setRowNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("seat_number")) {
                trainSeat.setSeatNumber(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("train_number_id")) {
                trainSeat.setTrainNumberId(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("ticket")) {
                trainSeat.setTicket(column.getValue());
            } else if (column.getName().equals("from_station_id")) {
                trainSeat.setFromStationId(Integer.parseInt(column.getValue()));
            } else if (column.getName().equals("to_station_id")) {
                trainSeat.setToStationId(Integer.parseInt(column.getValue()));
            }
        }
        if (!isStatusUpdated) {
            log.info("status not update,no need care");
        }
        log.info("train seat update,trainSeat:{}", trainSeat);

        /**
         * 数据存到redis
         * 1.指定座位被占:hash
         * cacheKey:车次_日期  D386_20231001
         * field: carriage_row_seat_fromStationId_toStationId
         * value: 0-空闲 1-占座
         *
         * 2.每个座位详情剩余的座位数
         * cacheKey: 车次_日期_count D386_20231001_count
         * field: fromStationId_toStationId
         * value: 实际座位数
         *
         */

        TrainNumber trainNumber = trainNumberMapper.selectByPrimaryKey(trainSeat.getTrainNumberId());
        //放票
        if (trainSeat.getStatus() == 1) {
            trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),
                    trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()
                            + "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    "0");

            trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",
                    trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    1l);
            log.info("seat+1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);

            //占票
        } else if (trainSeat.getStatus() == 2) {
            trainCacheService.hset(trainNumber.getName() + "_" + trainSeat.getTicket(),
                    trainSeat.getCarriageNumber() + "_" + trainSeat.getRowNumber() + "_" + trainSeat.getSeatNumber()
                            + "_" + trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    "1");
            trainCacheService.hincr(trainNumber.getName() + "_" + trainSeat.getTicket() + "_count",
                    trainSeat.getFromStationId() + "_" + trainSeat.getToStationId(),
                    -1l);
            log.info("seat-1,trainNumber:{},trainSeat:{}", trainNumber, trainSeat);
        } else {
            log.info("status update not 1 or 2,no need care");
        }
    }


}

在这里插入图片描述

参考文档:canal使用说明文档

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Harmony OS - 应用数据持久化】

概述 应用数据持久化就是应用将内存中的数据通过文件或者数据库的方式保存在设备本机上。HarmonyOS标准系统支持一下三种f方式进行持久化处理&#xff1a;包括用户首选项、键值型数据库、关系型数据库。 用户首选项 用户首选项(Preferences) 是通过将数据(Key-Value键值)保存…

【C++篇】讲解Vector容器的操作方法

文章目录 &#x1f354;vector容器概念&#x1f339;操作方法⭐赋值操作⭐容量和大小⭐插入和删除⭐数据存取 &#x1f354;vector容器概念 vector 是 C 标准库中的一个容器&#xff0c;它提供了一种动态数组的实现。vector 容器可以存储任意类型的元素&#xff0c;并且可以根…

【办公技巧】为什么有的pdf不能编辑

pdf文件大家应该都经常接触&#xff0c;但是不知道大家会遇到这种情况&#xff1a;有些PDF文件打开之后无法编辑&#xff1f;是什么原因呢&#xff1f;今天我们来分析一下都是那些原因导致的。 首先我们可以考虑一下&#xff0c;PDF文件中的内容是否是图片&#xff0c;如果确认…

【中小型企业网络实战案例 四】配置OSPF动态路由协议

【中小型企业网络实战案例 三】配置DHCP动态分配地址-CSDN博客 【中小型企业网络实战案例 二】配置网络互连互通-CSDN博客 【中小型企业网络实战案例 一】规划、需求和基本配置_大小企业网络配置实例-CSDN博客 配置OSPF 由于内网互联使用的是静态路由&#xff0c;在链路出…

软件测试/测试开发丨Git常用命令学习笔记

基于 Git 的远程仓库 远程仓库地址备注GitHubgithub.com/世界上最主流的远程开源仓库。Giteegitee.com/国内目前比较主流的开源仓库&#xff0c;也可以私有化部署。&#xff08;推荐&#xff09;GitLabgitlab.com/私有化部署&#xff0c;企业使用较多。 Git 远程仓库的应用场…

腾讯云服务器怎么购买?购买流程

腾讯云轻量应用服务器购买指南&#xff0c;有两个入口&#xff0c;一个是在特价活动上购买&#xff0c;一个是在轻量应用服务器官方页面购买&#xff0c;特价活动上购买价格更便宜&#xff0c;轻量2核2G3M带宽服务器62元一年起&#xff0c;阿腾云atengyun.com分享腾讯云轻量应用…

Codeforces Pinely Round 3 (Div. 1 + Div. 2) A~F

A.Distinct Buttons(思维) 题意&#xff1a; 你在开始时站在点 ( 0 , 0 ) (0,0) (0,0)&#xff0c;同时&#xff0c;手上有一个遥控器&#xff0c;上面有四个按钮&#xff1a; U:移动到 ( x , y 1 ) (x, y 1) (x,y1)的位置 R:移动到 ( x 1 , y ) (x 1, y) (x1,y)的位置 …

数据集介绍【02】CIFAR10

CIFAR10数据集共有60000个样本&#xff0c;每个样本都是一张32*32像素的RGB图像&#xff08;彩色图像&#xff09;&#xff0c;每个RGB图像又必定分为3个通道&#xff08;R通道、G通道、B通道&#xff09;。这60000个样本被分成了50000个训练样本和10000个测试样本。 CIFAR10数…

2024年【茶艺师(初级)】考试试卷及茶艺师(初级)考试总结

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 茶艺师&#xff08;初级&#xff09;考试试卷参考答案及茶艺师&#xff08;初级&#xff09;考试试题解析是安全生产模拟考试一点通题库老师及茶艺师&#xff08;初级&#xff09;操作证已考过的学员汇总&#xff0c;…

日常中msvcp120.dll丢失五种解决方法

在日常使用电脑的过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“msvcp120.dll丢失”。那么&#xff0c;msvcp120.dll到底是什么&#xff1f;它的作用又是什么呢&#xff1f;为什么会出现丢失的情况呢&#xff1f;本文将为您详细介绍msvcp120.dll的相…

4.Python数据序列

Python数据序列 一、作业回顾 1、面试题 有一物,不知其数,三三数之余二,五五数之余三,七七数之余二,问物几何? 白话文:有一个数字,不知道具体是多少,用3去除剩2,用5去除剩3,用7去除剩2个,问这个数是多少?1 ~ 100以内的整数 while循环: # 初始化计数器 i = …

为什么企业需要客户crm系统?

客户CRM提供数据储存&#xff0c;数据调配&#xff0c;数据分析。让传统的人工操作&#xff0c;让系统去完成。企业只需要提供原始数据就行了。举几个栗子&#xff1a; 1、客户资料的集中管理&#xff1a;可以集中存储和管理客户信息&#xff0c;包括联系方式、工商信息&#…

用户规模破亿!基于文心一言的创新应用已超4000个

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

nginx源码分析-1

使用gdb查看函数上下文&#xff1a; gdb attach nginx的work线程 监听端口状态时&#xff1a; 断点打在ngx_http_process_request 并通过浏览器触发请求时&#xff1a;

【yolov5驾驶员和摩托车佩戴头盔的检测】

yolov5驾驶员和摩托车佩戴头盔的检测 数据集和模型yolov5驾驶员和摩托车佩戴头盔的检测yolov5驾驶员和摩托车佩戴头盔的检测可视化结果 数据集和模型 数据和模型下载&#xff1a; yolov5摩托车佩戴头盔和驾驶员检测模型 yolov5-6.0-helmat-mortor-1225.zipyolov3摩托车佩戴头…

计算机操作系统(OS)——P1操作系统概述

1、操作系统的概念(定义) 1.1、什么是操作系统 __操作系统&#xff08;Operating System&#xff0c;OS&#xff09;&#xff1a;__是指控制和管理整个计算机系统的__硬件和软件__资源&#xff0c;并合理的组织调度计算机的工作和资源的分配&#xff1b;以__提供给用户和其它…

im6ull学习总结(三)文字显示

文字显示 字符编码方式 编码与字体 一个字符以不同编码形式会保存为不同的二进制数。 ASCII American Standard Code for Information Interchange”的缩写&#xff0c;美国信息交换标准代码。 一个字节的 7 位就可以表示 128 个数值&#xff0c;在 ASCII 码中最高位永远是…

实习知识整理13:在购物车界面点击提交订单进入订单信息界面

在这块主要就是对前端传到后端的数据的处理&#xff0c;然后由后端再返还到新的前端界面 首先点击下单按钮后&#xff0c; 提交购物车中所选中的信息 因为前端是将name定义为 cartList[0].cartId &#xff0c;cartList[1].cartId 形式的 所以后端需要重新定义一个类来进行封装…

开源预约挂号平台 - 从0到上线

文章目录 开源预约挂号平台 - 从0到上线演示地址源码地址可以学到的技术前端技术后端技术部署上线开发工具其他技术业务功能 项目讲解前端创建项目 - 安装PNPM - 使用VSCODE - 安装插件首页顶部与底部 - 封装组建 - 使用scss左右布局中间内容部分路由 - vue-routerBANNER- 走马…

C# 图标标注小工具-查看重复文件

目录 效果 项目 代码 下载 效果 项目 代码 using System; using System.Collections.Generic; using System.Data; using System.IO; using System.Linq; using System.Security.Cryptography; using System.Windows.Forms;namespace ImageDuplicate {public partial clas…