Java17 --- redis7缓存双写一致性

news2024/11/23 15:20:46

一、缓存双写一致性

  1. 如果redis中有数据:需要和数据库中的值相同。
  2. 如果redis中没有数据:数据库中的值要是最新值,且准备回写redis。
  3. 只读缓存。
  4. 读写缓存:①、同步直写策略:写数据库后也同步写redis缓存,缓存和数据库中的数据一致,对于读写缓存来说,要想保证缓存和数据库中的数据一致,就要采用同步直写策略。②、异步缓写策略:正常业务运行中,mysql数据变动了,但是可以在业务上容许出现一定时间后才作用于redis,如仓库、物流等功能。异常情况出现了,不得不将失败的动作重新修补,有可能需要借助kafka或者rabbitMQ等消息中间件,实现重试重写。
  5. 双检加锁策略:多个线程同时去查询数据库的这条数据,那么就可以第一个查询数据的请求上使用一个互斥锁来锁住它。其他线程走到这一步拿不到锁就等着,等第一个线程查询到了数据,然后做缓存。后面的线程进来发现已经有缓存了,就直接走缓存。

1.1、数据库和缓存一致性的更新策略

目的:给缓存设置过期时间,定期清理缓存并回写,是保证最终一致性的解决方案。

可以对存入缓存的数据设置过期时间,所有的写操作以数据库为准,对缓存操作只是尽最大努力即可。就是如果数据库写入成功,缓存更新失败,那么只要到达过期时间,则后面的读请求自然会从数据库中读取新值然后回填缓存,达到一致性,要以mysql的数据库写入库为准。

1.1.1、在停机的情况下

给出公告,服务升级,单线程,这样重量级的数据操作最好不要多线程。

1.1.2、先更新数据库,再更新缓存

1、情况1:①、先更新mysql的某商品的库存,当前商品的库存是100,更新为99。②、先更新mysql修改为99成功,然后更新redis。③、出现异常,更新redis失败了,导致MySQL里面的库存是99而redis里面还是100。所以会导致数据库里的数据和缓存redis里面数据不一致,读到redis脏数据。

2、情况2:在多线程环境下,A,B两个线程有快有慢。①、A更新mysql为100。②、B更新mysql为90。③、B先更新redis为90。④、A再更新redis为100。所以导致redis与mysql更新的数据不一致。

1.1.3、先更新缓存,再更新数据库

不推荐:业务上一般把mysql作为底单数据库,保证最后解释。

1.1.4、先删除缓存,再更新数据库

1、请求A进行写操作,删除redis缓存后,工作正在进行中,更新mysql……A还没有彻底更新完mysql,还没commit。

2、请求B开工查询,查询redis发现缓存不存在(被A从redis中删除了)。

3、请求B继续,去数据库查询得到了mysql中的旧值(A还没有更新完)。

4、请求B将旧值写回redis缓存。

5、请求A将新值写入mysql数据库。

这样依然会导致数据不一致的情况发生。

解决方法:采用延时双删策略,A线程删除redis缓存,然后sleep一段时间,这期间就是为了让B线程先从数据库读取数据,再把缺失的数据写入缓存,然后线程A再进行删除。所以,线程A sleep的时间,就需要大于线程B读取数据再写入缓存的时间。这样,其它线程读取数据时,会发现缓存缺失,所以会从数据库中读取最新值,因为这个方案会在第一次删除缓存后,延迟一段时间再次进行删除,所以叫做:延迟双删。

延时双删的不足:

  1. 这个删除该休眠多久呢?

线程A sleep的时间,需要大于线程B读取数据再写入缓存的时间。①、在业务程序运行的时候,统计下线程读数据和写缓存的操作时间,评估出项目的读数据业务逻辑的耗时,以此为基础,然后写数据的休眠时间则在读数据业务的耗时上加百毫秒就行。这样确保请求结束,写请求可以删除读请求造成的缓存脏读。②、新启动一个后台监控程序,如watchdog监控程序会加时。

1.1.5、先更新数据库,在删除缓存

缺点:缓存删除失败或者来不及,导致请求再次访问redis时缓存命中,读取到的是缓存旧值。

