Redis缓存双写一致性

news2024/10/7 20:30:13

目录

    • 双写一致性
      • Redis与Mysql双写一致性
        • canal
          • 配置流程
          • 代码案例
        • 双写一致性理解
          • 缓存操作细分
        • 缓存一致性多种更新策略
          • 挂牌报错,凌晨升级
          • 先更新数据库,在更新缓存
          • 先删除缓存,在更新数据库
          • 先更新数据库,在删除缓存
          • 延迟双删策略
      • 总结

双写一致性

Redis与Mysql双写一致性

canal

主要是用于MySQL数据库增量日志数据的订阅,消费和解析(由阿里开源的Java项目),canal是通过伪装成MySQL的slave节点来转储master节点的binlog日志的一个中间件,他拿到日志内容以后,就可以把日志的相关数据变更重放到任何地方,可以是其他的MySQL,也可以是消息队列,redis甚至是文件中.

配置流程
  • 开启MySQL的binlog写入功能(需要重启MySQL,阿里云的好像默认就开启了)
  • 授权canal连接MySQL的账号,其实就是新建一个canal专用的账号便于区分(权限可以稍微高一些)
  • 去官网下载并解压canal到自己的目录下,修改instance.properties配置文件
  • 换成自己mysql主机所在的ip地址
  • 换成自己刚才给MySQL新建的用户及其密码
  • 启动canel并查看server和instance实例的日志来确保启动运行成功
