【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

news2025/4/21 10:03:12

【黑马点评优化】2-Canel实现多级缓存(Redis+Caffeine)同步

  • 0 背景
  • 1 配置MySQL
    • 1.1 开启MySQL的binlog功能
      • 1.1.1 找到mysql配置文件my.ini的位置
      • 1.1.2 开启binlog
    • 1.2 创建canal用户
  • 2 下载配置canal
    • 2.1 canal 1.1.5下载
    • 2.2 配置canal
    • 2.3 启动canal
    • 2.4 测试
  • 3 canal实现双写一致
    • 3.1 Redis操作封装
    • 3.2 编写监听器
    • 3.3 修改ShopServiceImpl.java
    • 3.4 测试
  • 参考资料

github地址如下:https://github.com/xianghua-2/hm-dianping

0 背景

【黑马点评优化】之使用Caffeine+Redis实现应用级二层缓存_caffeine redis二级缓存-CSDN博客

当时使用Redis+Caffeine实现对商铺信息的应用层两级缓存。文章提到了两级缓存Redis+Caffeine可以解决缓存雪等问题也可以提高接口的性能,但是可能会出现缓存一致性问题。如果数据频繁的变更,可能会导致Redis和Caffeine数据不一致的问题。

为此,使用Canel来解决这一问题。

MySQL工作原理如下:

一句话总结(详细工作原理,可以查看下面的介绍)

模拟MySQL从库读取binlog实现数据变更监听。它支持数据过滤、转换,并能将变更数据推送到不同的下游系统,如消息队列和其他数据库。

我的相关软件版本
mysql:8.0.36
redis:6.2.6
canal:1.1.5

1 配置MySQL

1.1 开启MySQL的binlog功能

1.1.1 找到mysql配置文件my.ini的位置

开始的时候找不到mysql配置文件(my.ini)的位置

通过下列方法找到:

  • 先连接mysql

mysql -h localhost -u root -p123456 (注意,有密码的话,才-p123456)

  • 输入以下命令

show variables like 'datadir';

这样就找到了配置文件所在的位置

1.1.2 开启binlog

my.ini的最后几行加上以下内容

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1

1.2 创建canal用户

DROP USER IF EXISTS 'canal'@'%';

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

但是由于mysql8.0之后 如果只设置了 % 的访问权限,
会导致localhost无法访问
所以 我们需要把当前权限更新为 localhost 再执行一遍
继续执行下述命令

select mysql;

update user set host = 'localhost' where user = 'canal' and host='%';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;
查看所有用户访问权限结果如下:

