springboot+canal+mysql+redis缓存双写一致性

news2024/9/24 17:14:01

canal官网地址:https://github.com/alibaba/canal/wiki/QuickStart
基本上按照官网的步骤来就行
在这里插入图片描述

准备

首先服务器上要安装好jdk,因为canal运行需要jdk,同时把canal对应的端口在服务中开放,否则连接不上

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

查看是否开启binlog日志功能

SHOW VARIABLES LIKE 'log_bin';

如果为OFF说明没有开启

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

注意点,这里执行命令最好是在服务器端执行,因为我们通过连接客户端登录的自己创建的后台用户比如root ,没有授权权限,会出错
出现以下情况
在这里插入图片描述

DROP USER IF EXISTS 'canal'@'%'; #删除用户
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';   ## 创建用户
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';  
FLUSH PRIVILEGES;
SELECT * FROM mysql.user;

启动

下载 canal, 访问 release 页面 , 选择需要的包下载, 我这里下载最新的

https://github.com/alibaba/canal/releases/tag/canal-1.1.6

解压缩

上传服务器选择一个位置,并解压指定的文件夹下面

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

解压之后

解压完成后,进入 /tmp/canal 目录,可以看到如下结构

drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
drwxr-xr-x 4 jianghang jianghang  160 2013-02-05 21:51 conf
drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
drwxr-xr-x 2 jianghang jianghang   48 2013-02-05 21:29 logs

配置修改

vi conf/example/instance.properties
canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false

## mysql serverId
canal.instance.mysql.slaveId = 1234 #默认是注释的,
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  #canal默认用户名
canal.instance.dbPassword = canal  #canal默认密码 
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*  #监听的表,默认监听所有表

启动

sh bin/startup.sh

查看 server 日志

vi logs/canal/canal.log

2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2013-02-05 22:45:28.113 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
2013-02-05 22:45:28.210 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

在这里插入图片描述

查看 instance 的日志

vi logs/example/example.log

2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2013-02-05 22:50:45.641 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2013-02-05 22:50:45.803 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2013-02-05 22:50:45.810 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....

DEMO

依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>


        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>


        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.3.10</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

    

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
        </dependency>

       

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 导入mysql驱动包-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>
        <!-- 导入数据源依赖-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.9</version>
        </dependency>

     


        <!--canal-->


        <dependency>
        <groupId>top.javatool</groupId>
        <artifactId>canal-spring-boot-starter</artifactId>
        <version>1.2.1-RELEASE</version>

        </dependency>




    </dependencies>

代码

配置文件

server:
  port: 8081
spring:
  datasource:
    url: jdbc:mysql://42.192.46.136:3306/test?useUnicode=true&useSSL=false&characterEncoding=utf-8
    username: root
    password: lkzroot
    type: com.alibaba.druid.pool.DruidDataSource
  redis:
    password: 123456
    host: 42.192.43.136
    port: 16379
    lettuce:
      pool:
        max-active: 200
        max-idle: 10
        max-wait: -1ms
        min-idle: 3

测试类

package com.boot.canal.client;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * @Author: lkz
 * @Title: RedisCanalClientExample
 * @Description: TODO
 * @Date: 2023/9/18 14:06
 */

public class RedisCanalClientExample {


    public static final Integer _60SECONDS = 60;
    public static final String  Canal_host_ip = "127.0.0.1";

    private static void redisInsert(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<CanalEntry.Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (CanalEntry.Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            CanalEntry.RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            CanalEntry.EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(Canal_host_ip, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("test.test1");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }


}

redis配置

package com.boot.canal.client;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "127.0.0.1";
    public static final String  REDIS_pwd = "123456";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,16379,10000,REDIS_pwd);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}

结果

修改表数据

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1019580.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux安装python3.x版本

linux安装python3.x版本 ① 安装依赖环境② 下载python版本包③ 安装python④ 建立软链接⑤ 加入path⑥ 验证 官网版本地址&#xff1a;https://www.python.org/ftp/python/ ① 安装依赖环境 yum -y install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel…

Vue 使用vue-pdf 显示pdf文件 切换页面 缩放 全屏 自动播放等

<template><div id"container"><!-- 上一页、下一页--><div class"right-btn"><div click"toFullOrExit" class"turn-btn"><span>{{ isFull 1 ? "取消全屏" : "全屏" }}&l…

GLTF-pipeline

gltf-pipeline可用作命令行工具或 Node.js 模块。 开始 安装 Node.js如果还没有&#xff0c;然后&#xff1a; npm install -g gltf-pipeline使用 gltf-pipeline 作为命令行工具&#xff1a; 将 glTF 转换为 glb gltf-pipeline -i model.gltf -o model.glb gltf-pipeline…

python萌新爬虫学习笔记【建议收藏】

文章目录 1. 如何何请求解析url2. 如何获取标签里面的文本3. 如何解析JSON格式4. 如何添加常用的header5. 如何合并两个div6. 如何删除html dom的部分结构7. 如何一次性获取所有div标签里的文本8. python爬虫如何改变响应文本字符集编码9. 如何进行字符集转码11. response.text…

骨髓小游戏

欢迎来到程序小院 骨髓 玩法&#xff1a; 骨髓推塔小游戏&#xff0c;敌方士兵进入到我方高塔会毁坏建筑&#xff0c;我方可派兵前去迎战&#xff0c;我方&#xff1a;骑兵、长枪兵、弓兵、敌法&#xff1a;骷髅骑兵、骷髅长枪兵、 骷髅弓兵,快去消灭敌人吧^^。开始游戏https:…

