Springcloud Alibaba使用Canal将Mysql数据实时同步到Redis保证缓存的一致性

news2025/1/26 14:32:19

目录

1. 背景

2. Windows系统安装canal

3.Mysql准备工作

4. 公共依赖包

5. Redis缓存设计

6. mall-canal-service


1. 背景

canal [kə'næl] ,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。其诞生的背景是早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。所以其核心功能如下:

  • 数据实时备份
  • 异构数据源(elasticsearch、Hbase)与数据库数据增量同步
  • 业务缓存cache 刷新,保证缓存一致性
  • 带业务逻辑的增量数据处理,如监听某个数据的变化做一定的逻辑处理

canal是借助于MySQL主从复制原理实现

  • master将改变记录到二进制日志(binary log)中(这些记录叫做二进制日志事件,binary log events,可以通过show binlog events进行查看);
  • slave将master的binary log events拷贝到它的中继日志(relay log);
  • slave重做中继日志中的事件,将改变反映它自己的数据。

 canal的工作原理:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

2. Windows系统安装canal

Github下载链接:Releases · alibaba/canal · GitHub

 

 解压

 进入/conf/example,修改instance.properties

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

之后进入到bin目录下启动startup就可以了。

3.Mysql准备工作

/*
SQLyog Community v13.2.0 (64 bit)
MySQL - 8.0.33 : Database - shop
*********************************************************************
*/

/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`shop` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `shop`;

/*Table structure for table `brand` */

DROP TABLE IF EXISTS `brand`;

CREATE TABLE `brand` (
  `id` INT NOT NULL AUTO_INCREMENT COMMENT '品牌id',
  `name` VARCHAR(100) NOT NULL COMMENT '品牌名称',
  `image` VARCHAR(1000) DEFAULT '' COMMENT '品牌图片地址',
  `initial` VARCHAR(1) DEFAULT '' COMMENT '品牌的首字母',
  `sort` INT DEFAULT NULL COMMENT '排序',
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8mb3 COMMENT='品牌表';

/*Data for the table `brand` */

INSERT  INTO `brand`(`id`,`name`,`image`,`initial`,`sort`) VALUES 
(11,'华为','https://sklll.oss-cn-beijing.aliyuncs.com/secby/eed72cc4-a9c1-4010-949a-03cef5b933d6.jpg','',NULL),
(12,'中兴','https://sklll.oss-cn-beijing.aliyuncs.com/secby/4fedb361-5ab3-4ad0-a667-580c1f37dff0.jpg','',NULL),
(13,'大疆','https://sklll.oss-cn-beijing.aliyuncs.com/secby/e8382c48-0487-4a9b-8fd0-a3716c3eea19.jpg','',NULL);

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

4. 公共依赖包

<dependencies>
        <!--web包-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!--MyBatis Plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.2</version>
        </dependency>

        <!--MySQL-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--Redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!--Nacos-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
    </dependencies>

5. Redis缓存设计

这里基于Springcloud Alibaba进行设计,对应的服务名称为mall-goods

bootstrap.yaml代码如下:

server:
  port: 8081
spring:
  application:
    name: mall-goods
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/shop?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    username: root
    password: 123456
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848

  redis:
    host: xx //改为自己redis ip地址
    port: 6379

导入RedisConfig配置

@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String,Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(factory);
        GenericJackson2JsonRedisSerializer serializer = new GenericJackson2JsonRedisSerializer();
        // 值采用json序列化
        redisTemplate.setValueSerializer(serializer);
        //使用StringRedisSerializer来序列化和反序列化redis的key值
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        // 设置hash key 和value序列化模式
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(serializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }


    @Bean
    public RedisCacheManager redisCacheManager(RedisTemplate redisTemplate) {
        RedisCacheWriter redisCacheWriter = RedisCacheWriter.nonLockingRedisCacheWriter(redisTemplate.getConnectionFactory());
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofHours(12))//设置默认缓存时间
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(redisTemplate.getValueSerializer()));
        return new RedisCacheManager(redisCacheWriter, redisCacheConfiguration);
    }
}

