目录
1. 缓存穿透
1.1 解决方案
2. 缓存击穿
2.1 解决方案
3. 缓存雪崩
3.1 概念图及问题描述
编辑3.2 解决方案
4. 分布式锁
4.1 概念
4.2 基于redis来实现分布式锁
4.3 用idea来操作一遍redis分布式锁
4.4 分布式上锁的情况下,锁释放了服务器b中的锁问题
4.5 LUA保证删除原子性
4.6 分布式锁小总结
1. 缓存穿透
当我们用户访问浏览器从应用服务器中请求web服务的时候,这时候请求的数据,在redis缓存中访问不到,那么就会去访问数据库,如果redis一直访问不到数据,它就会一直访问数据库,导致数据库压力变大,从而导致数据库崩溃。(缓存穿透)
- 一般出现缓存穿透的原因:
- redis查询不到数据库;
- 出现很多非正常url访问(就是一直访问不存在的url路径);
其目的并不是为了得到数据,而是可能黑客利用此漏洞进行攻击,从而压垮数据库,让服务器瘫痪;
比如说一个不存在的用户id获取用户信息,不论缓存还是数据库都没有,被一直访问攻击,就会存在此问题;
1.1 解决方案
一个一定不存在缓存几查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义;
-
解决:
(1) 对空值缓存:如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(nul)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟;
(2) 设置可访问的名单(白名单):使用 bitmaps 类型定义一个可以访问的名单,名单id 作为 bitmaps 的偏移量每次访问和 bitmap 里面的 id 进行比较,如果访问 id 不在 bitmaps 里面,进行拦截,不允许访问;
(3) 采用布隆过滤器:(布隆过滤器(Bloom Filter)是 1970 年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。布隆过滤器可以用于检索一个元素是否在一个集合中。"它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难;将所有可能存在的数据哈希到一个足够大的 bitmaps 中,一个一定不存在的数据会被 这个 bitmaps 拦截掉,从而避免了对底层存储系统的查询压力;
(4) 进行实时监控:当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务;
2. 缓存击穿
- 缓存击穿问题起因:
- 数据库访问压力瞬时增加
- redis里面没有出现大量key过期
- redis正常运行
问题描述:
key对应的数据存在,但在redis中过期,此时若有大量并发请求过滤,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能回瞬间把后端DB压垮;
2.1 解决方案
(1) 预先设置热门数据:在 redis 高峰访问之前,把一些热门数据提前存入到redis 里面,加大这些热门数据 key 的时长;
(2) 实时调整:现场监控哪些数据热门,实时调整 key 的过期时长;
(3) 使用锁:
- 就是在缓存失效的时候(判断拿出来的值为空),不是立即去 load db;
- 先使用缓存工具的某些带成功操作返回值的操作(比如 Redis 的 SETNX)去set一个mutex key;
- 当操作返回成功时,再进行load db的操作,并回设缓存,最后删除mutex key;
(4) 当操作返回失败,证明有线程在load db,当前线程睡眠一段时间再重试整个get缓存的方法;
3. 缓存雪崩
3.1 概念图及问题描述
key 对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端 DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮;
缓存雪崩与缓存击穿的区别在于这里针对很多 key 缓存,前者则是某一个 key;
缓存失效时的雪崩效应对底层系统的冲击非常严重;
3.2 解决方案
采用构建多级缓存架构:
通过其它缓存来建立多道城墙,当nginx缓存不行了,换到redis缓存,以此类推。
(1) 构建多级缓存架构:nginx缓存+redis 缓存 +其他缓存(ehcache等);
(2) 使用锁或队列:用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。但是不适用高并发情况;
(3) 设置过期标志更新缓存:记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际 key 的缓存;
(4) 将缓存失效时间分散开:比如我们可以在原有的失效时间基础上增加一个随机值,比如 1-5 分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件;
4. 分布式锁
4.1 概念
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的 Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨 JVM 的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题;
-
分布式锁主流的实现方案:
基于数据库实现分布式锁;
基于缓存(Redis等);
基于 Zookeeper;
-
每一种分布式锁解决方案都有各自的优缺点:
性能:redis 最高;
可靠性:zookeeper 最高;
4.2 基于redis来实现分布式锁
设置锁:
释放锁:
避免设置锁的时候服务器挂掉来不及设置过期时间,直接连锁和过期时间一起设置:
手动设置一个锁的过期时间(这样就不需要再次del删除锁就可以设值了):
4.3 用idea来操作一遍redis分布式锁
- 给key设置值
/numValue
; - 使用锁机制,去实现资源控制
/getLock
;
package com.lei.my_redis.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@RestController
public class RedisLock {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/numValue")
public String numValue() {
redisTemplate.opsForValue().set("num", "1");
return "1";
}
/**
* 实现锁机制,如果该锁被持有者使用,未得到释放,那么取余的操作者不得使用到锁
*
* @return 打印日志
*/
@GetMapping("/getLock")
public String testLock() {
/*
1.在 Redis 中,使用 setIfAbsent 方法尝试获取锁时,如果锁的键已经存在(即锁已经被其他客户端持有),那么该操作将失败,
2.不会重置锁的过期时间或更改锁的值。如果成功获取了锁(即锁的键之前不存在),那么会将锁的键设置为指定的值,并设置一个过期时间
3.setIfAbsent锁设置:
key: 要设置的值
value: 要设置的键的值
timeout: 过期时间, 如果设置为-1, 表示该键没有过期时间(永久有效), 设置为0, 表示立即过期, 如果是整数, 就是该键在指定单位时间内过期
TimeUnit: 过期时间单位, 如 TimeUnit.SECONDS、TimeUnit.MILLISECONDS 等
*/
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock6", "10", 5, TimeUnit.SECONDS); // 1.获取锁,setnx;
if (Boolean.TRUE.equals(lock)) { // 2.获取锁成功,查询num 的值
Object value = redisTemplate.opsForValue().get("num"); // 2.1判断nun为空,return
if (ObjectUtils.isEmpty(value)) {
return "num为空";
}
int num = Integer.parseInt(value + ""); // 2.2有值就转成int
num++;
redisTemplate.opsForValue().set("num", Integer.toString(num)); // 2.3把redis的num加1
redisTemplate.delete("lock6"); // 2.4释放锁 del
return "值设置成功: " + redisTemplate.opsForValue().get("num");
} else {
try {
// Thread.sleep(100); // 3.获取锁失败,每隔0.1秒再获取
Thread.sleep(1000); // 3.获取锁失败,每隔0.1秒再获取
testLock();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return "资源正在获取中,请等待...";
}
}
4.4 分布式上锁的情况下,锁释放了服务器b中的锁问题
问题描述:
解决办法(给锁设置一个uuid):
set lock uuid nx ex 10
设uuid:仅参考
String uuid = UUID.randomUUID().toString();
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock6", uuid, 5, TimeUnit.SECONDS); // 1.获取锁,setnx;
// 判断比较uuid值是否一样
String lockUuid = (String) redisTemplate.opsForValue().get("lock6");
if (Objects.requireNonNull(lockUuid).equals(uuid)) {
redisTemplate.delete("lock6"); // 2.4释放锁 del
}
4.5 LUA保证删除原子性
在我们使用redis继续上锁的过程中是缺少原子性的;
LUA脚本的作用支持java也支持C++,其中支持原子性;
可以用LUA脚本来解决误删key;
4.6 分布式锁小总结
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
-互斥性:在任意时刻,只有一个客户端能持有锁;
-不会发生死锁:即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁;
-解铃还须系铃人:加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了;
-加锁和解锁必须具有原子性;