1、项目优化-redis缓存
2、缓存穿透
使用缓存后代码的性能有了很大的提高,虽然性能有很大的提升但是控制台打出了很多“从数据库查询”的日志,明明判断了如果缓存存在课程信息则从缓存查询,为什么要有这么多从数据库查询的请求的?
这是因为并发数高,很多线程会同时到达查询数据库代码处去执行。
我们分析下代码:
如果存在恶意攻击的可能,如果有大量并发去查询一个不存在的课程信息会出现什么问题呢?
比如去请求/content/course/whole/181,查询181号课程,该课程并不在课程发布表中。
进行压力测试发现会去请求数据库。
大量并发去访问一个数据库不存在的数据,由于缓存中没有该数据导致大量并发查询数据库,这个现象要缓存穿透。
缓存穿透可以造成数据库瞬间压力过大,连接数等资源用完,最终数据库拒绝连接不可用。
3、解决缓存穿透
如何解决缓存穿透?
1、对请求增加校验机制
比如:课程Id是长整型,如果发来的不是长整型则直接返回。
2、使用布隆过滤器
什么是布隆过滤器,以下摘自百度百科:
布隆过滤器可以用于检索一个元素是否在一个集合中。如果想要判断一个元素是不是在一个集合里,一般想到的是将所有元素保存起来,然后通过比较确定。链表,树等等数据结构都是这种思路. 但是随着集合中元素的增加,我们需要的存储空间越来越大,检索速度也越来越慢(O(n),O(logn))。不过世界上还有一种叫作散列表(又叫哈希表,Hash table)的数据结构。它可以通过一个Hash函数将一个元素映射成一个位阵列(Bit array)中的一个点。这样一来,我们只要看看这个点是不是1就可以知道集合中有没有它了。这就是布隆过滤器的基本思想。
布隆过滤器的特点是,高效地插入和查询,占用空间少;查询结果有不确定性,如果查询结果是存在则元素不一定存在,如果不存在则一定不存在;另外它只能添加元素不能删除元素,因为删除元素会增加误判率。
比如:将商品id写入布隆过滤器,如果分3次hash此时在布隆过滤器有3个点,当从布隆过滤器查询该商品id,通过hash找到了该商品id在过滤器中的点,此时返回1,如果找不到一定会返回0。
所以,为了避免缓存穿透我们需要缓存预热将要查询的课程或商品信息的id提前存入布隆过滤器,添加数据时将信息的id也存入过滤器,当去查询一个数据时先在布隆过滤器中找一下如果没有到到就说明不存在,此时直接返回。
实现方法有:
Google工具包Guava实现。
redisson 。
3、缓存空值或特殊值
请求通过了第一步的校验,查询数据库得到的数据不存在,此时我们仍然去缓存数据,缓存一个空值或一个特殊值的数据。
但是要注意:如果缓存了空值或特殊值要设置一个短暂的过期时间
。
public CoursePublish getCoursePublishCache(Long courseId) {
//查询缓存
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
if(jsonObj!=null){
String jsonString = jsonObj.toString();
if(jsonString.equals("null"))
return null;
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
//从数据库查询
System.out.println("从数据库查询数据...");
CoursePublish coursePublish = getCoursePublish(courseId);
//设置过期时间300秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),30, TimeUnit.SECONDS);
return coursePublish;
}
}
再测试,虽然还存在个别请求去查询数据库,但不是所有请求都去查询数据库,基本上都命中缓存。
4、缓存雪崩
4.1 什么是缓存雪崩
缓存雪崩是缓存中大量key失效后当高并发到来时导致大量请求到数据库,瞬间耗尽数据库资源,导致数据库无法使用。
造成缓存雪崩问题的原因是是大量key拥有了相同的过期时间,比如对课程信息设置缓存过期时间为10分钟,在大量请求同时查询大量的课程信息时,此时就会有大量的课程存在相同的过期时间,一旦失效将同时失效,造成雪崩问题。
5、 解决缓存雪崩
如何解决缓存雪崩?
1、使用同步锁控制查询数据库的线程
使用同步锁控制查询数据库的线程,只允许有一个线程去查询数据库,查询得到数据后存入缓存。
synchronized(obj){
//查询数据库
//存入缓存
}
2、对同一类型信息的key设置不同的过期时间
通常对一类信息的key设置的过期时间是相同的,这里可以在原有固定时间的基础上加上一个随机时间使它们的过期时间都不相同。
示例代码如下:
//设置过期时间300秒
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish),300+new Random().nextInt(100), TimeUnit.SECONDS);
3、缓存预热
不用等到请求到来再去查询数据库存入缓存,可以提前将数据存入缓存。使用缓存预热机制通常有专门的后台程序去将数据库的数据同步到缓存。
6、缓存击穿
缓存击穿是指大量并发访问同一个热点数据,当热点数据失效后同时去请求数据库,瞬间耗尽数据库资源,导致数据库无法使用。
比如某手机新品发布,当缓存失效时有大量并发到来导致同时去访问数据库。
7、解决缓存击穿
如何解决缓存击穿?
1、使用同步锁控制查询数据库的线程
使用同步锁控制查询数据库的代码,只允许有一个线程去查询数据库,查询得到数据库存入缓存。
synchronized(obj){
//查询数据库
//存入缓存
}
public CoursePublish getCoursePublishCache(Long courseId) {
//从缓存中查询
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
//缓存中有
if(jsonObj!=null){
// System.out.println("=============从缓存中查询=============");
//缓存中有直接返回数据
String jsonString = jsonObj.toString();
if("null".equals(jsonString)){
return null;
}
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
}else{
synchronized (this){
//再次查询一下缓存
//从缓存中查询
jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
//缓存中有
if(jsonObj!=null) {
//缓存中有直接返回数据
String jsonString = jsonObj.toString();
if("null".equals(jsonString)){
return null;
}
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
}
System.out.println("==查询数据库==");
//从数据库查询
CoursePublish coursePublish = getCoursePublish(courseId);
//查询完成再存储到redis
redisTemplate.opsForValue().set("course:"+courseId,JSON.toJSONString(coursePublish),300, TimeUnit.SECONDS);
return coursePublish;
}
}
2、热点数据不过期
可以由后台程序提前将热点数据加入缓存,缓存过期时间不过期,由后台程序做好缓存同步。
8、分布式锁
本地锁的问题
上边的程序使用了同步锁解决了缓存击穿、缓存雪崩的问题,保证同一个key过期后只会查询一次数据库。
如果将同步锁的程序分布式部署在多个虚拟机上则无法保证同一个key只会查询一次数据库,如下图:
一个同步锁程序只能保证同一个虚拟机中多个线程只有一个线程去数据库,如果高并发通过网关负载均衡转发给各个虚拟机,此时就会存在多个线程去查询数据库情况,因为虚拟机中的锁只能保证该虚拟机自己的线程去同步执行,无法跨虚拟机保证同步执行。
我们将虚拟机内部的锁叫本地锁,本地锁只能保证所在虚拟机的线程同步执行。
下边进行测试:
启动三个内容管理服务:
通过网关访问课程查询,网关通过负载均衡将请求转发给三个服务。
通过测试发现,有两个服务各有一次数据库查询,这说明本地锁无法跨虚拟机保证同步执行。
9、什么是分布锁
本地锁只能控制所在虚拟机中的线程同步执行,现在要实现分布式环境下所有虚拟机中的线程去同步执行就需要让多个虚拟机去共用一个锁,虚拟机可以分布式部署,锁也可以分布式部署,如下图:
虚拟机都去抢占同一个锁,锁是一个单独的程序提供加锁、解锁服务,谁抢到锁谁去查询数据库。
该锁已不属于某个虚拟机,而是分布式部署,由多个虚拟机所共享,这种锁叫分布式锁。
10、分布式锁的实现方案
实现分布式锁的方案有很多,常用的如下:
1、基于数据库实现分布锁
利用数据库主键唯一性的特点,或利用数据库唯一索引的特点,多个线程同时去插入相同的记录,谁插入成功谁就抢到锁。
2、基于redis实现锁
redis提供了分布式锁的实现方案,比如:SETNX、set nx、redisson等。
拿SETNX举例说明,SETNX命令的工作过程是去set一个不存在的key,多个线程去设置同一个key只会有一个线程设置成功,设置成功的的线程拿到锁。
3、使用zookeeper实现
zookeeper是一个分布式协调服务,主要解决分布式程序之间的同步的问题。zookeeper的结构类似的文件目录,多线程向zookeeper创建一个子目录(节点)只会有一个创建成功,利用此特点可以实现分布式锁,谁创建该结点成功谁就获得锁。
11、Redis----setnx 实现分布式锁
if(缓存中有){
返回缓存中的数据
}else{
获取分布式锁
if(获取锁成功){
try{
查询数据库
}finally{
释放锁
}
}
}
在调用setnx命令设置key/value时,每个线程设置不一样的value值,这样当线程去删除锁时可以先根据key查询出来判断是不是自己当时设置的vlaue,如果是则删除。
这整个操作是原子的,实现方法就是去执行上边的lua脚本。
Lua 是一个小巧的脚本语言,redis在2.6版本就支持通过执行Lua脚本保证多个命令的原子性。
什么是原子性?
这些指令要么全成功要么全失败。
以上就是使用Redis Nx方式实现分布式锁,为了避免删除别的线程设置的锁需要使用redis去执行Lua脚本的方式去实现,这样就具有原子性,但是过期时间的值设置不存在不精确的问题。
12、Redisson实现分布式锁(看门狗机制)
Redisson相比set nx实现分布式锁要简单的多,工作原理如下:
•加锁机制
线程去获取锁,获取成功: 执行lua脚本,保存数据到redis数据库。
线程去获取锁,获取失败: 一直通过while循环尝试获取锁,获取成功后,执行lua脚本,保存数据到redis
•WatchDog自动延期看门狗机制
第一种情况:在一个分布式环境下,假如一个线程获得锁后,突然服务器宕机了,那么这个时候在一定时间后这个锁会自动释放,你也可以设置锁的有效时间(当不设置默认30秒时),这样的目的主要是防止死锁的发生
第二种情况:线程A业务还没有执行完,时间就过了,线程A 还想持有锁的话,就会启动一个watch dog后台线程,不断的延长锁key的生存时间。
•lua脚本-保证原子性操作
主要是如果你的业务逻辑复杂的话,通过封装在lua脚本中发送给redis,而且redis是单线程的,这样就保证这段复杂业务逻辑执行的原子性
具体使用RLock操作分布锁,RLock继承JDK的Lock接口,所以他有Lock接口的所有特性,比如lock、unlock、trylock等特性,同时它还有很多新特性:强制锁释放,带有效期的锁,。
//使用redisson实现分布式锁
@Override
public CoursePublish getCoursePublishCache(Long courseId) {
//从缓存中查询
Object jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
//缓存中有
if (jsonObj != null) {
// System.out.println("=============从缓存中查询=============");
//缓存中有直接返回数据
String jsonString = jsonObj.toString();
if ("null".equals(jsonString)) {
return null;
}
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
} else {
RLock lock = redissonClient.getLock("coursequerylock:" + courseId);
//获取分布式锁
lock.lock();
try {
//再次查询一下缓存
//从缓存中查询
jsonObj = redisTemplate.opsForValue().get("course:" + courseId);
//缓存中有
if (jsonObj != null) {
//缓存中有直接返回数据
String jsonString = jsonObj.toString();
if ("null".equals(jsonString)) {
return null;
}
CoursePublish coursePublish = JSON.parseObject(jsonString, CoursePublish.class);
return coursePublish;
}
System.out.println("==查询数据库==");
// try {
// //手动延迟,测试锁的续期功能
// Thread.sleep(60000);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
//从数据库查询
CoursePublish coursePublish = getCoursePublish(courseId);
//查询完成再存储到redis
redisTemplate.opsForValue().set("course:" + courseId, JSON.toJSONString(coursePublish), 300, TimeUnit.SECONDS);
return coursePublish;
} finally {
//手动释放锁
lock.unlock();
}
}
lock():
•此方法为加锁,但是锁的有效期采用默认30秒
•如果主线程未释放,且当前锁未调用unlock方法,则进入到watchDog机制
•如果主线程未释放,且当前锁调用unlock方法,则直接释放锁