对于Brand进行redis缓存设计

在主启动类上添加@EnableCaching开启缓存。

@Cacheable 注解可以标记一个方法需要被缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会先查找缓存,如果缓存中存在相应的数据,则直接从缓存中读取,否则执行方法并将结果缓存到缓存中。

@CachePut 注解可以标记一个方法需要更新缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会更新缓存中的数据。

@CacheEvict 注解可以标记一个方法需要删除缓存。在注解中,可以指定缓存的名称和缓存的键。当方法被执行时,Spring Boot 会删除缓存中对应的数据。

代码如下:

@RestController
@RequestMapping("/brand")
public class BrandController {

    @Autowired
    BrandService brandService;

    @GetMapping("/{id}")
    @Cacheable(value = "brand",key = "#id")
    public Brand search(@PathVariable(value = "id")Integer id){
        return brandService.getById(id);
    }
     /****
     * 添加缓存
     */
    @PostMapping
    @CachePut(value = "brand",key = "#brand.id")
    public Brand add(@RequestBody Brand brand){
        return brand;
    }

    /****
     * 修改缓存
     */
    @PutMapping
    @CachePut(value = "brand",key = "#brand.id")
    public Brand update(@RequestBody Brand brand){
        return brand;
    }

    /****
     * 删除缓存
     */
    @DeleteMapping("/{id}")
    @CacheEvict(value = "brand",key = "#id")
    public void delete(@PathVariable(value = "id")Integer id){
    }
}

6. mall-canal-service

这里微服务名称为mall-canal-service,监控Mysql数据库数据变化进行实时的更新缓存。

除公共依赖外导入依赖

     <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-loadbalancer</artifactId>
        </dependency>

bootstrap.yaml设计如下:

server:
  port: 8083
spring:
  application:
    name: mall-canal
  cloud:
    nacos:
      config:
        file-extension: yaml
        server-addr: localhost:8848
      discovery:
        #Nacos的注册地址
        server-addr: localhost:8848
#Canal配置
canal:
  server: localhost:11111
  destination: example

设计Feign接口

@FeignClient(value = "mall-goods",contextId = "brand")//服务名字
public interface BrandFeign {

    @GetMapping("/brand/{id}")
    Brand search(@PathVariable(value = "id")Integer id);


    @PostMapping("/brand")
    Brand add(@RequestBody Brand brand);


    /****
     * 修改方法
     */
    @PutMapping("/brand")
     Brand update(@RequestBody Brand brand);


    /****
     * 删除方法
     */
    @DeleteMapping("/brand/{id}")
    void delete(@PathVariable(value = "id")Integer id);

}

Canal实时监控Mysql数据库变化,代码设计如下:

@Component
@CanalTable(value = "brand")
public class BrandHandler implements EntryHandler<Brand> {

    @Autowired
    BrandFeign brandFeign;

    @Override
    public void insert(Brand brand) {
        System.out.println(brand);
        brandFeign.add(brand);
    }

    @Override
    public void update(Brand before, Brand after) {
        System.out.println(after);
        brandFeign.update(after);
    }

    @Override
    public void delete(Brand brand) {
        System.out.println(brand);
        brandFeign.delete(brand.getId());
    }
}

在MallCanalApplication主启动类上添加@EnableFeignClients(basePackages = {"org.example.feign"}),包名为feign接口路径。

注意:本人在使用canal时,数据库有非varchar类型的null值,在运行时会抛异常错误。

	at top.javatool.canal.client.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:52) ~[canal-client-1.2.1-RELEASE.jar:na]
	at top.javatool.canal.client.handler.impl.AsyncMessageHandlerImpl.lambda$handleMessage$0(AsyncMessageHandlerImpl.java:30) ~[canal-client-1.2.1-RELEASE.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_281]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_281]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_281]
Caused by: java.lang.NumberFormatException: For input string: ""
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[na:1.8.0_281]