解决方案:

  1. 可以把要删除的缓存值或者是要更新的数据库值暂存到消息队列中(如使用Kafaka/RabbitMQ)。
  2. 当程序没有能够成功地删除缓存值或者是更新数据库值时,可以从消息队列中重新读取这些值,然后再次进行删除或更新。
  3. 如果能够成功地删除或更新,我们就要把这些值从消息队列中去除,以免重复操作,此时,也可以保证数据库和缓存的数据一致了,否则还需要再次进行重试。
  4. 当重试超过一定次数后,就需要向业务层发送保错信息了,通知运维人员。

总结:

1.2、Redis与Mysql数据双写一致性

1.2.1、canal

主要用途用于MySQL数据库增量日志数据的订阅,消费和解析,是阿里巴巴开发并开源的,采用Java语言开发。

主要功能:1、数据库镜像,2、数据库实时备份。3、索引构建和实时维护(拆分异构索引、倒排索引等)。4、业务cache刷新。5、带业务逻辑的增量数据处理。

工作原理:①、canal模拟MySQL  slave的交互协议,伪装自己为MySQL master发送dump协议。②、MySQL master收到dump请求,开始推送binary log给slave(即canal)

③、canal解析binary  log对象(原始为byte流)。

下载地址:GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

1.2.2、Redis与Mysql数据双写一致性实现

mysql前置配置:

  • 、MySQL 5.7.36
  • 、当前主机二进制日志:SHOW  MASTER STATUS;
  • 、查看:SHOW VARIABLES LIKE 'log_bin';
  • 、开启MySQL的binlog写入功能,在mysql的ini文件中配置

log-bin=mysql-bin #开启binlog

binlog-format=ROW #开启ROW模式

server_id=1 #配置MySQL replction需要定义,不要和canal的slaveid重复

 

 

  • 重启mysql
  • 、再次查看:SHOW VARIABLES LIKE 'log_bin';

 

  • 授权canal连接MySQL

#先检查是否有canal

SELECT*FROM mysql.user

#没有就创建

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';

GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

FLUSH PRIVILEGES;

 

 

Canal服务端:

  • 、下载linux版本:

 

解压

 

配置文件

 

 

启动

 

查看日志

 

 

Java程序:

  • 、sql脚本:

CREATE TABLE `a_user`(

`id` BIGINT(20) NOT NULL AUTO_INCREMENT,

`userName` VARCHAR(100) NOT NULL,

PRIMARY  KEY(`id`)

)ENGINE=INNODB  AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4

 

public class RedisCanalClientExample {

    public static final Integer _60SECONDS = 60;
    public static final String REDIS_IP_ADDR = "192.168.200.110";

    private static void redisInsert(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisDelete(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.del(columns.get(0).getValue());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private static void redisUpdate(List<Column> columns) {
        JSONObject jsonObject = new JSONObject();
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            jsonObject.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            try (Jedis jedis = RedisUtils.getJedis()) {
                jedis.set(columns.get(0).getValue(), jsonObject.toJSONString());
                System.out.println("---------update after: " + jedis.get(columns.get(0).getValue()));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                //获取变更的row数据
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error,data:" + entry.toString(), e);
            }
            //获取变动类型
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList());
                } else if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList());
                } else {//EventType.UPDATE
                    redisUpdate(rowData.getAfterColumnsList());
                }
            }
        }
    }


    public static void main(String[] args) {
        System.out.println("---------O(∩_∩)O哈哈~ initCanal() main方法-----------");

        //=================================
        // 创建链接canal服务端
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(REDIS_IP_ADDR, 11111),
                "example",
                "",
                "");
        int batchSize = 1000;
        //空闲空转计数器
        int emptyCount = 0;
        System.out.println("---------------------canal init OK,开始监听mysql变化------");
        try {
            connector.connect();
            //设置监控的数据库与表
            //connector.subscribe(".*\\..*");
            connector.subscribe("test1.t_user");
            connector.rollback();
            int totalEmptyCount = 10 * _60SECONDS;
            while (emptyCount < totalEmptyCount) {
                System.out.println("我是canal,每秒一次正在监听:" + UUID.randomUUID().toString());
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //计数器重新置零
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }
            System.out.println("已经监听了" + totalEmptyCount + "秒,无任何消息,请重启重试......");
        } finally {
            connector.disconnect();
        }
    }
}

 

 

 

 

 

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1834731.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT的问题与回复的内容导出(Chorme)

