目录
- 前言
- 为什么要用Redis
- 前置知识
- 一、缓存问题
- 1.1 缓存击穿
- 1.2 缓存穿透
- 1.3 缓存雪崩
- 二、缓存方案
- 2.1 普通缓存
- 2.2 冷热分离
- 2.3 多级缓存
- 2.4 缓存预热
- *课程内容
- 一、一个案例引发的思考(电商场景)
- 二、代码优化方案的演进
- 2.1 问题一:纯【读】的场景下缓存固定时间后会过期
- 问题分析
- 解决方案
- 小总结
- 2.2 问题二:批量上传缓存过期时间一样导致【缓存击穿】
- 问题分析
- 解决方案
- 小总结
- 2.3 问题三:没有过滤无效请求或者处理空数据导致【缓存穿透】
- 问题分析
- 小总结
- 2.4 问题四:冷门数据突然爆热导致高并发创建
- 问题分析
- 解决方案
- 小总结
- 2.5 问题五:缓存与数据库双写不一致(有点难度)
- 问题分析
- 解决方案
- 2.6 问题六:对2.5的巧妙优化
- 2.7 问题七:大量请求访问Redis导致【缓存雪崩】
- 学习总结
- 感谢
前言
在本章内容里,我希望大家还是要先看看【前置知识】的内容。按照我的大纲设计,我是想先给大家抛出一些大家比较陌生的,关于【Redis缓存问题以及缓存方案】的一些名词概念,再然后在正文【课程内容】里面给大家使用源码案例,然后优化演进的方式,逐步、尽可能地将【前置知识】中提到的这些内容给大家结合案例解释一下,帮助大家加深理解印象。
另外,说实在对于这个推演的过程理解还是有点门槛的,对于没有【并发意识】的同学来说,真的有点难度。我只能说我尽可能地,简单地给大伙讲解一下。
对了,我在推演过程中用到了【Redis分布式锁】,非常推荐大家看看我前面的文章《【Redis专题】大厂生产级Redis高并发分布式锁实战》,没有【并发意识】和【分布式锁】经验的同学墙裂建议看看,获取能得到那么一些灵感吧。
为什么要用Redis
听说,Redis、数据库、JVM级别扛并发能力如下:
层级 | Redis | Mysql | JVM |
---|---|---|---|
扛并发能力 | Redis官宣单个节点10W+ | 主流说法数千到数万之间 | 百万级 |
解释一下:
- 这里说的扛并发能力是指,每秒同时做【读】操作的次数
- JVM级别其实是指内存级别,比如比较有名的:ecache
所以,你知道为什么要学,或者要用Redis了吗?
前置知识
在说缓存问题跟缓存方案之前,需要跟大家声明一句:没有绝对完美的方案,只有相对可靠的方案。别钻牛角尖哦同学们
一、缓存问题
1.1 缓存击穿
Q1:什么是缓存击穿?
答:也叫缓存失效。也叫缓存失效,由于大批量缓存key在同一时间失效可能导致大量请求同时穿透缓存直达数据库,可能会造成数据库瞬间压力过大甚至挂掉。击穿,是指击穿了缓存,使得请求直达数据库。
记忆点:缓存中没有但是数据库有的数据
Q2:导致缓存击穿可能的原因是什么?
答:大量已存在的key同时失效。比如在电商场景下, 以前存在的做法是:批量上架一些热点商品到缓存中,由于批量操作,所以过期时间设置的一样。所以,在某些情况下造成:热点数据同时到达过期时间,接着大量用户进来。
Q3:缓存击穿如何解决?
答:批量添加缓存时,分散缓存过期时间(设置为一个时间段内的不同时间),避免相同时间段大量缓存失效。
给一个伪代码示例:
String get(String key) {
// 从缓存中获取数据
String cacheValue = cache.get(key);
// 缓存为空
if (StringUtils.isBlank(cacheValue)) {
// 从存储中获取
String storageValue = storage.get(key);
cache.set(key, storageValue);
//***设置一个过期时间(300到600之间的一个随机数)***
int expireTime = new Random().nextInt(300) + 300;
if (storageValue == null) {
cache.expire(key, expireTime);
}
return storageValue;
} else {
// 缓存非空
return cacheValue;
}
}
注意上面的过期时间。
1.2 缓存穿透
Q1:什么是缓存穿透?
答:缓存穿透是指缓存和数据库中都没有的数据,导致所有的请求都落到数据库上造成数据库短时间内承受大量请求而崩掉。
记忆点:缓存和数据库中都没有的数据
Q2:导致缓存穿透可能的原因是什么?
答:通常处于容错的考虑,存储层找不到数据不会写入都缓存层。正常情况是没问题的,可能导致缓存穿透的的情况如下:
- 自身业务代码或数据出现问题。这个比较难解释,但是楼主曾经在生产遇到过我一个同事,在一个定时器写了一个
while
操作,反复获取一个不存在的数据(实际上是新增功能,上线时忘了准备一些初始化数据),导致死循环CPU超负荷运行,直接把系统搞炸了。(属于是自己玩自己了) - 一些恶意攻击、爬虫等造成大量空命中。通常是遭受外部攻击
Q3:缓存穿透如何解决?
答:有如下几点:
- 接口层增加校验,如用户鉴权校验,id做基础校验,id<=0的直接拦截
- 从缓存取不到的数据,在数据库中也没有取到,这时也可以将keyvalue对写为key-空对象(不是null),缓存有效时间可以设置短点,如30秒(设置太长会导致正常情况也没法使用)。这样可以防止攻击用户反复用同一个id暴力攻击。伪代码如下:
// 从缓存中获取数据
String cacheValue = cache.get(key);
// 缓存为空
if (StringUtils.isBlank(cacheValue)) {
// 从存储中获取
String storageValue = storage.get(key);
cache.set(key, storageValue);
// 如果存储数据为空, 需要设置一个过期时间(300秒)
if (storageValue == null) {
// 缓存空对象
cache.set(key, new TargetObject());
cache.expire(key, 60 * 5);
}
return storageValue;
} else {
// 缓存非空
return cacheValue;
}
}
(注意:不同于网上其他版本,这里的建议是缓存一个空对象。为啥?说不出很好的理由。只能说,这算是一种比较好的规范。对于某些变量,给个有意义的初始值远比Null安全的多)
- 采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的 bitmap中,一个一定不存在的数据会被这个 bitmap 拦截掉,从而避免了对底层存储系统的查询压力。
对于恶意攻击,向服务器请求大量不存在的数据造成的缓存穿透,还可以用布隆过滤器先做一次过滤,对于不存在的数据布隆过滤器一般都能够过滤掉,不让请求再往后端发送。当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。
布隆过滤器就是一个大型的位数组和几个不一样的无偏 hash 函数。所谓无偏就是能够把元素的 hash 值算得比较均匀.
向布隆过滤器中添加 key 时,会使用多个 hash 函数对 key 进行 hash 算得一个整数索引值然后对位数组长度进行取模运算得到一个位置,每个 hash 函数都会算得一个不同的位置。再把位数组的这几个位置都置为 1 就完成了 add 操作。
向布隆过滤器询问 key 是否存在时,跟 add 一样,也会把 hash 的几个位置都算出来,看看位数组中这几个位置是否都为 1,只要有一个位为 0,那么说明布隆过滤器中这个key 不存在。如果都是 1,这并不能说明这个 key 就一定存在,只是极有可能存在,因为这些位被置为 1 可能是因为其它的 key 存在所致。如果这个位数组长度比较大,存在概率就会很大,如果这个位数组长度比较小,存在概率就会降低。(PS:总结一下:布隆过滤器认为这个key不存在那就一定不存在;布隆过滤器认为key存在,却不一定真的存在,只是极有可能存在。记住这一点,你基本上算是理解了布隆过滤器!!!)
这种方法适用于数据命中不高、 数据相对固定、 实时性低(通常是数据集较大) 的应用场景, 代码维护较为复杂, 但是缓存空间占用很少。
(PS:重要的知识点说三遍)
(PS:重要的知识点说三遍)
(PS:重要的知识点说三遍)
布隆过滤器认为这个key不存在那就一定不存在;布隆过滤器认为key存在,却不一定真的存在,只是极有可能存在
在项目中使用Redisson实现布隆过滤器,引入依赖:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.6.5</version>
</dependency>
测试代码:
package com.redisson;
import org.redisson.Redisson;
import org.redisson.api.RBloomFilter;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
public class RedissonBloomFilter {
public static void main(String[] args) {
Config config = new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
//构造Redisson
RedissonClient redisson = Redisson.create(config);
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
//初始化布隆过滤器:预计元素为100000000L,误差率为3%,根据这两个参数会计算出底层的bit数组大小
bloomFilter.tryInit(100000000L,0.03);
//将test插入到布隆过滤器中
bloomFilter.add("test");
//判断下面号码是否在布隆过滤器中
System.out.println(bloomFilter.contains("test_1"));//false
System.out.println(bloomFilter.contains("test_2"));//false
System.out.println(bloomFilter.contains("test"));//true
}
}
但是正常来说,使用布隆过滤器需要把所有数据提前放入布隆过滤器,并且在增加数据时也要往布隆过滤器里放,这样才能起到过滤作用。布隆过滤器缓存过滤伪代码:(注意:布隆过滤器不能删除数据,如果要删除得重新初始化数据)
// 第一步:初始化布隆过滤器
// 第一步:初始化布隆过滤器
// 第一步:初始化布隆过滤器
RBloomFilter<String> bloomFilter = redisson.getBloomFilter("nameList");
bloomFilter.tryInit(100000000L,0.03); //初始化布隆过滤器:预计元素为100000000L,误差率为3%
// 第二步:预先把所有数据存入布隆过滤器
// 第二步:预先把所有数据存入布隆过滤器
// 第二步:预先把所有数据存入布隆过滤器
void init(){
for (String key: keys) {
bloomFilter.put(key);
}
}
// 第三步:真正的使用场景了。Redis查询数据
// 第三步:真正的使用场景了。Redis查询数据
// 第三步:真正的使用场景了。Redis查询数据
String get(String key) {
// 先从布隆过滤器这一级缓存判断下key是否存在,防止缓存穿透
Boolean exist = bloomFilter.contains(key);
if(!exist){
return "";
}
// 从缓存中获取数据
String cacheValue = cache.get(key);
// 缓存为空
if (StringUtils.isBlank(cacheValue)) {
// 从存储中获取
String storageValue = storage.get(key);
cache.set(key, storageValue);
// 如果存储数据为空, 需要设置一个过期时间(300秒)
if (storageValue == null) {
// 缓存空对象
cache.set(key, new TargetObject());
cache.expire(key, 60 * 5);
}
return storageValue;
} else {
// 缓存非空
return cacheValue;
}
}
1.3 缓存雪崩
Q1:什么是缓存雪崩?
答:缓存雪崩指的是缓存层支撑不住或宕掉后,流量会像奔逃的野牛一样,打向后端存储层,然后一层一层的滚雪球导致各个层宕机。由于缓存层承载着大量请求,有效地保护了存储层,但是如果缓存层由于某些原因不能提供服务(比如超大并发过来,缓存层支撑不住,或者由于缓存设计不好,类似大量请求访问bigkey,导致缓存能支撑的并发急剧下降),于是大量请求都会打到存储层,存储层的调用量会暴增,造成存储层也会级联宕机的情况。
记忆点:缓存层撑不住,大量流量打向服务层,导致整体拒绝服务,或者级联宕机
Q2:导致缓存雪崩可能的原因是什么?
答:缓存层没有起到缓存作用,或者缓存作用没有达到预期。比如:
- 大量大key操作,长时间阻塞了Redis请求
Q3:缓存雪崩如何解决?
答:有如下几种方式:
- 保证缓存服务的高可用。比如搭建redis集群,使用Redis Sentinel或Redis Cluster
- 使用隔离组件为后端限流熔断并降级。比如使用Sentinel或Hystrix组件,区分非核心数据和核心数据的处理方式
- 比如服务降级,我们可以针对不同的数据采取不同的处理方式。当业务应用访问的是非核心数据(例如电商商品属性,用户信息等)时,暂时停止从缓存中查询这些数据,而是直接返回预定义的默认降级信息、空值或是错误提示信息;当业务应用访问的是核心数据(例如电商商品库存)时,仍然允许查询缓存,如果缓存缺失,也可以继续通过数据库读取
- 提前演练。在项目上线前, 演练缓存层宕掉后, 应用以及后端的负载情况以及可能出现的问题, 在此基础上做一些预案设定
二、缓存方案
2.1 普通缓存
基本介绍
查询数据时先查找缓存,如果有,则延长缓存时间并返回;如果没有,再去查找数据库,将查询的数据再写到缓存,同时设置过期时间。如果是静态热点数据,可以不设置缓存失效时间。
对应的伪代码:
public Target getTartget(String id) {
String key = getRedisKey(id);
String json = redisUtil.get(key);
if (StrUtil.isNotEmpty(json)) {
// 延长缓存时间
redisUtil.expire(key, 新的缓存时间);
return JSONObject.parse(json, Target.class);
}
Target target = dbUtil.get(id);
if (target != null) {
// 这里根据是否为热点数据,可以考虑是否要设置过期时间
redisUtil.set(key, JSONObject.toJSONString(target), 缓存时间);
}
return target;
}
2.2 冷热分离
基本介绍
在服务降级时,根据冷热数据做不同的处理。这个方案其实我们在上面【缓存雪崩】讲过大致的过程了。但是对于冷热数据需要大家自己根据业务去区分。这里给一点思路:
- 所谓冷数据:我们不常点击,不常使用的数据。例如美团APP,用户个人信息,就是冷数据;
- 所谓热数据:我们常点击,常使用的数据。还是美团APP。首页推广信息肯定是热数据,无论谁点开美团APP首页,显示的数据就是那些。如果这些数据每次都从数据库拿,那得多慢…
比如服务降级,我们可以针对不同的数据采取不同的处理方式。当业务应用访问的是非核心数据(例如电商商品属性,用户信息等)时,暂时停止从缓存中查询这些数据,而是直接返回预定义的默认降级信息、空值或是错误提示信息;当业务应用访问的是核心数据(例如电商商品库存)时,仍然允许查询缓存,如果缓存缺失,也可以继续通过数据库读取
2.3 多级缓存
基本介绍
比如在redis前再加一级缓存JVM,一般是通过map存储数据。可以类似redis方案更新缓存,也可以使用redis的发布订阅功能、MQ、canal来实现与数据库的同步。也可以单独部署热点缓存系统,监测到热点数据主动同步到分布式系统中。
值得注意的是,因为多引入了一级缓存,那在分布式的情况下,势必需要通过通信的方式更新新的一级缓存(就像Redis集群使用gossip协议更新节点一样)。通信就势必存在时间消耗,进而导致:短期的数据不一致性。所以建议大家使用多级缓存时,就不要就觉绝对一致了,否则会增加更大的维护成本。
2.4 缓存预热
基本介绍
热点重建缓存时,通过双检锁重建缓存:先查询,不存在需要重建缓存,重建缓存逻辑加入分布式锁,仅有一个请求能重建缓存,重建完成后,后面的请求都能获取到数据了。
对应的伪代码:
public Target getTartget(String id) {
String key = getRedisKey(id);
String json = redisUtil.get(key);
if (StrUtil.isNotEmpty(json)) {
// 延长缓存时间
redisUtil.expire(key, 新的缓存时间);
return JSONObject.parse(json, Target.class);
}
// 获取分布式锁
RLock rLock = redisson.getLock(lockKey);
rLock.lock();
try {
// 由于我们在外层已经设置了分布式锁,所以我们知道的
// 只要有一个去查库,然后设置到缓存,其他的没必要再去查库了
// 通过双重检查,我们把原本n个查库的请求,转变成了1个查库,n-1查缓存
// 再次检查缓存
if (StrUtil.isNotEmpty(json)) {
// 延长缓存时间
redisUtil.expire(key, 新的缓存时间);
return JSONObject.parse(json, Target.class);
}
Target target = dbUtil.get(id);
if (target != null) {
// 这里根据是否为热点数据,可以考虑是否要设置过期时间
redisUtil.set(key, JSONObject.toJSONString(target), 缓存时间);
}
} finaly {
rLock.unlock();
}
return target;
}
通过上面的例子我们可以看到,通过【缓存预热】,或者说【双检索】思想,我们把原本n
个查库的请求,转变成了1
个查库,n-1
个查缓存
*课程内容
同学们有没有一种感觉,目前我们Java技术栈,出现的各种中间件也好,框架也好,似乎都是为了【电商场景】服务的。太难了…
一、一个案例引发的思考(电商场景)
public class ProductService {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
// 缓存时间12小时
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 12;
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT , TimeUnit.SECONDS);
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT , TimeUnit.SECONDS);
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return product;
}
}
上面的例子很简单,就是CRUD而已。但是为了防止大家没注意到细节,我这边拎出重点给大家讲讲:
- 缓存时间是12小时(说三遍)
- 缓存时间是12小时(说三遍)
- 缓存时间是12小时(说三遍)
- 创建商品的时候,也会把数据设置到缓存里面,时间固定为12小时
- 更新商品的时候,也会把数据设置到缓存里面,时间固定为12小时
- 获取商品的时候,如果Redis中没有数据,则会从数据库中读数据,当数据存在时还会设置到缓存里面,时间固定为12小时
我想,大家对于Redis缓存,很多人的使用都止于上述几个get
、create
、update
方法的使用了吧?反正博主我就是,啊哈哈
如果你也是这么写的话,我想问问大伙儿,你们有没有思考过这里面会存在什么问题没有?我想大家也是看过我【前置知识】的各种缓存问题跟方案介绍了,应该多少能写一两点问题出来吧。
建议小伙伴们可以拿出纸和笔好好写一下。尽可能地写出来,也算是对自己的一种锻炼了。我个人能看到的问题有如下:(大流量下)
create
方法的缓存时间是固定的,在批量插入场景下,可能会出现批量缓存同时过期,造成【缓存击穿(失效)】现象get
方法在成功拿到缓存数据返回之后,没有延长缓存时间。有可能出现【热点数据】,或者最近使用过的数据缓存过期,造成【缓存击穿(失效)】现象get
方法在没有拿到缓存,去查询数据库的时候,可能存在大量请求同时请求数据库,数据库压力暴增get
方法对于Redis跟数据库都不存在的数据没有防备,可能会造成【缓存穿透】现象- … … (目前我能想到的就这些了)
好了,问题已经发现了,那让我们一起学习一下,在大厂里面,是如何写这些代码的!
首先,还是希望大家时刻对背景有个简单的印象:
- 分布式,多个tomcat
- 多线程,高并发,动不动就百万流量,吹牛逼都很爽
二、代码优化方案的演进
2.1 问题一:纯【读】的场景下缓存固定时间后会过期
我们来看看get
方法:
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return product;
}
问题分析
首先,我们在创建该商品的缓存的时候是给了一个固定的过期时间的,12
小时嘛。然后大家看,在没有调用update
更新方法的前提下,单纯只是调用get
获取数据的时候,数据的缓存时间是不会被更新的,所以它一定会在12
小时后过期。
在这种情况下,假设持续有大流量进来访问这个key,那么,在缓存过期失效的那一刻,很可能会突然有大量流量打到我们的Mysql层,是吧?
解决方案
对于这个问题的解决方案,很容易能想到。就是每次从缓存读取之后,适当的给缓存续上时间或者重新设置缓存时间即可。这种方式,就是一种简单的【冷热数据】分离。修改后的代码如下:
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
}
return product;
}
小总结
【冷热数据】的核心思想其实就是,尽可能地让【热数据】留在缓存中,而【冷数据】则尽量不缓存或者不处理延迟、刷新缓存的业务
2.2 问题二:批量上传缓存过期时间一样导致【缓存击穿】
我们先来看看create
方法:
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult), PRODUCT_CACHE_TIMEOUT , TimeUnit.SECONDS);
return productResult;
}
问题分析
当我们使用这个接口进行批量上传的时候(假设1K个),显然缓存过期时间都会被设置成PRODUCT_CACHE_TIMEOUT =12
小时。正常来说这1K个前后差距不会超过1秒。这可能会导致什么?显然这1K多个缓存,在12小时后同时失效,如果在失效后同时有大量流量进来查询这1K个key,那就导致了对这1K多个数据的访问同时打到了Mysql上,这就是【缓存击穿】。
解决方案
这个问题解决方式也不难。既然数据会成片失效,那就避免成片失效,就算是失效也尽可能让他们分散好了。所以,给这些缓存时间再+一个随机时间即可。改正后代码如下:
public class ProductService {
@Autowired
private ProductDao productDao;
@Autowired
private RedisUtil redisUtil;
@Autowired
private Redisson redisson;
// 缓存时间12小时
public static final Integer PRODUCT_CACHE_TIMEOUT = 60 * 60 * 12;
@Transactional
public Product create(Product product) {
Product productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult), genProductCacheTimeout(), TimeUnit.SECONDS);
}
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
}
private Integer genProductCacheTimeout() {
return PRODUCT_CACHE_TIMEOUT + new Random().nextInt(5) * 60 * 60;
}
注意这里新增了一个方法genProductCacheTimeout
,方法实现就i是在原时间基础PRODUCT_CACHE_TIMEOUT
上+随机时间。然后,用这个方法,代替原本的过期时间,替换到所有的get
、create
、update
上
小总结
- 【缓存击穿】是Redis没数据,Mysql有数据
- 【缓存击穿】的关键是原本存在的缓存数据同时大量过期,所以针对【同时】提出解决方式就好
2.3 问题三:没有过滤无效请求或者处理空数据导致【缓存穿透】
继续来看2.1优化之后的get
方法:
public Product get(Long productId) throws InterruptedException {
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
问题分析
假设,前端传过来一个不存在的数据,productId=-99999999
这种后端不存在的数据,按照现在的逻辑,你发现没有,它还是会按部就班地去查Redis,然后再查数据库。最后两边都没有,返回空。
你是不是在想,有谁这么无聊查这种无意义的数据呢?确实没有人会这么无聊,但是有人会故意这么干,那就是黑客的攻击。试想,如果黑客用类似Jmeter的压测工具,重复查询这个不存在的数据,我们系统会怎样?【缓存穿透】呗,甚至可能把你的Redis跟Mysql的资源都耗尽,最后挂了。这个问题怎么解决?其实我们最开始说过了,其中有一种叫做【布隆过滤器】的,这边就不介绍了,感兴趣的同学自己去看看。
这边使用的是:给无效数据一个缓存,并且设置较短的过期时间,防止循环查询
public static final String EMPTY_CACHE = "{}";
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
注意上面的源码,有3个关键点:
- 做了一个
if (productId < 0)
判断。当然这里需要根据自己的业务代码做校验,我这里只是简单演示 - 新增了一个
String static final EMPTY_CACHE = "{}";
,值设置成{}
是因为我们存储的数据是Json格式。然后在数据库中查询不到数据的时候,写入缓存中,并且设置了过期时间 - 上面的过期时间的设置也是
genEmptyCacheTimeout
随机的,大家能猜到为什么要设置随机吗?其实跟2.2
一样啦,防止【缓存击穿】
但是上面对空数据
的缓存还不是最完美的。如果,黑客持续不间断的做无效查询,那等到我们的空缓存过期的时候他还是会再次【穿透】我们。所以可以采用跟2.1
的【冷热数据】方案一样,给个续命逻辑。改正后如下:
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
if (productJson.equals(EMPTY_CACHE)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
小总结
- 【缓存穿透】是Redis没数据,Mysql也没有数据
- 【缓存穿透】是很容易被利用的一个系统弱点,所以可以适当的多写一点代码,做防护
2.4 问题四:冷门数据突然爆热导致高并发创建
这个问题我个人觉得很有意思。其实也是我个人在使用Redis的get
方法的时候早就想过的问题,但是没有去追究解决办法。
我们前面有说过,正常电商场景,只会对一些比较热门的数据预先做Reids缓存。如果哪一天突然来了一个冷门数据爆单呢?还别说,现在的直播带货真有可能出现。
我们先看看目前的get
方法:
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
if (productJson.equals(EMPTY_CACHE)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
问题分析
像上面的代码,如果来了一个我们前面说到的没有缓存的【冷门数据】突然爆热,你想,会有大量的请求执行到product = productDao.get(productId);
查数据库,然后写入到缓存,对吧?这里还没考虑Mysql压力突然增大可能引起的问题呢!
解决方案
其实解决方案前面已经说过了,聪明的同学估计已经反应过来了,那就是缓存方案中的【缓存预热】。
【缓存预热】:热点重建缓存时,通过双检锁重建缓存。先查询,不存在需要重建缓存,重建缓存逻辑加入分布式锁,仅有一个请求数据库,然后设置缓存。重建完成后,后面的请求都能从缓存中获取到数据了。
代码如下:
public static final String LOCK_PRODUCT_HOT_CACHE_PREFIX = "lock:product:hot_cache:";
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
if (productJson.equals(EMPTY_CACHE)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
// 获取分布式锁
String lockKey = LOCK_PRODUCT_HOT_CACHE_PREFIX + productId;
RLock rLock = redisson.getLock(lockKey);
rLock.lock();
try {
// 再次从缓存拿数据,有就直接返回
String productJson = redisUtil.get(productCacheKey);
if (StrUtil.isNotEmpty(productJson)) {
if (productJson.equals(EMPTY_CACHE)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
// 每次拿完数据,都给做一个刷新缓存时间的操作,让热数据尽可能留在缓存
redisUtil.expire(productCacheKey, PRODUCT_CACHE_TIMEOUT, TimeUnit.SECONDS);
product = JSONObject.parseObject(productJson, Product.class);
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
我们优化一下上面的冗余代码,变成如下:
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
// 获取分布式锁
String lockKey = LOCK_PRODUCT_HOT_CACHE_PREFIX + productId;
RLock rLock = redisson.getLock(lockKey);
rLock.lock();
try {
// 再次从缓存拿数据,有就直接返回
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
rLock.unlock();
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
private Product getProductFromCache(String productCacheKey) {
Product product = productMap.get(productCacheKey);
if (product != null) {
return product;
}
String productStr = redisUtil.get(productCacheKey);
// 区分对待空,防止不存在缓存穿透
if (!StringUtils.isEmpty(productStr)) {
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS); //读延期
}
return product;
}
顾名思义,双重检查锁,肯定是有锁的,这边考虑到分布式环境,所以用的是分布式锁。双重检查,就是双重检查缓存。大家看看锁里面的代码就i知道了。但是目前这个方案是有问题的,他其实会带来一个新的问题,所以需要在细节上改一改。
由于我们之前在做空缓存的时候,有个判断:
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
看,这里因为空缓存直接返回了null
,所以,会导致,在锁内部双重检查的时候,对于空缓存继续去查库,这显然不是我们想要的。
所以,需要修改空缓存的时候,返回一个空对象,而不是null
。如下:
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return new Product();
}
当然啦,你这么修改之后也得通知前端小伙伴一起修改哦,不然可能会拿一些无意义的数据,然后不知道做什么去了。
小总结
- 当前问题的关键在于,如何避免大量流量并发查库、重复创建缓存。而处理并发的一个方式就是加锁让并行变串行
- 双重检查锁,有锁,然后又是分布式环境,首选是分布式锁
- 双重检查,是一个很有意思的思路。通过双重检查,让原本N个查库的请求,变成了1个查库 + (N-1)个查Redis
2.5 问题五:缓存与数据库双写不一致(有点难度)
在大并发下,同时操作数据库与缓存会存在数据不一致性问题。在分析问题之前,我们先介绍一下什么是双写不一致,及导致其出现的原因。
问题分析
Q1:什么是双写不一致?
答:这个问题导致的原因通常有:双写导致的不一致;读写并发导致的不一致。文字不好描述他们的产生原因,所以直接给一张图大家看看,清晰明了。
【双写导致的不一致】如下:
【读写并发导致的不一致】如下:(下面是写后删数据的策略,其实写后更新缓存,然后我们调整一下下图的一些操作的次序,还是可以引发双写不一致问题的)
不知道大家发现规律没有,【双写不一致】通常都是发生在==【不同的线程之间】,然后【并发】发生:一个在写数据库,一个在写缓存的时候==。
也就是说,【双写不一致】的产生条件之一有:并发。你懂我意思吧?解决并发就解决了这个问题!
解决方案
- 对于并发几率很小的数据(如个人维度的订单数据、用户数据等),这种几乎不用考虑这个问题,很少会发生缓存不一致,可以给缓存数据加上过期时间,每隔一段时间触发读的主动更新即可
- 就算并发很高,如果业务上能容忍短时间的缓存数据不一致(如商品名称,商品分类菜单等),缓存加上过期时间依然可以解决大部分业务对于缓存的要求
- 如果不能容忍缓存数据不一致,可以通过加分布式读写锁保证并发读写或写写的时候按顺序排好队,读读的时候相当于无锁
- 也可以用阿里开源的canal通过监听数据库的binlog日志及时的去修改缓存,但是引入了新的中间件,增加了系统的复杂度
下面我们给一个,加上分布式读写锁之后的解决方案代码:
@Transactional
public Product create(Product product) {
Product productResult = null;
// 使用分布式读写锁解决 - 写锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.create(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
} finally {
writeLock.unlock();
}
return productResult;
}
@Transactional
public Product update(Product product) {
Product productResult = null;
// 使用分布式读写锁解决 - 写锁
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + product.getId());
RLock writeLock = readWriteLock.writeLock();
writeLock.lock();
try {
productResult = productDao.update(product);
redisUtil.set(RedisKeyPrefixConst.PRODUCT_CACHE + productResult.getId(), JSON.toJSONString(productResult),
genProductCacheTimeout(), TimeUnit.SECONDS);
} finally {
writeLock.unlock();
}
return productResult;
}
public Product get(Long productId) throws InterruptedException {
if (productId < 0) {
return null;
}
Product product = null;
String productCacheKey = RedisKeyPrefixConst.PRODUCT_CACHE + productId;
// 先从缓存拿数据,有就直接返回
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
// 获取分布式锁
String lockKey = LOCK_PRODUCT_HOT_CACHE_PREFIX + productId;
RLock rLock = redisson.getLock(lockKey);
rLock.lock();
try {
// 再次从缓存拿数据,有就直接返回
product = getProductFromCache(productCacheKey);
if (product != null) {
return product;
}
// 主要是针对数据库,使用分布式读写锁解决 - 读锁,当读的时候并行执行,与写锁互斥
RReadWriteLock readWriteLock = redisson.getReadWriteLock(LOCK_PRODUCT_UPDATE_PREFIX + productId);
RLock readLock = readWriteLock.readLock();
readLock.lock();
try {
product = productDao.get(productId);
if (product != null) {
redisUtil.set(productCacheKey, JSON.toJSONString(product), genProductCacheTimeout(), TimeUnit.SECONDS);
} else {
redisUtil.set(productCacheKey, EMPTY_CACHE, genEmptyCacheTimeout(), TimeUnit.SECONDS);
}
} finally {
readLock.unlock();
}
} finally {
rLock.unlock();
}
return product;
}
private Integer genEmptyCacheTimeout() {
return 60 + new Random().nextInt(30);
}
private Product getProductFromCache(String productCacheKey) {
Product product = productMap.get(productCacheKey);
if (product != null) {
return product;
}
String productStr = redisUtil.get(productCacheKey);
// 区分对待空,防止不存在缓存穿透
if (!StringUtils.isEmpty(productStr)) {
if (EMPTY_CACHE.equals(productStr)) {
redisUtil.expire(productCacheKey, genEmptyCacheTimeout(), TimeUnit.SECONDS);
return null;
}
product = JSON.parseObject(productStr, Product.class);
redisUtil.expire(productCacheKey, genProductCacheTimeout(), TimeUnit.SECONDS); //读延期
}
return product;
}
这里主要用到的是读写锁。这是一个降低锁粒度的锁设计,对读写锁不熟悉的朋友,可以去看看,这玩意用好了也是一个提升。
但是有一个点可能没有经验的朋友会懵。为什么我在创建和更新的地方也要加上这个读写锁?其实这里我偷懒了,一下子过渡到读写锁让大家陷入了对【读写锁】未知的恐惧中(读写锁在这里是为了替代使用普通所,提升锁的性能,降低锁的粒度)。
其实我们理解的时候,可以吧上面的读写锁,看成是一个普通的分布式锁试试,【读写都互斥】。那这样的结果是什么?其实就是,让【读数据库+写缓存】和【写数据库+更新缓存】从并行变成串行互斥的了。即:
要么我先读,读完再写缓存;然后才允许其他线程写数据库,接着更新缓存。
要么你先写数据库,写完更新缓存;然后我再读数据库,接着写缓存。
2.6 问题六:对2.5的巧妙优化
其实从问题递进角度来说,这一个问题,或者优化应该是写在2.5之后的。但是,这个方案真的让我有点“拍案叫绝”的感叹,所以我想着单独拎出来给大家说。真的非常有意思,也非常巧妙的一个优化。优化的地方,其实是下面红框处这一步:
就像上面说的那样,加了分布式锁之后,那锁后面的代码,将由【并行】改成【串行】执行。也就是说,假设有100W个请求进来了,那这100W个请求将依次执行,接着进入try
里面的代码块。
那其实大家有没有想过一点,其实我们try
里面重建索引这个过程,也许3秒钟,甚至1秒钟就能完成的。那,我还有必要让后面的所有请求都阻塞在那里排队吗?我们不妨把代码改成如下:
// 获取分布式锁
String lockKey = LOCK_PRODUCT_HOT_CACHE_PREFIX + productId;
RLock rLock = redisson.getLock(lockKey);
rLock.trylock(3, TimeUnit.SECOND);
看,很简单,就是把lock
改成tryLock
了而已。大家还记得tryLock
的特点吗?就是到了一定时间之后,不管有没有拿到锁,都不再等待了(【串行】转【并发】)。接着就会继续往前走,接着就会走到【双重检查缓存】的第二段【获取缓存】的地方,接着被缓存拦下来。牛逼吧??!!
但是有一个要注意到的点,一定要确保,你的tryLock
时间足够安全,不然很可能会演变成灾难。
2.7 问题七:大量请求访问Redis导致【缓存雪崩】
我们前面说的很多问题都是大量访问Mysql导致的,但其实,如果有大量流量访问Redis,Redis也会有扛不住的时候,毕竟QPS 10W+就摆在这里。
比如,微博突然爆发一个惊天消息:【华为mate60携带国产麒麟9000问世】的时候,微博的压力一下子就上来了。那瞬时流量,搞不好破亿都有。那这种情况怎么解决?看最前面【为什么要用Redis】说的。
对,多加一级JVM级别的缓存就好了,这样基本上是尽力了。
学习总结
- 学习了一些Redis缓存架构的性能优化
- 学习了一些精妙的设计,如双检查机制,和对
tryLock
的使用
感谢
感谢我东哥的文章《一线大厂Redis高并发缓存架构实战与性能优化》