-- 查看所有用户访问权限
 SELECT DISTINCT CONCAT('User: ''',user,'''@''',host,''';') AS query FROM mysql.user;

在这里插入图片描述

自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password

解决:修改canal用户对应的身份验证插件为mysql_native_password

因此,接着执行下列命令

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

ALTER USER 'canal'@'localhost' IDENTIFIED WITH mysql_native_password BY 'password';

之后我们再次查看canal用户对应的身份验证插件,如下即修改成功

在这里插入图片描述

2 下载配置canal

2.1 canal 1.1.5下载

在这里我们选择1.1.15版本的canal,因为canal1.1.15版本以后的canal不兼容Java 1.8

Release v1.1.5 · alibaba/canal (github.com)

选择canal.deployer-1.1.5.tar.gz下载,之后解压到自己的软件目录下。

在这里插入图片描述

2.2 配置canal

进入解压后的Canal目录,找到conf目录下的example实例,通常情况下,你可以通过修改conf/example/instance.properties文件来配置Canal连接到MySQL的参数,主要配置项包括:

canal.instance.master.address:MySQL服务器地址和端口。
canal.instance.dbUsername和canal.instance.dbPassword:用于连接MySQL的用户名和密码。
canal.instance.connectionCharset:数据库的字符集,通常为UTF-8。
canal.instance.tsdb.enable:是否启用表结构历史记录功能,建议开启。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

2.3 启动canal

当安装好canal的时候,在window中启动bat的时候有些问题,JDK17版本已经不持’PermSize=128m’

因此,删除start.bat中的这一行

在这里插入图片描述

之后,双击bin/start.bat运行即可。

结果如下:

在这里插入图片描述

2.4 测试

pom.xml中导入依赖

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.7</version>
        </dependency>

之后,编写测试类如下:


import java.net.InetSocketAddress;
import java.util.List;



import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
import org.jetbrains.annotations.NotNull;

public class CanalTest {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmtryCount = 1200;
            while (emptyCount < totalEmtryCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(@NotNull List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(@NotNull List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

运行后,能看到,监听到了数据库的变化

在这里插入图片描述

3 canal实现双写一致

导入依赖

        <dependency>
            <groupId>top.javatool</groupId>
            <artifactId>canal-spring-boot-starter</artifactId>
            <version>1.2.1-RELEASE</version>
        </dependency>

新建com.hmdp.cache.handler包

3.1 Redis操作封装

新建ShopRedisHandler类

package com.hmdp.cache.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import com.hmdp.service.IShopService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.List;

import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;

@Component
public class ShopRedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IShopService shopService;


    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Resource
    private Cache<String, Object> shopCache;

    // 缓存预热

    @Override
    public void afterPropertiesSet() throws Exception {
        // 初始化缓存
        // 1.查询商品信息
        List<Shop> shopList = shopService.list();
        // 2.放入缓存
        for (Shop shop : shopList) {
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(shop);
            // 2.2 存入caffeind
            String key = CACHE_SHOP_KEY + shop.getId();
            shopCache.put(key, shop);
            // 2.2.存入redis
            redisTemplate.opsForValue().set(key, json);
        }

//        // 3.查询商品库存信息
//        List<ItemStock> stockList = stockService.list();
//        // 4.放入缓存
//        for (ItemStock stock : stockList) {
//            // 2.1.item序列化为JSON
//            String json = MAPPER.writeValueAsString(stock);
//            // 2.2.存入redis
//            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
//        }
    }

    public void saveShop(Shop shop) {
        try {
            String json = MAPPER.writeValueAsString(shop);
            String key = CACHE_SHOP_KEY  + shop.getId();
            redisTemplate.opsForValue().set(key + shop.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteShopById(Long id) {
        String key = CACHE_SHOP_KEY  + id;
        redisTemplate.delete(key + id);
    }
}

3.2 编写监听器

新建ShopHandler类

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
package com.hmdp.cache.handler;

import com.github.benmanes.caffeine.cache.Cache;
import com.hmdp.entity.Shop;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

import javax.annotation.Resource;

import static com.hmdp.utils.RedisConstants.CACHE_SHOP_KEY;

@CanalTable(value = "tb_shop")
@Component
public class ShopHandler implements EntryHandler<Shop>{


    @Autowired
    private ShopRedisHandler redisHandler;

    @Resource
    private Cache<String, Object> shopCache;

    @Override
    public void insert(Shop shop) {
        // 写数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + shop.getId();
        shopCache.put(key , shop);
        // 写数据到redis
        redisHandler.saveShop(shop);
    }

    @Override
    public void update(Shop before, Shop after) {
        // 写数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + after.getId();
        shopCache.put(key, after);
        // 写数据到redis
        redisHandler.saveShop(after);
    }

    @Override
    public void delete(Shop shop) {
        // 删除数据到JVM进程缓存
        String key = CACHE_SHOP_KEY + shop.getId();
        shopCache.invalidate(key);
        // 删除数据到redis
        redisHandler.deleteShopById(shop.getId());
    }
}

3.3 修改ShopServiceImpl.java

修改ShopServiceImpl中的update方法。注释掉删除缓存的操作。

缓存更新由监听器完成。

    @Override
    public Result update(Shop shop) {

        Long id = shop.getId();
        if(id == null){
            return Result.fail("店铺id不能为空");
        }
        //1.更新数据库
        updateById(shop);

        // @TODO 现在不再需要删除缓存了,由canal监听数据库的变化,然后更新缓存
        //2.删除缓存
//        stringRedisTemplate.delete(CACHE_SHOP_KEY + id);
        return Result.ok();
    }

3.4 测试

接下来可以自行调断点测试,也可以运行项目,然后更改数据库,查看终端输出。
在这里插入图片描述

参考资料

mysql 8.0找不到my.ini配置文件解决方案_mysql80没有my.ini-CSDN博客

Docker整合canal 踩坑实录_com.alibaba.otter.canal.parse.exception.canalparse-CSDN博客

Canal 1.1.5 启动报错:caching_sha2_password Auth failed_canal启动出现canal.deployer-1.1.5.jar:na-CSDN博客

Canal启动和运行出现的问题_canal启动闪退-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2301701.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CPP集群聊天服务器开发实践(五):nginx负载均衡配置

1 负载均衡器的原理与功能 单台Chatserver可以容纳大约两万台客户端同时在线聊天&#xff0c;为了提升并发量最直观的办法需要水平扩展服务器的数量&#xff0c;三台服务器可以容纳六万左右的客户端。 负载均衡器的作用&#xff1a; 把client的请求按照负载均衡算法分发到具体…

百问网(100ask)的IMX6ULL开发板的以太网控制器(MAC)与物理层(PHY)芯片(LAN8720A)连接的原理图分析(包含各引脚说明以及工作原理)

前言 本博文承接博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 。 本博文和博文 https://blog.csdn.net/wenhao_ir/article/details/145663029 的目录是找出百问网(100ask)的IMX6ULL开发板与NXP官方提供的公板MCIMX6ULL-EVK(imx6ull14x14evk)在以太网硬件…

组件库地址

react&#xff1a; https://react-vant.3lang.dev/components/dialoghttps://react-vant.3lang.dev/components/dialog vue用v2的 Vant 2 - Mobile UI Components built on Vue

2025.2.16机器学习笔记:TimeGan文献阅读

2025.2.9周报 一、文献阅读题目信息摘要Abstract创新点网络架构一、嵌入函数二、恢复函数三、序列生成器四、序列判别器损失函数 实验结论后续展望 一、文献阅读 题目信息 题目&#xff1a; Time-series Generative Adversarial Networks会议&#xff1a; Neural Information…

最新智能优化算法: 中华穿山甲优化( Chinese Pangolin Optimizer ,CPO)算法求解23个经典函数测试集,MATLAB代码

中华穿山甲优化&#xff08; Chinese Pangolin Optimizer &#xff0c;CPO&#xff09;算法由GUO Zhiqing 等人提出&#xff0c;该算法的灵感来自中华穿山甲独特的狩猎行为&#xff0c;包括引诱和捕食行为。 算法流程如下&#xff1a; 1. 开始 设置算法参数和最大迭代次数&a…

使用 DeepSeek + 语音转文字工具 实现会议整理

目录 简述 1. DeepSeek与常用的语音转文字工具 1.1 DeepSeek 1.2 讯飞听见 1.3 飞书妙记 1.4 剪映电脑版 1.5 Buzz 2. 安装Buzz 3. 使用DeepSeek Buzz提取并整理语音文字 3.1 使用 Buzz 完成语音转文字工作 3.2 使用 DeepSeek 进行文本处理工作 3.3 整理成思维导图…

【OS安装与使用】part4-ubuntu22.04安装anaconda

文章目录 一、待解决问题1.1 问题描述1.2 解决方法 二、方法详述2.1 必要说明2.2 应用步骤2.2.1 官网下载Anaconda&#xff08;1&#xff09;确认自己的系统型号与硬件架构&#xff08;2&#xff09;官网下载对应版本 2.2.2 安装Anaconda&#xff08;1&#xff09;基于shell脚本…

Spring IoC DI:控制反转与依赖注入

目录 前言 - Spring MVC 与 Spring IoC 之间的关系 1. IoC 1.1 Spring Framework, Spring MVC, Spring boot 之间的联系[面试题] 1.2 什么是容器 1.3 什么是 IoC 2. DI 2.1 什么是 DI 3. Spring IoC & DI 3.1 Component 3.2 Autowired 4. IoC 详解 4.1 Applica…

数字化转型4化:标准化奠基-信息化加速-数字化赋能-智能化引领

​随着经济增速的放缓&#xff0c;大国体系所催生的生产力逐渐释放&#xff0c;后续业务的发展愈发需要精耕细作&#xff0c;精益理念也必须深入企业的骨髓。与此同时&#xff0c;在全球经济一体化的大背景下&#xff0c;企业面临着来自国内外同行&#xff0c;甚至是跨行业的激…

简单易懂,解析Go语言中的Channel管道

Channel 管道 1 初始化 可用var声明nil管道&#xff1b;用make初始化管道&#xff1b; len()&#xff1a; 缓冲区中元素个数&#xff0c; cap()&#xff1a; 缓冲区大小 //变量声明 var a chan int //使用make初始化 b : make(chan int) //不带缓冲区 c : make(chan stri…

C++基础知识学习记录—模版和泛型编程

1、模板 概念&#xff1a; 模板可以让类或者函数支持一种通用类型&#xff0c;在编写时不指定固定的类型&#xff0c;在运行时才决定是什么类型&#xff0c;理论上讲可以支持任何类型&#xff0c;提高了代码的重用性。 模板可以让程序员专注于内部算法而忽略具体类型&#x…

已解决IDEA无法输入中文问题(亲测有效)

前言 在使用IDEA的时候&#xff0c;比如我们想写个注释&#xff0c;可能不经意间&#xff0c;输入法就无法输入中文了&#xff0c;但是在其他地方打字&#xff0c;输入法仍然能够正常工作。这是什么原因呢&#xff0c;这篇文章带你解决这个问题&#xff01; 快捷键 如果你的I…

人工智能之目标追踪DeepSort源码解读(yolov5目标检测,代价矩阵,余弦相似度,马氏距离,匹配与预测更新)

要想做好目标追踪,须做好目标检测,所以这里就是基于yolov5检测基础上进行DeepSort,叫它为Yolov5_DeepSort。整体思路是先检测再追踪,基于检测结果进行预测与匹配。 一.参数与演示 这里用到的是coco预训练人的数据集&#xff1a; 二.针对检测结果初始化track 对每一帧数据都输出…

Copilot基于企业PPT模板生成演示文稿

关于copilot创建PPT&#xff0c;咱们写过较多文章了&#xff1a; Copilot for PowerPoint通过文件创建PPT Copilot如何将word文稿一键转为PPT Copilot一键将PDF转为PPT&#xff0c;治好了我的精神内耗 测评Copilot和ChatGPT-4o从PDF创建PPT功能 Copilot for PPT全新功能&a…

MySQL 主从复制原理及其工作过程

一、MySQL主从复制原理 MySQL 主从复制是一种将数据从一个 MySQL 数据库服务器&#xff08;主服务器&#xff0c;Master&#xff09;复制到一个或多个 MySQL 数据库服务器&#xff08;从服务器&#xff0c;Slave&#xff09;的技术。以下简述其原理&#xff0c;主要包含三个核…

MySQL远程连接配置

一、配置TCP服务地址绑定 配置文件路径 /etc/mysql/mysql.cnf /etc/mysql/mysql.conf.d/mysqld.cnf具体文件可以通过 mysql --help查看 配置项 # 只接受本地连接 bind-address 127.0.0.1 mysqlx-bind-address 127.0.0.1改为 # 接受任意IP地址连接 bind-address …

iOS开发书籍推荐 - 《高性能 iOS应用开发》(附带链接)

引言 在 iOS 开发的过程中&#xff0c;随着应用功能的增加和用户需求的提升&#xff0c;性能优化成为了不可忽视的一环。尤其是面对复杂的界面、庞大的数据处理以及不断增加的后台操作&#xff0c;如何确保应用的流畅性和响应速度&#xff0c;成为开发者的一大挑战。《高性能 …

leetcode1047-删除字符串中的所有相邻重复项

leetcode 1047 思路 因为要删除字符串中的所有相邻重复项&#xff0c;那么在删除完成后&#xff0c;最后返回的元素中是不应该存在任何相邻重复项的&#xff0c;如果是普通的遍历&#xff0c;假设str ‘abbaca’&#xff0c;遍历出来只发现中间的bb是相邻重复的删除了以后a…

解决DeepSeek服务器繁忙问题的实用指南

目录 简述 1. 关于服务器繁忙 1.1 服务器负载与资源限制 1.2 会话管理与连接机制 1.3 客户端配置与网络问题 2. 关于DeepSeek服务的备用选项 2.1 纳米AI搜索 2.2 硅基流动 2.3 秘塔AI搜索 2.4 字节跳动火山引擎 2.5 百度云千帆 2.6 英伟达NIM 2.7 Groq 2.8 Firew…

web入侵实战分析-常见web攻击类应急处置实验1

场景说明&#xff1a; 某天运维人员发现在/opt/tomcat8/webapps/test/目录下&#xff0c;多出了一个index_bak.jsp这个文件&#xff0c; 并告诉你如下信息 操作系统&#xff1a;ubuntu-16.04业务&#xff1a;测试站点中间件&#xff1a;tomcat开放端口&#xff1a;22&#x…