代码案例
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class RedisCanalClientExample {

    public static final int _60SECONDS = 60;

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(
                "127.0.0.1", 1111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        System.out.println("---------程序启动,开始监听MySQL的变化: ");
        try {
            connector.connect();
            //这个就是你要订阅的变化的那个库表
            connector.subscribe("db_test.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;

            while (emptyCount < totalEmptyCount) {
                //获取指定数量的数据
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                    System.out.println();
                }
                //提交确认
                connector.ack(batchId);
                //处理失败,回滚数据
                //connector.rollback(batchId);
            }
            System.out.println("empty too many times,exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                throw new RuntimeException(e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.printf("==========binlog[%s:%s],name[%s,%s],eventType : %s%n",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType);
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == CanalEntry.EventType.UPDATE) {
                    redisUpdate(rowData.getAfterColumnsList());
                } else {
                    redisDelete(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void redisInsert(List<CanalEntry.Column> columns) {
        //实现省略,往redis插入数据
    }

    private static void redisUpdate(List<CanalEntry.Column> columns) {
        //实现省略,往redis修改数据
    }

    private static void redisDelete(List<CanalEntry.Column> columns) {
        //实现省略,往redis删除数据
    }
}

双写一致性理解

  • redis中有数据,需要和数据库中的值相同
  • redis中无数据,需要数据库中的值要是最新值
缓存操作细分
  • 只读缓存
  • 读写缓存
  • 同步直写策略:写数据库时也同步写缓存,缓存和数据库中的数据一致(对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略)

缓存一致性多种更新策略

挂牌报错,凌晨升级

让客户稍作等待,然后趁机更新mysql和redis(特别重要级别的数据最好不要多线程)

给缓存设置过期时间,是保证最终一致性的解决方案.所有的写操作以数据库为准,对缓存操作只是尽最大的努力即可.也就是说如果数据库写入成功,缓存更新失败,那么只要到达过期时间.后面的请求自然会从数据库中读取新数据然后回填缓存,达到一致性.切记以mysql的数据库写入为准.

先更新数据库,在更新缓存

在高并发的情境下,这个操作是跨两个不同的系统的,就一定会可能发生数据不一致的问题,导致读到脏数据(比如某方更新失败了)

先删除缓存,在更新数据库

容易出现的异常问题:A线程删除了缓存,去更新mysql. B线程过来又要读取,A还在更新中,这时候有可能发生

  • 有可能缓存击穿(看你有没有双端检索加锁来初始化缓存)
  • B从mysql获得了旧值
  • B会把获得的旧值写回到Redis缓存(被A删除掉的旧数据,又被B给写会了,缓存的更新就失败了)
  • 请求A更新完成,MySQL与Redis发生了数据不一致的情况

这种方案尽量不要用

先更新数据库,在删除缓存

还是会出现短时间的数据不一致(可能会从缓存中读取到旧数据)

canal就是类似的思想

延迟双删策略

先删除Redis的缓存,在更新完数据库之后,再删除一次Redis的缓存(延迟删除),这时候能保证数据的最终一致性.

  • 这个删除该休眠多久
  • 自己根据业务进行一个具体的评估,在此耗时基础上面加个**百毫秒**左右即可
  • 如果MySQL是主从分离如何
  • 从库更可能导致数据不一致问题(还有个主从复制的延迟时间),所以更加需要采用延迟双删的策略了(延迟时间可能需要再加上百毫秒时间)
  • 这种同步淘汰策略,吞吐量降低了怎么办
  • 可以新起来一个线程去后台做这个事情(用CompletableFuture等实现)

分布式系统只有最终一致性,很难去做到强一致性

总结

把Redis作为只读缓存的话还好,没有一致性的问题,但是如果把Redis作为读写缓存来用.建议使用先更新数据库,再删除缓存的方案.理由如下:

  • 先删除缓存的值在更新数据库,有可能缓存击穿打满MySQL,并且也避免不了数据不一致的问题
  • 如果业务应用中读取数据库和写缓存的时间不好估算,那么延迟双删中的等待时间就不好设置

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vuex3的介绍与state、actions和mutations的使用

一、定义官网&#xff1a;Vuex 是什么&#xff1f; | Vuex (vuejs.org)Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式。它采用集中式存储管理应用的所有组件的状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。二、安装cdn<script src"/path/…

[手撕数据结构]栈的深入学习-java实现

CSDN的各位uu们你们好,今天千泽带来了栈的深入学习,我们会简单的用代码实现一下栈, 接下来让我们一起进入栈的神奇小世界吧!0.速览文章一、栈的定义1. 栈的概念2. 栈的图解二、栈的模拟实现三.栈的经典使用场景-逆波兰表达式总结一、栈的定义 1. 栈的概念 栈&#xff1a;一种…

Optimizers for Deep Learning

文章目录一、Some NotationsWhat is Optimization about?二、SGDSGD with Momentum(SGDM)Why momentum?三、AdagradRMSProp四、AdamSWATS [Keskar, et al., arXiv’17]Towards Improving AdamTowards Improving SGDMRAdam vs SWATSLookahead [Zhang, et al., arXiv’19]Momen…

[洛谷-P3047] [USACO12FEB]Nearby Cows G(树形DP+换根DP)

[洛谷-P3047] [USACO12FEB]Nearby Cows G一、问题题目描述输入格式输出格式样例 #1样例输入 #1样例输出 #1提示二、分析1、状态表示2、状态转移3、换根DP三、代码一、问题 题目描述 Farmer John has noticed that his cows often move between nearby fields. Taking this in…

【数据结构初阶】单链表面试题|内含链表带环问题

目录 前言 链表面试题 1. 删除链表中等于给定值 val 的所有节点。oj链接 2.反转一个单链表。oj链接 3. 给定一个带有头结点 head 的非空单链表&#xff0c;返回链表的中间结点。如果有两个中间结点&#xff0c;则返回第二个中间结点。oj链接 4. 输入一个链表&#xff0c;…

每天五分钟机器学习算法:贝叶斯算法中处理重复词语的三种方式

什么是重复词语? 我们预测一封邮件是否是垃圾邮件,前面已经介绍,我们需要对其进行分词处理,问题是分词处理之后很有可能有重复的词,那么这重复的词如何处理,这里我们介绍三种方式: 1.多项式模型 2.伯努利模型 3.混合模型 重复的情况举例 现在有一个垃圾邮件,它的内…

安装Linux虚拟机和Hadoop平台教程汇总及踩坑总结

&#x1f4cd;主要内容介绍安装Linux虚拟机、ubuntu系统、安装hadoop三个环节的教程链接介绍及本机与虚拟机的FTP传输教程总结&#xff08;直接找hadoop安装环节的5.filezilla传输文件&#xff09;新鲜出炉的踩坑总结和填坑指南安装Linux虚拟机和ubuntu系统一、材料和工具1、下…

站内SEO内容优化包括那些?

站内SEO优化是指优化网站内部结构&#xff0c;以提高搜索引擎对网站的识别和评价&#xff0c;从而提高网站在搜索引擎自然排名中的权重和位置。 站内SEO内容优化的目标是提高网站内容的质量和相关性&#xff0c;从而吸引更多的用户访问和留存。 以下是一些站内SEO优化的要点&…

Yolov5-交通标志检测与识别

项目介绍 上一篇文章介绍了基于卷积神经网络的交通标志分类识别Python交通标志识别基于卷积神经网络的保姆级教程&#xff08;Tensorflow&#xff09;&#xff0c;并且最后实现了一个pyqt5的GUI界面&#xff0c;并且还制作了一个简单的Falsk前端网页实现了前后端的一个简单交互…

C/C++内存管理讲解

c/C内存管理讲解 C/C内存分布 首先通过一些题目的引入讲解带大家走进C/C的内存分布。 eg1&#xff1a; 根据上述变量的定义&#xff0c;来判断它们所在的内存位置。 从接下来的4个选项中选出最佳答案填入&#xff08;注&#xff1a;可重复选&#xff09;。 A、栈 B、堆 C、数…

已知如下数据库表,写出查询各门课的分数最高者的SQL语句,要求格式为“科目,学生名,分数”,并按科目Id排序

题目描述 在某笔试题中遇到了这样的题目&#xff0c;之前学过数据库原理&#xff0c;但是这综合性太强&#xff0c;一下子犯了难。 解决过程 在数据库中建立上述表&#xff0c;以验证写的SQL对不对 平台&#xff1a;Navicate SQL 16 for MySQL 尝试写SQL查询 尝试1 …

CSS 扫盲

✏️作者&#xff1a;银河罐头 &#x1f4cb;系列专栏&#xff1a;JavaEE &#x1f332;“种一棵树最好的时间是十年前&#xff0c;其次是现在” 目录引入方式内部样式内联样式外部样式CSS 选择器CSS 常用属性值字体属性设置字体大小粗细文字样式文本属性文本颜色文本对齐文本装…

1640_MIT 6.828 fork函数的功能以及相关代码分析

全部学习汇总&#xff1a; GitHub - GreyZhang/g_unix: some basic learning about unix operating system. 继续分析之前看到的一段代码&#xff0c;先梳理一下这里遇到的fork函数的应用。 1. 这个是属于系统调用类的接口&#xff0c;也是这一段时间我看到的第一个这种类型的接…

Python3实现写作

导语T_T没有科研梦想的人半夜过来水篇文章~~~让Python学会写写歌&#xff0c;创创作~~~纯属娱乐~~~改编自PyTorch官网的一个教程&#xff0c;不过我用TF写的&#xff0c;然后生成英文变成了生成中文~~~Lets Go~~~相关文件百度网盘下载链接: https://pan.baidu.com/s/1VUEFR82Cq…

一个 适用 vue3 ts h5移动端 table组件

vue3-h5-table 介绍 适用于 vue3 ts 的 h5 移动端项目 table 组件 支持 左侧固定 滑动 每行点击回调 支持 指定列排序 链接 &#xff1a;https://github.com/duKD/vue3-h5-table 效果 props说明minTableHeight表格最小高度 可选 默认600rowNum表格显示几行 可选 默认 6he…

使用微软新必应(New Bing)AI机器人生成树莓派Pico W开发板MicroPython应用程序

微软新必应是一款由人工智能驱动的AI搜索引擎&#xff08;基于Chat GPT4.0的先进自然语言生成模型&#xff09;&#xff0c;它能与用户进行流畅、自然、有趣的对话&#xff0c;并提供可靠、及时的搜索结果&#xff0c;以及回答用户的各种问题。我们可以使用新必应生成程序代码、…

MySQL workbench基本查询语句

1.查询所有字段所有记录 SELECT * FROM world.city; select 表示查询&#xff1b;“*” 称为通配符&#xff0c;也称为“标配符”。表示将表中所有的字段都查询出来&#xff1b;from 表示从哪里查询&#xff1b;world.city 表示名为world的数据库中的city表&#xff1b; 上面…

13 node 程序后台执行加上 tail 命令, 中断 tail 命令, 同时也中断了 node 程序

前言 呵呵 最近帮朋友解决问题[2022.09.08] 需要启动一个 node 程序, 然后 需要一个 startUp.sh 脚本 然后 反手写了一个过去, 按道理 来说 应该是 后台启动了对应的 node 程序, 然后将 标准输出, 错误输出 输出到 logs/nohup.log 日志文件中, 然后基于 tail 命令 来查看 …

【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)

文章目录需求准备工作自定义复制策略编译代码需求 使用MM2同步集群数据&#xff0c;topic名称不能变&#xff0c;默认的复制策略为&#xff1a;DefaultReplicationPolicy&#xff0c;这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀&#xff0c;比如源集群别名为…

设计模式-第13章(状态模式)

状态模式状态模式状态模式的好处和用处工作状态状态模式 状态模式&#xff08;State&#xff09;&#xff0c;当一个对象的内在状态改变时允许改变其行为&#xff0c;这个对象看起来像是改变了其类。 状态模式主要解决的是当控制一个对象状态转换的条件表达式过于复杂时的情况…