我给出两种方式&#xff0c;第一种方式无使用要求&#xff0c;第二种方式必须安装Chorme 个人更推荐第二种方式 第一种方式&#xff1a;使用chatgpt自带的数据导出 缺点&#xff1a;会将当前未归档的所有聊天记录导出&#xff0c;发送到你的电子邮箱中 第二种方式&#xff1a…

C语言 | Leetcode C语言题解之第155题最小栈

题目&#xff1a; 题解&#xff1a; //单调栈 单调递减 typedef struct {//正常 stackint stack[10000];int stackTop;//辅助 stackint minStack[10000];int minStackTop; } MinStack;MinStack* minStackCreate() {MinStack* newStack (MinStack *) malloc(sizeof(MinS…

【Git】多人协作 -- 详解

一、多人协作&#xff08;1&#xff09; ⽬前&#xff0c;我们所完成的工作如下&#xff1a; 基本完成 Git 的所有本地库的相关操作&#xff0c;git 基本操作&#xff0c;分支理解&#xff0c;版本回退&#xff0c;冲突解决等等。 申请码云账号&#xff0c;将远端信息 clone…

C++通过VS2022使用Conan2.0安装fmt库实现控制台彩色打印

Conan是一个开源的C/C包管理器&#xff0c;用于管理和构建C/C项目的依赖关系。它允许开发人员轻松地集成第三方库、工具和资源到他们的项目中&#xff0c;并管理这些依赖项的版本、构建选项和配置。 Conan官方提供了对应的VS2022扩展插件&#xff0c;通过这个插件再搭配VS2022…

路由控制和策略路由

文章目录 一、路由控制&#xff08;1&#xff09;、前言1.1.1-路由策略 &#xff08;2&#xff09;、正反掩码和通配符1.2.1-通配符 &#xff08;3&#xff09;、ACL1.3.1-ACL步长1.3.2-步长的作用1.3.3-TCP/UDP端口号 实验1:实验2: 二、前缀列表实验1:2.1.1-前缀列表的表达式2…

一平台一张图,撑起危化生产“安全伞”

安全生产是永恒的主题&#xff0c;是一切工作的基础。 风险辨识不到位、特种作业不合规、隐患治理不彻底、应急能力不匹配……如何从消除事故隐患、从根本上解决问题&#xff1f;随着新一代信息技术和安全生产的深度融合&#xff0c;安全生产的管理方式也在发生深刻变化。 提前…

SQL Server入门-SSMS简单使用(2008R2版)-1

环境&#xff1a; win10&#xff0c;SQL Server 2008 R2 参考&#xff1a; SQL Server 新建数据库 - 菜鸟教程 https://www.cainiaoya.com/sqlserver/sql-server-create-db.html 第 2 课&#xff1a;编写 Transact-SQL | Microsoft Learn https://learn.microsoft.com/zh-cn/…

百数私有化本地部署技术解析,附带独家优惠政策

数据驱动的时代&#xff0c;数据安全性对于每个企业来说都至关重要。私有化本地部署作为一种高效的数据管理方式&#xff0c;越来越受到企业的青睐。然而&#xff0c;高昂的部署费用常常让企业望而却步。 作为一家深耕办公领域10年的低代码公司&#xff0c;百数以本地化部署起…

监控局域网电脑屏幕的办法,最简单的三种方法,好用!

在现代企业管理和家庭教育环境中&#xff0c;对局域网内电脑屏幕进行有效监控成为了保障信息安全、提升工作效率和监督行为规范的重要手段。 监控局域网电脑屏幕不仅可以帮助管理者了解员工的工作状态&#xff0c;确保资源的合理使用&#xff0c;还能在一定程度上预防潜在的网…

