文章目录
- 前言
- 一、本地缓存与分布式缓存
- 1.1 使用缓存
- 1.2 本地缓存
- 1.3 本地模式在分布式下的问题
- 1.4 分布式缓存
- 二、整合redis测试
- 2.1 引入依赖
- 2.2 配置信息
- 2.3 测试
- 三、改造三级分类业务
- 3.1 代码改造
- 四、高并发下缓存失效问题
- 4.1 缓存穿透
- 4.2 缓存雪崩
- 4.3 缓存击穿
- 五、分布式下加锁
- 5.1 分布式锁示意图
- 5.2 锁的时序问题
前言
本文继续记录B站谷粒商城项目视频 P151-157 的内容,做到知识点的梳理和总结的作用。
一、本地缓存与分布式缓存
1.1 使用缓存
为了系统性能的提升,我们一般都会将部分数据放入缓存中,加速访问。而 db 承担数据落盘工作。
哪些数据适合放入缓存?
- 即时性、数据一致性要求不高的
- 访问量大且更新频率不高的数据(读多,写少)
举例:电商类应用,商品分类,商品列表等适合缓存并加一个失效时间(根据数据更新频率来定),后台如果发布一个商品,买家需要 5 分钟才能看到新的商品一般还是可以接受的。
伪代码
data = redisTemplate.opsForValue().get(redisKey);//从缓存加载数据
If(data == null){
//缓存中没有则从数据库加载数据
data = db.getDataFromDB(id);
//保存到 cache 中
redisTemplate.opsForValue().set(redisKey,data);
}
return data;
1.2 本地缓存
在单体项目中,我们可以使用 Map 集合存储数据作为项目的本地缓存,因为 Map 数据是存储与内存的,相比于数据库查询要从磁盘加载到内存有着更高的效率。
1.3 本地模式在分布式下的问题
但是在分布式情况下这种情况就不再适用了,每个微服务可能部署在多台机器上,每个机器上有各自的缓存 Map 对象,会导致数据不一致的问题。
1.4 分布式缓存
所以应该将数据缓存在同一个缓存中间件中,才能保证数据一致性问题
二、整合redis测试
2.1 引入依赖
<!-- 缓存中间件redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.2 配置信息
spring:
redis:
host: 192.168.57.129
port: 6379
2.3 测试
@Autowired
StringRedisTemplate redisTemplate;
@Test
public void testRedis() {
//存储
redisTemplate.opsForValue().set("HELLO_REDIS", "SpringBoot!");
//获取
String value = redisTemplate.opsForValue().get("HELLO_REDIS");
System.out.println(value);
}
三、改造三级分类业务
3.1 代码改造
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型:【序列化与反序列化】
/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、加锁:解决缓存击穿
*/
//1、加入缓存逻辑,缓存中存的数据是json字符串。
//JSON跨语言,跨平台兼容。
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (StringUtils.isEmpty(catalogJSON)) {
//2、缓存中没有,查询数据库
//保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。
log.info("缓存不命中....将要查询数据库...");
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();
String result = JSON.toJSONString(catalogJsonFromDb);
redisTemplate.opsForValue().set("catalogJSON",result);
}
log.info("缓存命中....直接返回....");
//转为我们指定的对象。
return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
}
四、高并发下缓存失效问题
4.1 缓存穿透
解决方案1:null 结果放入缓存,并加入短暂的过期时间
伪代码
//从缓存加载数据
data = redisTemplate.opsForValue().get(redisKey);
If(data == null){
//缓存中没有则从数据库加载数据
data = db.getDataFromDB(id);
if(data == null) {
//空结果保存到 cache 中
redisTemplate.opsForValue().set(redisKey,null,300,TimeUnit.SECONDS);
}else {
//保存到 cache 中
redisTemplate.opsForValue().set(redisKey,data);
}
}
return data;
解决方案2:使用布隆过滤器
这种技术在缓存之前再加一层屏障,里面存储目前数据库中存在的所有key。当业务系统有查询请求的时候,首先去BloomFilter中查询该key是否存在。若不存在,则说明数据库中也不存在该数据,因此缓存都不要查了,直接返回null。若存在,则继续执行后续的流程,先前往缓存中查询,缓存中没有的话再前往数据库中的查询。伪代码如下:
String get(String key) {
String value = redis.get(key);
if (value == null) {
if(!bloomfilter.mightContain(key)){
//不存在则返回
return null;
}else{
//可能存在则查数据库
value = db.get(key);
redis.set(key, value);
}
}
return value;
}
布隆过滤器示意图
4.2 缓存雪崩
4.3 缓存击穿
五、分布式下加锁
5.1 分布式锁示意图
本地锁,只能锁住当前进程,所以我们需要分布式锁。
5.2 锁的时序问题
@Override
public Map<String, List<Catelog2Vo>> getCatalogJson() {
//给缓存中放json字符串,拿出的json字符串,还用逆转为能用的对象类型:【序列化与反序列化】
/**
* 1、空结果缓存:解决缓存穿透
* 2、设置过期时间(加随机值):解决缓存雪崩
* 3、加锁:解决缓存击穿
*/
//1、加入缓存逻辑,缓存中存的数据是json字符串。
//JSON跨语言,跨平台兼容。
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (StringUtils.isEmpty(catalogJSON)) {
//2、缓存中没有,查询数据库
//保证数据库查询完成以后,将数据放在redis中,这是一个原子操作。
log.info("缓存不命中....将要查询数据库...");
Map<String, List<Catelog2Vo>> catalogJsonFromDb = getCatalogJsonFromDB();
return catalogJsonFromDb;
}
log.info("缓存命中....直接返回....");
//转为我们指定的对象。
return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
}
查询数据库后将结果放入缓存,保证这是一个原子性操作,防止多个线程查询数据库而导致日志输出多个查询了数据库…
//从数据库查询并封装分类数据
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDB() {
//只要同一把锁,就能锁住需要这个锁的所有线程
//synchronized (this):springBoot所有组件在容器中都是单实例的
//TODO 本地锁:synchronized JUC(Lock) 在分布式情况下只能使用分布式锁才能锁住资源
synchronized (this) {
//得到锁以后,我们应该再去缓存中确定一次,如果没有才需要继续查询
String catalogJSON = redisTemplate.opsForValue().get("catalogJSON");
if (!StringUtils.isEmpty(catalogJSON)) {
return JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>() {});
}
log.info("查询了数据库....");
//1、将数据库的多次查询变为一次,查询所有分类信息
List<CategoryEntity> selectList = baseMapper.selectList(null);
//1、查出所有1级分类
List<CategoryEntity> level1Categorys = getParent_cid(selectList, 0L);
//2、封装数据
Map<String, List<Catelog2Vo>> parent_cid = level1Categorys.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//1、每一个的一级分类,查到这个一级分类的二级分类
List<CategoryEntity> categoryEntities = getParent_cid(selectList, v.getCatId());
//2、封装上面的结果
List<Catelog2Vo> catelog2Vos = null;
if (categoryEntities != null) {
catelog2Vos = categoryEntities.stream().map(l2 -> {
Catelog2Vo catelog2Vo = new Catelog2Vo(v.getCatId().toString(), null, l2.getCatId().toString(), l2.getName());
//1、找当前二级分类的三级分类封装成vo
List<CategoryEntity> level3Catelog = getParent_cid(selectList, l2.getCatId());
if (level3Catelog != null) {
List<Catelog2Vo.Catelog3Vo> collect = level3Catelog.stream().map(l3 -> {
//2、封装成指定格式
Catelog2Vo.Catelog3Vo catelog3Vo = new Catelog2Vo.Catelog3Vo(l2.getCatId().toString(), l3.getCatId().toString(), l3.getName());
return catelog3Vo;
}).collect(Collectors.toList());
catelog2Vo.setCatalog3List(collect);
}
return catelog2Vo;
}).collect(Collectors.toList());
}
return catelog2Vos;
}));
String result = JSON.toJSONString(parent_cid);
redisTemplate.opsForValue().set("catalogJSON",result,1,TimeUnit.DAYS);
return parent_cid;
}
}
压力测试结果:日志只输出一个查询了数据库…,表面只有一个线程查询了数据库。