基于canal的Redis缓存双写

news2025/1/9 1:19:32

canal地址:alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)icon-default.png?t=O83Ahttps://github.com/alibaba/canal

1. 准备

1.1 MySQL

查看主机二进制日志

show master status

查看binlog是否开启

show variables like 'log_bin'

授权canal连接MySQL账号

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

SELECT * FROM mysql.user;

1.2 canal 

下载canal,上传redis所在服务器

 解压

修改conf/example下instance.properties

修改mysql主机ip

修改在mysql新建的canal账户 

启动

 2. canal客户端编写

新建表

CREATE TABLE `t_user` (

                          `id` bigint(20) NOT NULL AUTO_INCREMENT,

                          `userName` varchar(100) NOT NULL,

                          PRIMARY KEY (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

依赖

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
</dependency>
package com.example.redis.utils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisUtils
{
    public static final String  REDIS_IP_ADDR = "192.168.229.102";
    // public static final String  REDIS_pwd = "111111";
    public static JedisPool jedisPool;

    static {
        JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(20);
        jedisPoolConfig.setMaxIdle(10);
        jedisPool=new JedisPool(jedisPoolConfig,REDIS_IP_ADDR,6379,10000);
    }

    public static Jedis getJedis() throws Exception {
        if(null!=jedisPool){
            return jedisPool.getResource();
        }
        throw new Exception("Jedispool is not ok");
    }

}
package com.example.redis;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.example.redis.utils.RedisUtils;
import org.springframework.boot.test.context.SpringBootTest;
import redis.clients.jedis.Jedis;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@SpringBootTest
public class CanalTest {

    public static final Integer _60SECONDS = 60;
    public static final String  REDIS_IP_ADDR = "192.168.229.102";

    private static void redisInsert(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }


    private static void redisDelete(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.del(columns.get(0).getValue());
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns)
    {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns)
        {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(),column.getValue());
        }
        if(columns.size() > 0)
        {
            try(Jedis jedis = RedisUtils.getJedis())
            {
                jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args)
    {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR,
                11111), "example", "", "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //connector.subscribe(".*\\..*");
            connector.subscribe("redis_canal.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:"+ UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了"+totalEmptyCount+"秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }


}

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>Redis</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>Redis</name>
    <description>Redis</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <!--SpringBoot通用依赖模块-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.4</version>
        </dependency>

        <!-- 数据库相关配置启动器 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- druid启动器的依赖  -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-3-starter</artifactId>
            <version>1.2.18</version>
        </dependency>
        <!-- 驱动类-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
spring.application.name=Redis
server.port=8080


# ========================redis??=====================
spring.data.redis.database=0
spring.data.redis.host=192.168.229.102
spring.data.redis.port=6379
spring.data.redis.lettuce.pool.max-active=8
spring.data.redis.lettuce.pool.max-wait=-1ms
spring.data.redis.lettuce.pool.max-idle=8
spring.data.redis.lettuce.pool.min-idle=0

# ========================alibaba.druid=====================
spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/redis_canal?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=abc123
spring.datasource.druid.test-while-idle=false

canal启动失败可能是没有durid依赖,将durid放入lib下即可

Central Repository: com/alibaba/druid/1.2.22 (maven.org)icon-default.png?t=O83Ahttps://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2113853.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

有限自动机例题

答案&#xff1a;A 解析&#xff1a; 从图中可以看出从1出发&#xff0c;有一个a的闭环&#xff0c;可以多次重复a&#xff0c;因此选项A不正确 选项B&#xff0c;如果有b&#xff0c;必然经过a回去&#xff0c;不可能出现连续的b 选项C&#xff0c;可以从图中看出&#xf…

前端学习-day14

文章目录 01-媒体查询02-媒体查询-书写顺序03-媒体查询04-媒体查询-link引入06-Bootstrap-使用07-Bootstrap-栅格系统08-Bootstrap-按钮样式09-Bootstrap-表格样式10-bootstrap组件11-bootstrap字体图标alloyTeam项目index.htmlindex.less 01-媒体查询 <!DOCTYPE html> …

数学建模算法汇总(全网最全,含matlab案例代码)

数学建模常用的算法分类 全国大学生数学建模竞赛中&#xff0c;常见的算法模型有以下30种&#xff1a; 最小二乘法数值分析方法图论算法线性规划整数规划动态规划贪心算法分支定界法蒙特卡洛方法随机游走算法遗传算法粒子群算法神经网络算法人工智能算法模糊数学时间序列分析马…

一文梳理RAG(检索增强生成)的现状与挑战

一 RAG简介 大模型相较于过去的语言模型具备更加强大的能力&#xff0c;但在实际应用中&#xff0c;例如在准确性、知识更新速度和答案透明度方面&#xff0c;仍存在不少问题&#xff0c;比如典型的幻觉现象。因此&#xff0c;检索增强生成 (Retrieval-Augmented Generation, …

Learn ComputeShader 09 Night version lenses

这次将要制作一个类似夜视仪的效果 第一步就是要降低图像的分辨率&#xff0c; 这只需要将id.xy除上一个数字然后再乘上这个数字 可以根据下图理解&#xff0c;很明显通过这个操作在多个像素显示了相同的颜色&#xff0c;并且很多像素颜色被丢失了&#xff0c;自然就会有降低分…

Open-Sora代码详细解读(1):解读DiT结构

Diffusion Models专栏文章汇总&#xff1a;入门与实战 前言&#xff1a;目前开源的DiT视频生成模型不是很多&#xff0c;Open-Sora是开发者生态最好的一个&#xff0c;涵盖了DiT、时空DiT、3D VAE、Rectified Flow、因果卷积等Diffusion视频生成的经典知识点。本篇博客从Open-S…

攻防世界 Web_php_unserialize

Web_php_unserialize PHP反序列化 看看代码 <?php class Demo { private $file index.php;public function __construct($file) { $this->file $file; }function __destruct() { echo highlight_file($this->file, true); }function __wakeup() { if ($this->…

软件测试 | 性能测试

性能测试的概念 为了 发现系统性能问题 或 获取系统性能相关指标 而进行的测试。 常见性能测试指标 并发数 即并发用户数。 从业务层面看&#xff0c;并发用户数指的是 实际使用系统的用户总数。从后端服务器层面看&#xff0c;指的是 web服务器在一段时间内处理浏览器请求而建…

服务器环境搭建-5 Nexus搭建与使用介绍

背景 本文介绍nexus的安装、配置和使用&#xff0c;之后通过案例的方式演示使用过程。 1.下载和安装 本文使用Nexus 3.x版本进行演示 下载地址&#xff1a;Download Nexus Repository OSS | Sonatype 国外网站下载速度较慢&#xff0c;也可以通过百度网盘下载(提取码:9999): …

爆改YOLOv8|利用图像分割网络UNetV2改进yolov8主干-即插即用

1&#xff0c;本文介绍 U-Net v2 通过引入创新的跳跃连接设计来提升医学图像分割的精度。这一版本专注于更有效地融合不同层级的特征&#xff0c;包括高级特征中的语义信息和低级特征中的细节信息。通过这种优化&#xff0c;U-Net v2 能够在低级特征中注入丰富的语义&#xff…

wireshark安装及抓包新手使用教程

Wireshark是非常流行的网络封包分析软件&#xff0c;可以截取各种网络数据包&#xff0c;并显示数据包详细信息。常用于开发测试过程各种问题定位。本文主要内容包括&#xff1a; 1、Wireshark软件下载和安装以及Wireshark主界面介绍。 2、WireShark简单抓包示例。通过该例子学…

JetBrains Aqua安装步骤和基本配置

一、安装步骤 下载链接&#xff1a;https://www.jetbrains.com.cn/aqua/ 1、点击下载按钮。 2、点击下载IDE&#xff0c;浏览器下载.exe。&#xff08;如果是mac或linux可选择对应的下载安装包&#xff09; 3、双击.exe文件&#xff0c;点击下一步。 4、可点击【浏览】选择安装…

在Webmin上默认状态无法正常显示 Mariadb V11.02及以上版本

OS: Armbian OS 24.5.0 Bookworm Mariadb V11.02及以上版本 Webmin&#xff1a;V2.202 小众问题&#xff0c;主要是记录一下。 如题 Webmin 默认无法 Mariadb V11.02及以上版本 如果对 /etc/webmin/mysql/config 文件作相应调整就可以再现Mariadb管理界面。 路径文件&#xff…

风格控制水平创新高!南理工InstantX小红书发布CSGO:简单高效的端到端风格迁移框架

论文链接&#xff1a;https://arxiv.org/pdf/2408.16766 项目链接&#xff1a;https://csgo-gen.github.io/ 亮点直击 构建了一个专门用于风格迁移的数据集设计了一个简单但有效的端到端训练的风格迁移框架CSGO框架&#xff0c;以验证这个大规模数据集在风格迁移中的有益效果。…

2024年,女生到底适合转行ui设计还是软件测试?

作为2024年的就业选择来说&#xff0c;软件测试和UI设计发展都挺不错的 选择这两个方向转行的女生很多。但具体选择测试还是UI设计&#xff0c;最好还是根据你个人的兴趣爱好以及长期的发展路径去选择 比如&#xff1a;薪资、工作稳定性、后续晋升空间、学习难度等等方面~ 如…

HCIP:一次性搞定OSPF基础

OSPF 一&#xff0c; OSPF基础1. 技术背景&#xff08;RIP中存在的问题&#xff09;OSPF协议特点OSPF三张表OSPF数据表头部数据包内容&#xff1a;helloDBD&#xff08;数据库描述报文&#xff09;LSRLSULSack OSPF工作过程1. 确认可达性&#xff0c;建立邻居2-way前&#xff0…

掌握Hive函数[2]:从基础到高级应用

目录 高级聚合函数 多进一出 1. 普通聚合 count/sum... 2. collect_list 收集并形成list集合&#xff0c;结果不去重 3. collect_set 收集并形成set集合&#xff0c;结果去重 案例演示 1. 每个月的入职人数以及姓名 炸裂函数 概述 案例演示 1. 数据准备 1&#xff09;表…

接口自动化三大经典难题

目录 一、接口项目不生成token怎么解决关联问题 1. Session机制 2. 基于IP或设备ID的绑定 3. 使用OAuth或第三方认证 4. 利用隐式传递的参数 5. 基于时间戳的签名验证 二、接口测试中网络问题导致无法通过怎么办 1. 重试机制 2. 设置超时时间 3. 使用模拟数据 4. 网…

nmon服务器监控工具使用

nmon&#xff1a;是一个分析linux服务器性能的免费工具&#xff0c;可以用来帮助我们整体性的分析服务端的CPU&#xff0c;内存&#xff0c;网络&#xff0c;IO&#xff0c;虚拟内存等指标 下载nmon.jar包及分析文件&#xff1a;百度网盘 链接: 提取码: 0000 一、nmon配置及使…

JavaScript (变量,var,Let,Const)

目录 JavaScript 变量 JavaScript 变量 JavaScript 标识符 声明&#xff08;创建&#xff09; JavaScript 变量 JavaScript Let 全局作用域 函数作用域 块作用域&#xff08;Let) 重新声明变量 JavaScript Const 在声明时赋值 JavaScript 变量 JavaScript 变量 Jav…