AI绘画Stable Diffusion 挽救渣图的神器—Loopback Scaler脚本,你值得拥有!

大家好&#xff0c;我是向阳 今天这篇文章就是围绕着开局的这两张原图开始的。 在Stable diffusion甚至当前所有的AI画图工具里面&#xff0c;AI生成内容随机性都是一个很大的问题。 我们经常遇到一张图构图不错但是脸崩了&#xff0c;又或者人物形象不错但是背景画得崩了这…

海南云亿商务咨询有限公司抖店开店怎么样?

在数字化浪潮席卷全球的今天&#xff0c;电商行业日新月异&#xff0c;其中抖音电商以其独特的短视频直播模式&#xff0c;迅速崛起成为电商领域的新贵。海南云亿商务咨询有限公司&#xff0c;作为抖音电商服务的佼佼者&#xff0c;凭借专业的团队和丰富的经验&#xff0c;致力…

人体关键点检测-基于Gradio完成应用开发

前言 本次分享将带领大家从 0 到 1 完成一个人体姿态估计任务&#xff0c;覆盖数据准备、模型训练、推理部署和应用开发的全流程&#xff0c;项目将采用以PaddlePaddle为核心的飞桨深度学习框架进行开发&#xff0c;并总结开发过程中踩过的一些坑&#xff0c;希望能为有类似项…

通过nginx转发后应用偶发502bad gateway

序言 学习了一些东西&#xff0c;如何才是真正自己能用的呢&#xff1f;好像就是看自己的潜意识的反应&#xff0c;例如解决了一个问题&#xff0c;那么下次再碰到类似的问题&#xff0c;能直接下意识的去找到对应的信息&#xff0c;从而解决&#xff0c;而不是和第一次碰到一样…

重生奇迹MU召唤术师简介

出生地&#xff1a;幻术园 性 别&#xff1a;女 擅 长&#xff1a;召唤幻兽、辅助魔法&攻击魔法 转 职&#xff1a;召唤巫师&#xff08;3转&#xff09; 介 绍&#xff1a;从古代开始流传下来的高贵的血缘&#xff0c;为了种族纯正血缘的延续及特殊使用咒术的天赋&…

那些年你用过的iOS开发工具

版权说明 本文转载于《程序员》杂志 2014 年 6 月刊。 前言 从苹果发明 iPhone 起&#xff0c;AppStore 上的一个又一个类似 flappy bird 的一夜暴富的故事刺激着大量开发者加入移动开发大军。随着这些开发者出现的&#xff0c;还有大量方便 iOS 开发者的各种工具。这些工具…

琪朗护眼大路灯推荐入手吗?书客、琪朗、雷士落地灯测评大比拼!

护眼大路灯现在的风越来越大&#xff0c;它是一种能够改善光线质量的工具&#xff0c;通过光源的设计、技术的调校、防眩光的设计等&#xff0c;利用LED全光谱光源的高能效、长寿命光色稳定性的优点&#xff0c;搭载专研的护眼黑科技技术&#xff0c;以及采用防眩光设计&#x…

【vue3|第10期】Vue3中watchEffect详解

日期&#xff1a;2024年6月10日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xf…

4、matlab双目相机标定实验

1、双目相机标定原理及流程 双目相机标定是将双目相机系统的内外参数计算出来&#xff0c;从而实现双目视觉中的立体测量和深度感知。标定的目的是确定各个摄像头的内部参数&#xff08;如焦距、主点、畸变等&#xff09;和外部参数&#xff08;如相机位置、朝向等&#xff09…

【因果推断python】42_异质干预效应2

目录 预测弹性 关键思想 预测弹性 我们在这里陷入了复杂的境地。我们已经同意我们需要预测 &#xff0c;遗憾的是这是不可观察的。因此&#xff0c;我们不能使用 ML 算法并将其作为目标插入。但也许我们不需要观察 来预测它 这是一个想法。如果我们使用线性回归呢&#xff…

Flutter ffi iOS Failed to lookup symbol

官方文档&#xff1a;在 iOS 中使用 dart:ffi 调用本地代码