文章目录
- 前言
- 一、本地缓存和分布式缓存
- 1.本地缓存
- 2.分布式缓存
- 二、项目实战
- 1.配置Redis
- 2.整合业务代码
- 2.1 缓存击穿
- 2.2 缓存雪崩
- 2.3 缓存穿透
- 2.4 业务代码1.0版
- 2.5 分布式锁1.0版
- 2.6 分布式锁2.0版
- 2.7 Spring Cache及缓存一致性问题
- 2.7.1 Spring Cache
- 2.7.2 缓存一致性问题
- 2.7.3 Spring Cache的弊端
前言
本篇重点介绍谷粒商城首页整合缓存技术,从本地缓存(Map
)到分布式缓存(Redis
),描述常见的缓存三大问题(缓存穿透,缓存雪崩,缓存击穿
)及解决方案,并且在解决的过程中引用成熟的Redisson
方案。最后到缓存一致性
的问题及解决,整合Spring Cache
。
对应视频P151-P172
一、本地缓存和分布式缓存
1.本地缓存
本地缓存存储在单个应用服务器的内存中
,属于该服务器的进程空间。仅在当前服务器节点内有效,不会在多个服务器之间共享。
本地缓存最简单的实现方式:通过Map
:
private HashMap<String,Object> map = new HashMap<>();
@Test
public Object testMapCache(){
Object key = map.get("key");
if (key !=null){
return key;
}
//查询数据库相关逻辑...假设查询到的值为value
map.put("key","value");
return "value";
}
不考虑缓存一致性,穿透,击穿等问题,上面的案例就是通过Map
做本地缓存最简单的实现。
2.分布式缓存
目前市面上大多数的项目都是采用微服务的架构,同一个服务也可能部署多个实例。而如上面所说,本地缓存仅在当前服务器节点内有效。假设现在有三台服务器:
初始状态下三台服务器都没有缓存,第一次用户访问了服务器1,查询数据库后将结果存入了缓存。下一次由于负载均衡,访问到了服务器2:
由于缓存此时只存在于服务器1,这次用户又需要去数据库中查询,然后放入服务器2的缓存中。
为了解决这样的问题,在微服务的架构中,引入了缓存中间件对不同服务间的缓存进行统一管理。常用的是Redis
。
二、项目实战
1.配置Redis
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
spring:
redis:
host: xxx
port: 6379
Redis为我们封装了两个模版,分别是redisTemplate
和stringRedisTemplate
,stringRedisTemplate
的key和value默认都是String类型的,在项目中使用时,只需要注入对应的模版即可。
2.整合业务代码
在项目中,需要加入缓存的业务场景是,首页渲染三级分类菜单。
缓存这一块的坑点很多,在整合业务代码前,有必要先介绍一下缓存常见的三大问题及解决方案:
2.1 缓存击穿
假设数据库中的某张A表,数据的主键ID是从1-1000,如果使用1001的ID去查询数据,是无论如何都查询不到的,查询到的会是空值。如果没有将这个空值存入缓存,那么通过伪造请求等方式不断地使用不存在的ID作为条件去查询数据库,也会导致数据库崩溃的情况。
解决方式
:如果根据查询条件查询到的结果不存在,就缓存一个空值
或进行约定,缓存一个特定的值。也可以通过布隆过滤器
,或加强参数校验的方式解决。
2.2 缓存雪崩
这种情况主要是出现在大并发量的场景下,大量的热点key
同时失效,导致这一刻的所有请求都打到数据库上。
解决方式
:给不同的key设置随机的过期时间,或者设置永不过期。
2.3 缓存穿透
区别于缓存雪崩,击穿主要是体现在某个热点key
失效,导致大量的请求在查询缓存无果的情况下,都去数据库中查询。
解决方式
:加锁,让同一时刻只有一个线程能查询到数据库。但是涉及到多线程锁的问题时,一般就不会有那么简单了。我们知道锁有本地锁和分布式锁,也有乐观锁和悲观锁。
如果直接使用synchronized
关键字进行加锁,在单体应用下是没问题的。synchronized
关键字是锁当前的JVM。在微服务架构下,每个服务都有自己的JVM,假设我的product服务部署在了8台服务器上,每个服务器锁自己的JVM,最后还是有可能8个请求同时打在数据库上。所以需要一个全局的锁
去统一管理这些服务。通过Redis也可以自己实现分布式锁,但是其中有很多坑点。
2.4 业务代码1.0版
加入缓存后的业务流程图:
我们先不考虑分布式锁的实现,完成第一版加入缓存的业务代码:
这里有几点需要注意下:
- 存入缓存的
key
必须唯一,可以加上当前用户或者业务的前缀。例如我将商品列表放入缓存,商品列表可以被不同的用户访问,又带有查询条件,可以这样设计key:用户标识:查询条件1_查询条件2_查询条件3 - 某个线程获取到了锁,在查询数据库前,需要先再次查询缓存中是否有值。
- 将数据库查询结果,放入缓存必须在锁的范围内,否则可能存在,A线程查到了数据然后释放了锁,准备放入缓存,在放入缓存的过程中,B线程获取到了锁,又去查了一遍数据库的问题。
- 向Redis中存储的数据,一般约定使用JSON字符串的方式进行存储,在读取时进行反序列化。
@Slf4j
@Service("categoryService")
public class CategoryServiceImpl extends ServiceImpl<CategoryDao, CategoryEntity> implements CategoryService {
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Map<String, List<CategoryJsonVO>> getCategoryJson() {
//从缓存中获取
String category = stringRedisTemplate.opsForValue().get(RedisConstants.CATEGORY_KEY);
//缓存中不为空
if (StringUtils.isNotBlank(category)) {
log.info("查询到了结果");
return JSON.parseObject(category, new TypeReference<Map<String, List<CategoryJsonVO>>>() {
});
}
/*
缓存空值解决缓存穿透
设置过期时间(随机值)解决缓存雪崩
加锁解决缓存击穿
*/
//查询pms_category表的全量数据
Map<String, List<CategoryJsonVO>> map;
map = this.getCateGoryFromDB();
return map;
}
/**
* 从数据库查询三级分类
* @return 查询结果
*/
private Map<String, List<CategoryJsonVO>> getCateGoryFromDB() {
synchronized (this) {
log.info("获取到了锁");
//再看下缓存中有没有
//从缓存中获取
String category = stringRedisTemplate.opsForValue().get(RedisConstants.CATEGORY_KEY);
//缓存中不为空
if (StringUtils.isNotBlank(category)) {
log.info("查询到了结果");
return JSON.parseObject(category, new TypeReference<Map<String, List<CategoryJsonVO>>>() {
});
}
log.info("开始查询数据库");
List<CategoryEntity> list = list();
Map<String, List<CategoryJsonVO>> map = list.stream().collect(Collectors.toMap(k -> k.getCatId().toString(), v -> {
//查出某个一级分类下的所有二级分类
// List<CategoryEntity> entityList = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", v.getCatId()));
List<CategoryEntity> entityList = list.stream().filter(categoryEntity -> categoryEntity.getParentCid().equals(v.getCatId())).collect(Collectors.toList());
List<CategoryJsonVO> categoryJsonVOS = entityList.stream().map(categoryEntity -> {
CategoryJsonVO jsonVO = new CategoryJsonVO();
jsonVO.setCatalog1Id(String.valueOf(categoryEntity.getParentCid()));
jsonVO.setId(String.valueOf(categoryEntity.getCatId()));
jsonVO.setName(categoryEntity.getName());
//查出某个二级分类下的所有三级分类
// List<CategoryEntity> entityListThree = baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", categoryEntity.getCatId()));
List<CategoryEntity> entityListThree = list.stream().filter(categoryEntity1 -> categoryEntity1.getParentCid().equals(categoryEntity.getCatId())).collect(Collectors.toList());
List<CategoryJsonVO.CatalogJsonThree> catalogJsonThrees = entityListThree.stream().map(categoryEntity1 -> {
CategoryJsonVO.CatalogJsonThree catalogJsonThree = new CategoryJsonVO.CatalogJsonThree();
catalogJsonThree.setId(String.valueOf(categoryEntity1.getCatId()));
catalogJsonThree.setName(categoryEntity1.getName());
catalogJsonThree.setCatalog2Id(String.valueOf(categoryEntity1.getParentCid()));
return catalogJsonThree;
}).collect(Collectors.toList());
jsonVO.setCatalog3List(catalogJsonThrees);
return jsonVO;
}).collect(Collectors.toList());
return categoryJsonVOS;
}));
//向缓存中存一份(序列化)
stringRedisTemplate.opsForValue().set(RedisConstants.CATEGORY_KEY, CollectionUtils.isEmpty(map) ? "0" : JSON.toJSONString(map), 1, TimeUnit.DAYS);
return map;
}
}
}
2.5 分布式锁1.0版
下面我们自己先手动实现一个分布式锁:
@Test
public void testLock(){
String uuid = UUID.randomUUID().toString();
//获取锁
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid);
//获取到了锁
if (lock){
try {
//设置过期时间
stringRedisTemplate.expire("lock",300, TimeUnit.SECONDS);
//执行业务代码
}catch (Exception e){
//日志记录异常
}finally {
stringRedisTemplate.delete("lock");
}
}else {
//未获取到锁就自旋继续尝试获取
testLock();
}
}
上面的代码有什么问题?可谓漏洞百出。
获取锁和设置过期时间分为了两个步骤去实现。
:会导致一个什么样的问题?既然是两步,没有写在一条命令里,说明是非原子性的操作。如果两行代码之间出现了异常,那么过期时间就没有设置成功。那么能不能将设置过期时间写在finally块中?答案也是不行的,因为出现异常不仅仅可能是程序方面的异常,假设极端情况下机房停电了…所以为了解决这个问题,需要做如下的改动:
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock",uuid,300, TimeUnit.SECONDS);
解锁时没有进行判断
:会导致将其他线程的锁误删的问题。例如线程A拿到了锁,由于业务执行的时间较长,线程A的锁超时了,线程B拿到了锁,B在执行自己业务的时候,线程A执行完了业务,释放了B线程的锁…不是那么靠谱的解决方案:
if (stringRedisTemplate.opsForValue().get("lock").equals(uuid)){
stringRedisTemplate.delete("lock");
}
为什么说这个解决方案不是那么靠谱?引出了第三个问题
解锁时的条件判断非原子性操作
:因为判断+解锁之间也是存在间隔时间的,必须要保证原子性。例如锁设置的key的value是1,设置的过期时间是10S,但是前面的操作花费了9.5S,判断的时间花费了0.6S,相当于key对应的value已经过期了。下一个线程进来又设置key的value是2(实际上lock对应的值变了,但是在判断的时候,获取到的lock的值还是之前的1),然后原来的线程解锁就把下一个线程的锁给解了。解决方案是使用lua脚本
,包括后面引入的Redisson
的底层很多也是通过lua脚本
实现的
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1]) else return 0 end";
//删除锁
Long lock1 = redisTemplate.execute(new
DefaultRedisScript<Long>(script, Long.class)
, Arrays.asList("lock"), uuid);
通过上述问题的发现与解决,看似我们自己实现的分布式锁没有问题了,其实不然,仔细深究还是会存在锁重入,重试等相关问题。
2.6 分布式锁2.0版
引入Redisson:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<!-- 请使用最新版本 -->
<version>3.16.3</version>
</dependency>
进行配置:
@Configuration
public class RedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://自己的虚拟机地址:6379");
return Redisson.create(config);
}
}
Redisson的基本使用及原理:
@Test
public void testRedisson() {
RLock lock = redissonClient.getLock("lock");
//默认过期时间30S,业务在执行完成之前每隔10S续期一次
//如果设置了过期时间,就按照过期时间来,不会自动续期
lock.lock();
try {
}finally {
lock.unlock();
}
}
通过RLock lock = redissonClient.getLock("lock");
可以获取一把锁,只要名称相同就代表是同一把锁。
除了上面获取锁的方式,还有其他关于锁的操作,在官方文档中都有说明:
Redisson官方文档中文版
lock.lock();
方法,如果没有设置过期时间,它有一个默认的30S过期时间,同时会每隔1/3默认时间自动续期,设置了过期时间,则按照实际的过期时间,即使业务没有执行完成也不会自动续期。
项目实战篇以应用为主,限于篇幅不翻源码,源码解析会放在源码分析专栏后续更新。
改造业务代码:
@Autowired
private RedissonClient redissonClient;
/**
* 从数据库查询三级分类
* 分布式锁解决缓存击穿
* @return 查询结果
*/
private Map<String, List<CategoryJsonVO>> getCateGoryFromDB() {
//category_lock
RLock lock = this.redissonClient.getLock(RedisConstants.CATEGORY_LOCK_KEY);
lock.lock(10, TimeUnit.SECONDS);
try {
-- 业务代码
} finally {
lock.unlock();
}
}
2.7 Spring Cache及缓存一致性问题
2.7.1 Spring Cache
简单来说,Spring Cache是基于声明式注解
的缓存,对于缓存声明,Spring的缓存抽象提供了一组Java注解:
@Cacheable
: 触发缓存的填充。@CacheEvict
: 触发缓存删除。@CachePut
: 更新缓存而不干扰方法的执行。@Caching
: 将多个缓存操作重新分组,应用在一个方法上。@CacheConfig
: 分享一些常见的类级别的缓存相关设置。
详见Spring官方文档中文版
在项目中使用,只需要引入依赖,并在配置文件中进行配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
# 配置spring cache 为redis
spring.cache.type=redis
spring.cache.redis.time-to-live=360000
在方法上加入注解:
@Override
@Cacheable(value = {"category"},key = "'getLevelOneCateGory'") //放入缓存 如果缓存中有方法就不调用
public List<CategoryEntity> getLevelOneCateGory() {
return list(new QueryWrapper<CategoryEntity>().eq("parent_cid", "0"));
}
启动项目,通过redis客户端查看对应的缓存数据:
需要注意,默认的序列化方式不是JSON,而是JDK序列化。需要自定义配置:
@Configuration
@EnableCaching
@EnableConfigurationProperties(CacheProperties.class)
public class MyRedisCacheConfig {
@Bean
public RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties){
RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
//自定义键值的序列化
config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer()));
//自定义键和值的过期时间,从配置文件中读取
CacheProperties.Redis redisProperties = cacheProperties.getRedis();
if (redisProperties.getTimeToLive() != null) {
config = config.entryTtl(redisProperties.getTimeToLive());
}
if (redisProperties.getKeyPrefix() != null) {
config = config.prefixKeysWith(redisProperties.getKeyPrefix());
}
if (!redisProperties.isCacheNullValues()) {
config = config.disableCachingNullValues();
}
if (!redisProperties.isUseKeyPrefix()) {
config = config.disableKeyPrefix();
}
return config;
}
}
2.7.2 缓存一致性问题
缓存一致性问题简单来说,就是缓存中的数据和数据库最新的数据不一致,导致用户看到的数据非实时而是旧的缓存中的。
解决缓存一致性问题,对于数据库写入方
,一般有如下几种方案:
先删除缓存再更新数据库
先更新数据库再删除缓存
上述两种方案都是有弊端的:
先删除缓存再更新数据库
对应上图的情况,用户读取到的数据还是未更新数据库前旧的数据。
如果先更新数据库再删除缓存
也可能存在上图的情况,即如果B线程更新数据库的时间较长,并且此时C线程进行查询,C线程查询到的还是A线程更新数据库的结果,并且将A的操作结果写入缓存,获取到的依旧不是B最新操作的数据。
既然两者都有弊端,那么就引入了第三种方式:延迟双删
其实无论是何种方式,保证的都是缓存的最终一致性
,如果对数据实时性的要求高,且数据更新频繁,应该去查数据库,而不是使用缓存。
在项目中,采用先更新数据库再删除缓存
的策略,结合注解:
/**
* 修改
* 修改时删除缓存
*/
@CacheEvict(value = {"category"},key = "'getLevelOneCateGory'")
@RequestMapping("/update")
public R update(@RequestBody CategoryEntity category){
categoryService.updateById(category);
return R.ok();
}
2.7.3 Spring Cache的弊端
主要体现在解决缓存击穿
问题上,在手动编写逻辑时,是通过Redisson分布式锁
的方式解决的,而Spring Cache的注解默认是不加锁的,如果加锁,需要在注解中设置sync
为true,并且这里的锁是本地锁
,非分布式锁。