09 Redis与MySQL数据双写一致性工程落地案例

news2025/1/8 12:12:18

canal

是什么

  • canal [kə’næl],中文翻译为 水道/管道/沟渠/运河,主要用途是用于 MySQL 数据库增量日志数据的订阅、消费和解析,是阿里巴巴开发并开源的,采用Java语言开发;
  • 历史背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房数据同步的业务需求,实现方式主要是基于业务 trigger(触发器) 获取增量变更。从2010年开始,阿里巴巴逐步尝试采用解析数据库日志获取增量变更进行同步,由此衍生出了canal项目;
  • Canal是基于MySQL变更日志增量订阅和消费的组件

能干嘛

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

去哪下

  • 下载canal
  • java案例

工作原理

传统MySQL主从复制工作原理

在这里插入图片描述
MySQL的主从复制将经过如下步骤:

  1. 当 master 主服务器上的数据发生改变时,则将其改变写入二进制事件日志文件中;
  2. salve 从服务器会在一定时间间隔内对 master 主服务器上的二进制日志进行探测,探测其是否发生过改变,如果探测到 master 主服务器的二进制事件日志发生了改变,则开始一个 I/O Thread 请求 master 二进制事件日志;
  3. 同时 master 主服务器为每个 I/O Thread 启动一个dump Thread,用于向其发送二进制事件日志;
  4. slave 从服务器将接收到的二进制事件日志保存至自己本地的中继日志文件中;
  5. salve 从服务器将启动 SQL Thread 从中继日志中读取二进制日志,在本地重放,使得其数据和主服务器保持一致;
  6. 最后 I/O Thread 和 SQL Thread 将进入睡眠状态,等待下一次被唤醒;

canal工作原理

在这里插入图片描述

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流)
  • 分布式系统只有最终一致性,很难做到强一致性

mysql-canal-redis双写一致性Coding

mysql