代码运行后,经过测试成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1346923.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

微服务应用配置解决方案介绍

目录 一、传统服务配置面临的问题 1.1 本地静态配置 1.2 配置格式不统一 1.3 生产事故 1.4 配置修改困难 1.5 缺少安全审计和版本控制能⼒ 二、应用配置的使用场景 2.1 动态调整日志级别 2.2 配置项组合更新 2.3 开关驱动开发 2.4 金丝雀(灰度)发布 三、应用配置解决…

【Redis-08】Redis主从复制的实现原理

在Redis中&#xff0c;可以通过slaveof命令或者设置slaveof选项实现两台Redis服务器的主从复制&#xff0c;比如我们有两个Redis机器&#xff0c;地址分别是 127.0.0.1:6379 和 127.0.0.1:6380&#xff0c;现在我们在前者上面执行&#xff1a; 127.0.0.1:6379 > SLAVEOF 12…

Docker九 | Swarm mode

目录 Swarm基本概念 节点 服务和任务 创建Swarm集群 创建管理节点 增加工作节点 查看集群 部署服务 新建服务 查看服务 服务伸缩 增加服务 减少服务 删除服务 Swarm基本概念 节点 节点分为管理节点(manager)和工作节点(worker) 管理节点 管理节点用于Swarm集群的…

使用 GPT4V+AI Agent 做自动 UI 测试的探索

一、背景 从 Web 诞生之日起&#xff0c;UI 自动化就成了测试的难点&#xff0c;到现在近 30 年&#xff0c;一直没有有效的手段解决Web UI测试的问题&#xff0c;尽管发展了很多的 webdriver 驱动&#xff0c;图片 diff 驱动的工具&#xff0c;但是这些工具的投入产出比一直被…

windows go环境安装 swag

windows 下载依赖包 go get github.com/swaggo/swag/cmd/swag编译swag cd $GOPATH\pkg\mod\github.com\swaggo\swagv1.16.2\cmd\swagps: go env 获取 GOPATH位置 go installps: 此时 $GOPATH\bin下出现了 swag.exe 项目根目录下执行swag 初始化 swag init生成结果

【Java基础系列】body参数前后端不一致

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

动态规划 | 鸡蛋问题 | 元旦假期来点“蛋”题

文章目录 鸡蛋掉落 - 两枚鸡蛋题目描述动态规划解法问题分析程序代码 鸡蛋掉落题目描述问题分析程序代码复杂度分析 鸡蛋掉落 - 两枚鸡蛋 题目描述 原题链接 给你 2 枚相同 的鸡蛋&#xff0c;和一栋从第 1 层到第 n 层共有 n 层楼的建筑。 已知存在楼层 f &#xff0c;满足 …

微信小程序开发系列-10组件间通信01

微信小程序开发系列目录 《微信小程序开发系列-01创建一个最小的小程序项目》 《微信小程序开发系列-02注册小程序》 《微信小程序开发系列-03全局配置中的“window”和“tabBar”》 《微信小程序开发系列-04获取用户图像和昵称》 《微信小程序开发系列-05登录小程序》 《…

图文证明 泰勒公式展开