利用爬虫技术自动化采集汽车之家的车型参数数据

导语 汽车之家是一个专业的汽车网站&#xff0c;提供了丰富的汽车信息&#xff0c;包括车型参数、图片、视频、评测、报价等。如果我们想要获取这些信息&#xff0c;我们可以通过浏览器手动访问网站&#xff0c;或者利用爬虫技术自动化采集数据。本文将介绍如何使用Python编写…

flutter开发实战-长按TextField输入框cut、copy设置为中文复制、粘贴

flutter开发实战-长按TextField输入框cut、copy设置为中文复制、粘贴 在开发过程中&#xff0c;需要长按TextField输入框cut、copy设置为中文“复制、粘贴”&#xff0c;这里记录一下设置的代码。 一、pubspec.yaml设置flutter_localizations 在pubspec.yaml中设置flutter_l…

教你制作简单的洪水淹没模型

制作思路&#xff1a; 利用GIS软件将地形图通过高程垂直夸大&#xff0c;显示出其地形地貌&#xff0c;绘制一个面&#xff0c;使面从下向上移动&#xff0c;因地形高程的不同&#xff0c;面穿过地形图的面积不同&#xff0c;从而视觉上模拟出了水淹没的过程。 前期准备&…

facechain环境部署

环境安装 # 创建虚拟环境facechain conda create -n facechain python3.8 conda activate facechain # 克隆 GIT_LFS_SKIP_SMUDGE1 git clone https://github.com/modelscope/facechain.git --depth 1 # 安装第三方库 cd facechain pip install -r requirements.txt pip insta…

【JavaEE】操作系统内核中的进程

文章目录 &#x1f490;什么叫做进程&#x1f490;进程在系统中是如何进行管理的&#x1f490;PCB中一些比较重要的属性&#x1f490;进程持有的CPU资源——进程调度&#x1f490;内存分配——内存管理 &#x1f490;什么叫做进程 进程概念&#xff1a;一个已经跑起来的程序就…

Scrum和Kanban方法的结合:Scrumban的实施指南

如果没有有效的项目管理&#xff0c;团队成员将不得不处理尽可能多的程序&#xff0c;而这种方法不会导致成功。有几种著名的项目管理方法&#xff0c;例如 Scrum 和看板。但是有一种方法可以结合他们的最佳实践。 Scrum 和看板的优势结合在称为 Scrumban 的混合方法中。它非常…

chales 重写/断点/映射/手机代理/其他主机代理

1 chales 安装和代理配置/手机代理配置/电脑代理配置 chales 安装和代理配置/手机代理配置/电脑代理配置 2 转载:Charles Rewrite重写(详解&#xff01;必懂系列) Charles Rewrite重写(详解&#xff01;必懂系列) 1.打开charles&#xff0c;点击菜单栏的Tools选中Rewrite2.…

企业架构LNMP学习笔记59

目录介绍&#xff1a; bin&#xff1a;存放的是启动和关闭tomcat的脚本文件&#xff1b; conf&#xff1a;存放tomcat服务器的各种全局配置文件&#xff0c;其中最重要的是server.xml和web.xml lib: 存放的是tomcat服务器所需要的各种jar文件。java打包类库。 logs&#xff…

区域图片上色

目录 下图中&#xff0c;记得点击Apply&#xff0c;然后再点击Symbology 实际选择的时候&#xff0c;不选1Categorized&#xff0c;因为其分段不方便。

js中如何判断两个对象是否相等?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 浅相等&#xff08;Shallow Equality&#xff09;⭐ 深相等&#xff08;Deep Equality&#xff09;⭐ 自定义深相等函数⭐ 使用第三方库⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接…

电路原理图字母缩写表示什么?

很多小白问&#xff0c;电路原理图上这些字母缩写都是些啥玩意儿啊&#xff1f; &#xff08;一&#xff09;EN :Enable&#xff0c;使能&#xff0c;使芯片能够工作。要用的时候&#xff0c;就打开 en 脚&#xff0c;不用的时候就关闭。有些芯片是高使能&#xff0c;有些是低…

SQLSERVER 数据库恢复挂起的解决办法

USE master GO ALTER DATABASE KH_Curve SET SINGLE_USER GO ALTER DATABASE KH_Curve SET EMERGENCY GO DBCC CHECKDB(KH_Curve,REPAIR_ALLOW_DATA_LOSS) go ALTER DATABASE KH_Curve SET ONLINE GO ALTER DATABASE KH_Curve SET MULTI_USER GO

.Net IDE智能提示汉化(.Net6、AspNetCore)

先上现成的.net6汉化文件&#xff0c;可以手动下载后参照 如何为 .NET 安装本地化的 IntelliSense 文件 进行安装。或者使用后文的工具进行自动安装。 无对照英文在前中文在前 汉化内容来自 官方在线文档 &#xff0c;某些内容可能存在明显的机翻痕迹。 上一些效果图&#x…

destoon根据目录下的html文件生成地图索引

因为项目需要&#xff0c;destoon根据目录下的html文件生成地图索引&#xff0c;操作方法&#xff0c;代码如下&#xff1a; <?php $new_array array(); function loopDir($dir,&$new_array,$modurl) {$handle opendir($dir);header("Content-Type:text/xml&qu…

elasticsearch15-数据聚合

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…