CREATE TABLE `t_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `userName` varchar(100) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4
  • 开启 MySQL的binlog写入功能
  • 授权canal连接MySQL账号
    • mysql默认的用户在mysql库的user表里
    • 默认没有canal账户,此处新建+授权

canal服务端

  • 下载
  • 解压后整体放入/mycanal路径下
  • 配置修改
    • /mycanal/canal.deployer-1.1.5/conf/example路径下

    • 在这里插入图片描述

    • instance.properties

    • 在这里插入图片描述

    • 换成自己的在mysql新建的canal账户

    • 在这里插入图片描述

  • 启动
    • /mycanal/canal.deployer-1.1.5/bin路径下执行
    • ./startup.sh
  • 查看 server 日志
  • 在这里插入图片描述
  • 查看 instance 的日志
  • 在这里插入图片描述

canal客户端(Java编写业务程序)

  • 建module

  • 改pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.5.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.zzyy.study</groupId>
        <artifactId>canal_demo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>canal_demo</name>
        <description>Demo project for Spring Boot</description>
    
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <junit.version>4.12</junit.version>
            <log4j.version>1.2.17</log4j.version>
            <lombok.version>1.16.18</lombok.version>
            <mysql.version>5.1.47</mysql.version>
            <druid.version>1.1.16</druid.version>
            <mybatis.spring.boot.version>1.3.0</mybatis.spring.boot.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>com.alibaba.otter</groupId>
                <artifactId>canal.client</artifactId>
                <version>1.1.0</version>
            </dependency>
            <!--guava-->
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>23.0</version>
            </dependency>
            <!--web+actuator-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <!--SpringBoot与Redis整合依赖-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
            </dependency>
            <!-- jedis -->
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>3.1.0</version>
            </dependency>
            <!-- springboot-aop 技术-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-aop</artifactId>
            </dependency>
            <!-- redisson -->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson</artifactId>
                <version>3.13.4</version>
            </dependency>
            <!--Mysql数据库驱动-->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.47</version>
            </dependency>
            <!--集成druid连接池-->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid-spring-boot-starter</artifactId>
                <version>1.1.10</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>${druid.version}</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>${druid.version}</version>
            </dependency>
            <!--mybatis和springboot整合-->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>${mybatis.spring.boot.version}</version>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!--通用基础配置-->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>${junit.version}</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>${log4j.version}</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>${lombok.version}</version>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>RELEASE</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.73</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
  • 写YML

    server.port=5555
    
  • 主启动

  • 业务类

    • RedisUtils
      package com.zzyy.study.util;
      
      import redis.clients.jedis.Jedis;
      import redis.clients.jedis.JedisPool;
      import redis.clients.jedis.JedisPoolConfig;
      
      /**
       * @auther zzyy
       * @create 2020-10-11 14:33
       */
      public class RedisUtils
      {
          public static JedisPool jedisPool;
      
          static {
              JedisPoolConfig jedisPoolConfig=new JedisPoolConfig();
              jedisPoolConfig.setMaxTotal(20);
              jedisPoolConfig.setMaxIdle(10);
              jedisPool=new JedisPool(jedisPoolConfig,"192.168.111.147",6379);
          }
      
          public static Jedis getJedis() throws Exception {
              if(null!=jedisPool){
                  return jedisPool.getResource();
              }
              throw new Exception("Jedispool is not ok");
          }
      
      
          /*public static void main(String[] args) throws Exception
          {
              try(Jedis jedis = RedisUtils.getJedis())
              {
                  System.out.println(jedis);
      
                  jedis.set("k1","xxx2");
                  String result = jedis.get("k1");
                  System.out.println("-----result: "+result);
                  System.out.println(RedisUtils.jedisPool.getNumActive());//1
              }catch (Exception e){
                  e.printStackTrace();
              }
          }*/
      }
      
    • RedisCanalClientExample
       
      package com.zzyy.study.t1;
      
      import com.alibaba.fastjson.JSONObject;
      import com.alibaba.otter.canal.client.CanalConnector;
      import com.alibaba.otter.canal.client.CanalConnectors;
      import com.alibaba.otter.canal.protocol.CanalEntry.*;
      import com.alibaba.otter.canal.protocol.Message;
      import com.zzyy.study.util.RedisUtils;
      import org.springframework.beans.factory.annotation.Autowired;
      import redis.clients.jedis.Jedis;
      
      import java.net.InetSocketAddress;
      import java.util.List;
      import java.util.concurrent.TimeUnit;
      
      /**
       * @auther zzyy
       * @create 2020-11-11 17:13
       */
      public class RedisCanalClientExample
      {
      
          public static final Integer _60SECONDS = 60;
      
          public static void main(String args[]) {
      
              // 创建链接canal服务端
              CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.111.147",
                      11111), "example", "", "");
              int batchSize = 1000;
              int emptyCount = 0;
              try {
                  connector.connect();
                  //connector.subscribe(".*\\..*");
                  connector.subscribe("db2020.t_user");
                  connector.rollback();
                  int totalEmptyCount = 10 * _60SECONDS;
                  while (emptyCount < totalEmptyCount) {
                      Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                      long batchId = message.getId();
                      int size = message.getEntries().size();
                      if (batchId == -1 || size == 0) {
                          emptyCount++;
                          try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
                      } else {
                          emptyCount = 0;
                          printEntry(message.getEntries());
                      }
                      connector.ack(batchId); // 提交确认
                      // connector.rollback(batchId); // 处理失败, 回滚数据
                  }
                  System.out.println("empty too many times, exit");
              } finally {
                  connector.disconnect();
              }
          }
      
          private static void printEntry(List<Entry> entrys) {
              for (Entry entry : entrys) {
                  if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                      continue;
                  }
      
                  RowChange rowChage = null;
                  try {
                      rowChage = RowChange.parseFrom(entry.getStoreValue());
                  } catch (Exception e) {
                      throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(),e);
                  }
      
                  EventType eventType = rowChage.getEventType();
                  System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                          entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                          entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
      
                  for (RowData rowData : rowChage.getRowDatasList()) {
                      if (eventType == EventType.INSERT) {
                          redisInsert(rowData.getAfterColumnsList());
                      } else if (eventType == EventType.DELETE) {
                          redisDelete(rowData.getBeforeColumnsList());
                      } else {//EventType.UPDATE
                          redisUpdate(rowData.getAfterColumnsList());
                      }
                  }
              }
          }
      
          private static void redisInsert(List<Column> columns)
          {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns)
              {
                  System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                  jsonObject.put(column.getName(),column.getValue());
              }
              if(columns.size() > 0)
              {
                  try(Jedis jedis = RedisUtils.getJedis())
                  {
                      jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
          }
      
          private static void redisDelete(List<Column> columns)
          {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns)
              {
                  jsonObject.put(column.getName(),column.getValue());
              }
              if(columns.size() > 0)
              {
                  try(Jedis jedis = RedisUtils.getJedis())
                  {
                      jedis.del(columns.get(0).getValue());
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
          }
      
          private static void redisUpdate(List<Column> columns)
          {
              JSONObject jsonObject = new JSONObject();
              for (Column column : columns)
              {
                  System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
                  jsonObject.put(column.getName(),column.getValue());
              }
              if(columns.size() > 0)
              {
                  try(Jedis jedis = RedisUtils.getJedis())
                  {
                      jedis.set(columns.get(0).getValue(),jsonObject.toJSONString());
                      System.out.println("---------update after: "+jedis.get(columns.get(0).getValue()));
                  }catch (Exception e){
                      e.printStackTrace();
                  }
              }
          }
      
      
      }
      

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/608872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

23种设计模式之职责链模式(Chain of Responsibility Pattern)

前言&#xff1a;大家好&#xff0c;我是小威&#xff0c;24届毕业生&#xff0c;在一家满意的公司实习。本篇文章将23种设计模式中的访问者模式&#xff0c;此篇文章为一天学习一个设计模式系列文章&#xff0c;后面会分享其他模式知识。 如果文章有什么需要改进的地方还请大佬…

集群化环境前置准备

集群化环境前置准备 介绍 需要完成集群化环境的前置准备&#xff0c;包括创建多台虚拟机&#xff0c;配置主机名映射&#xff0c;SSH免密登录等等。 部署 配置多台Linux虚拟机 安装集群化软件&#xff0c;首要条件就是要有多台Linux服务器可用。 我们可以使用VMware提供的…

冈萨雷斯DIP第1章知识点

文章目录 1.1 什么是数字图像处理1.3 数字图像处理技术应用领域实例1.4 数字图像处理的基本步骤 1.1 什么是数字图像处理 图像、数字图像 一幅图像可以定义为一个二维函数 f ( x , y ) f(x,y) f(x,y)&#xff0c; 其中 x x x 和 y y y 是空间(平面)坐标&#xff0c; 在坐标…

你真的会性能测试吗?资深测试总结全链路压测(详全)卷起来...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 什么是全链路压测…

多分类问题练习

练习3&#xff1a;多分类问题 介绍 在本练习中&#xff0c;我们将使用逻辑回归来识别手写数字&#xff08;0到9&#xff09;。我们将扩展在练习2中对逻辑回归的实现&#xff0c;并将其应用于一对多的分类问题。 在开始练习前&#xff0c;需要下载如下的文件进行数据上传&…

【Python开发】FastAPI 08:Security 登录认证

FastAPI 在 fastapi.security 模块中提供了诸多安全性的工具&#xff0c;简化了各种安全机制的使用方法&#xff0c;可用于处理安全性、身份认证和授权等问题&#xff01; 目录 1 介绍 1.1 OAuth2 1.2 OpenAPI 2 安全基础 2.1 使用 Bearer ① OAuth2PasswordBearer ② 使…

开关电源关键参数计算方法

1、源调整率&#xff08;Line Regulation&#xff09;&#xff1a;将待测开关电源以额定输入电压及额定负载状况下热机15 分钟稳定后&#xff0c;分别于输入电压的下限、额定输入电压(Normal)、输入电压的上限测量并记录各自对应的输出电压值为 V1、V0&#xff08;normal&#…

Linux NGINX服务 ReWrite^location

ReWrite^location 从功能看 rewrite 和 location 似乎有点像&#xff0c;都能实现跳转&#xff0c;主要区别在于 rewrite 是在同一域名内更改获取资源的路径&#xff0c;而 location 是对一类路径做控制访问或反向代理&#xff0c;还可以proxy_pass 到其他机器。 rewrite 对访问…

Nginx正则表达式、location匹配、Rewrite重写详解

Nginx正则表达式、location匹配、Rewrite重写详解 一、常用的Nginx正则表达式二、location匹配概述1、location大致可以分为三类2、location常用的匹配规则3、location 优先级4、location 示例说明5、实际网站使用中&#xff0c;至少有三个匹配规则定义 三、rewrite重写1、rewr…

果推断16--基于反事实因果推断的度小满额度模型学习笔记

目录 一、原文地址 二、一些问题 2.1如何从RCT随机样本过渡到观测样本因果建模&#xff1f; 2.2反事实学习的核心思想 2.3度小满的连续反事实额度模型 Mono-CFR 2.4Mono-CFR代码实现&#xff08;待补充&#xff09; 2.5CFR学习 2.5.1CFR 2.5.2DR-CFR 参考 一、原文地…

Spring Cloud Alibaba — Nacos 构建服务注册中心

文章目录 Nacos Server下载启动登录创建命名空间 Nacos Client启动样例Nacos 服务发现配置项 集成 OpenFeign 远程接口调用添加 OpenFeign 依赖开启 EnableFeignClients 注解编写远程服务接口远程接口调用 集成 Sentinel 熔断降级添加 Sentinel 依赖开启 Sentinel 熔断降级编写…

【数据结构每日一题】链表——单链表重排

[数据结构习题]链表——单链表重排 &#x1f449;知识点导航&#x1f48e;&#xff1a;【数据结构】线性表——顺序存储 &#x1f449;知识点导航&#x1f48e;&#xff1a;【数据结构】线性表——链式存储 &#x1f449;[王道数据结构]习题导航&#x1f48e;&#xff1a; p …

pr安装缺少VCRUNTIME140.dll怎么办?这三个修复方案可以解决

在我们安装pr的时候&#xff0c;遇到缺少VCRUNTIME140.dll怎么办&#xff1f;vcruntime140.dll是一个Windows动态链接库&#xff0c;其主要功能是为C/C编译的程序提供运行时支持。这些库包括输入/输出函数、数学函数、字符串函数等等。因此&#xff0c;如果您的计算机缺少vcrun…

【接口自动化测试】一步一步教你搭建接口环境

要做接口测试&#xff0c;我们得搭建一套本地可以运行的接口环境。这次我选择了一个搭建容易&#xff0c;适合学习的系统——学生管理系统。 Python安装 这套管理系统是Python代码写的&#xff0c;因此需要Python环境。 安装挺无脑的&#xff0c;按照我提供的安装包和方法装…

windows下PC端小程序抓包--FiddlerCharles

目录 引言 【背景说明】 【操作说明】 【总结】 引言 大家好&#xff0c;你是否曾经遇到过想要抓取Windows下PC端小程序的网络请求数据&#xff0c;但不知道该用什么工具呢&#xff1f; 今天我要介绍的Fiddler和Charles两款工具&#xff0c;可帮助你轻松切入小程序网络请…

MySQL数据库 7.图形化界面工具DataGrip基础应用教学

目录 前言&#xff1a; DataGrip安装界面&#xff1a; 利用DataGrip创建数据库&#xff1a; 利用DataGrip为数据库创建表&#xff1a; 利用datagrip修改表&#xff1a; 添加元素&#xff1a; 结束&#xff01; 前言&#xff1a; 在之前我们一直接触的是MySQL命令行语句开…

4.3 最优装载

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 1.什么是贪心算法的最优装载问题&#xff1f; 最优装载问题&#xff08;Bin Packing Problem&#xff09;是一个经典的组合优化问题&#xff0c;涉及将一…

【Linux】-编译器-gcc/g++使用以及动态库和静态库的介绍(以及解决sudo失败的方法)

&#x1f496;作者&#xff1a;小树苗渴望变成参天大树 ❤️‍&#x1fa79;作者宣言&#xff1a;认真写好每一篇博客 &#x1f4a8;作者gitee:gitee &#x1f49e;作者专栏&#xff1a;C语言,数据结构初阶,Linux,C 如 果 你 喜 欢 作 者 的 文 章 &#xff0c;就 给 作 者 点…

iOS证书(.p12)和描述文件(.mobileprovision)申请

目录 iOS证书(.p12)和描述文件(.mobileprovision)申请文末扩展&#xff08;UDID获取、添加测试设备&#xff09; 说明&#xff1a;本文申请证书、描述文件转载自 uniapp官网   iOS证书(.p12)和描述文件(.mobileprovision)申请      官网会时不时更新&#xff0c;如有疑问&…

《Apollo 智能驾驶进阶课程》二、 高精地图

1. 高精地图与自动驾驶的关系 1.1 高精地图与自动驾驶 L3级别以上才需要高精地图 1.2 什么是高精地图 1.3 高精地图与导航地图 1.4 高精地图-基础模块 高精地图与定位模块的关系 现在主流的自动驾驶的定位方案有两种&#xff1a;一种是基于点云&#xff0c;另一种是基于C…