泰勒公式 泰勒公式简单来说就是,可以用一个N次多项式来表示出一个连续可导的函数 f(x) 是一个用函数在某点的信息描述其附近取值的公式 第一步 思考 这是一个sin(x)的图像 用函数在原点的信息描述其附近取值 用一阶导数贴合: 直接用切线来贴合就好 画一个点(0,sin(0)除的切…

内网常规攻击路径

点击星标&#xff0c;即时接收最新推文 随着网络技术的发展&#xff0c;企业内部网络架构的变化&#xff0c;网络设备多样性的增加&#xff0c;面对内网攻击&#xff0c;防御体系逐渐阶梯化&#xff0c;通过不同维度的防御联动&#xff0c;将攻击拒之门外。对于突破网络边界后进…

docker 私有仓库搭建,将镜像上传至私有仓库,从私有仓库拉取镜像

Docker 私有仓库 一、私有仓库搭建 # 1、拉取私有仓库镜像 docker pull registry # 2、启动私有仓库容器 docker run -id --nameregistry -p 5000:5000 registry # 3、打开浏览器 输入地址http://私有仓库服务器ip:5000/v2/_catalog&#xff0c;看到{"repositories&quo…

八个理由:从java8升级到Java17

目录 前言 1. 局部变量类型推断 2.switch表达式 3.文本块 4.Records 5.模式匹配instanceof 6. 密封类 7. HttpClient 8.性能和内存管理能力提高 前言 从Java 8 到 Java 20&#xff0c;Java 已经走过了漫长的道路&#xff0c;自 Java 8 以来&#xff0c;Java 生态系统…

DevExpress 皮肤改变触发后触发的事件,用来保存皮肤配置

代码&#xff1a; private UserLookAndFeel userLookAndFeel; public MainGeneral() {InitializeComponent();// 创建 UserLookAndFeel 实例userLookAndFeel new UserLookAndFeel(this);// 订阅 StyleChanged 事件userLookAndFeel.StyleChanged UserLookAndFeel_StyleChange…

学习体系结构 - AArch64内存管理

学习体系结构 - AArch64内存管理 Learn the architecture - AArch64 memory management Version 1.2 个人的英语很一般&#xff0c;对拿不准的翻译校准在后面添加了英文原文。 1、 概述 本指南介绍了AArch64中的内存转换&#xff0c;这是内存管理的关键。它解释了如何将虚拟地…

ACW741.斐波那契额数列

输入整数 N&#xff0c;求出斐波那契数列中的第 N项是多少。 斐波那契数列的第 0项是 0&#xff0c;第 1项是 1&#xff0c;从第 2 项开始的每一项都等于前两项之和。输入格式 第一行包含整数 T&#xff0c;表示共有T个测试数据。接下来 T行&#xff0c;每行包含一个整数 N。输…

Python高级用法:生成器(generator)

生成器&#xff08;generator&#xff09; 生成器是一种返回生成序列的方法&#xff0c;与直接使用列表等方式返回序列的方式不同的是&#xff0c;他的生成可以是无限的。 生成器可以与next搭配使用&#xff0c;可以被看作是一种特殊的迭代器。 yield语句 yield一般与循环相…

研究:同样的C++模板在多个cpp里出现,编译器是否要重复生成?

2023年就要过去&#xff0c;马上要跨如2024年。祝大家在新的一年&#xff0c;有个好收成。 一直以来不是很确定&#xff1a; 同样的的模板&#xff0c;在各个cpp分别出现&#xff0c;编译器要实现几份&#xff1f; 研究一下。 用命令行的编译方法&#xff0c;参考&#xff1a…

mxxWechatBot微信机器人V2使用教程(图文)最全最详细

大家伙&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂。 先看这里 mxxWechatBot功能列表一、前言二、适用人群三、准备工作四、获取账号五、下载资料 六、安装相关软件七、启动客户端八、注入并启动微信九、机器人的基本配置十、自定义接口开发 …

数据库系统概论SQL编程题合集(包含期末题、考研初试题以及复试题)

二、现有数据库casemanage中表结构如下图 1&#xff09;请编写sql语句对年龄进行升序排列 select * from afinfo order by birth;2&#xff09;请编写sql语句查询对“徐”姓开头的人员名单 select * from afinfo where name like 徐%;3&#xff09;请编写sql语句修改“陈晓”…

【28】Kotlin语法进阶——使用协程编写高效的并发程序

提示&#xff1a;此文章仅作为本人记录日常学习使用&#xff0c;若有存在错误或者不严谨得地方欢迎指正。 文章目录 一、Kotlin中的协程1.1 协程的基本用法1.1.1协程与协程作用域1.1.2 使用launch函数创建子协程1.1.3 通过suspend关键声明挂起函数1.1.4 coroutineScope函数 1.2…