高并发如何实现单用户信息查询接口
故事情节
- 产品:小李,有个单用户信息查询的功能,需要你实现一下
- 小李:这还不简单,两分钟我给你实现
- 两分钟过去…
- 小李:欧克了,部署上线了
- 运维:哪个傻蛋写的接口,导致MySQL宕机了
- 小李一愣,他写的接口明明没有报错啊,这是怎么回事呢?
- 产品:小李赶紧给我排查出来,否则这个月的奖金一分都没有
- 小李:这这这,我不知道什么问题啊
- 小李纳闷中,思来思去不知道什么问题如何解决…
- 小李:老黄,只能求你出马了,这个月我的奖金全部都给你
- 老黄听到小李的请求,他微微一笑,答应了下来
- 老黄:么得问题了
- 老黄耐心地指导小李修复了这个错误,并对代码进行了优化和完善
- 小李听得认真,心里暗自发誓要吸取教训,以后在工作中更加严谨细致。而这次经历也让他对老黄产生了更深的敬意和信任
- 最终,小李成功排查并解决了问题,产品顺利上线运行。产品部门的领导对他的表现给予了肯定和赞赏,而他也因为自己的努力和进步获得了全额的奖金
小李写的代码:
Service层:
直接查询MySQL返回数据
public UserQueryRespDTO queryUserByUserId(Long userId) {
LambdaQueryWrapper<UserDO> queryWrapper = Wrappers.lambdaQuery(UserDO.class)
.eq(UserDO::getUserId, userId);
List<UserDO> userDOList = userMapper.selectList(queryWrapper);
UserDO userDO = CollUtil.isNotEmpty(userDOList) ? userDOList.get(0) : null;
UserQueryRespDTO userQueryRespDTO = new UserQueryRespDTO();
BeanUtil.convert(userDO, userQueryRespDTO);
return userQueryRespDTO;
}
流程图:
http 请求直接打到 MySQL 数据库,不宕机才怪嘞
老黄写的代码:
Service层:
- 先读取 Redis 缓存,数据存在直接返回用户
- 数据不存在,读取 MySQL 数据库,加上双重判定锁,减轻获得分布式锁后线程访问数据库压力
- 读取到 MySQL 数据,缓存到 Redis 并且返回
- 读取数据为NULL,缓存空对象到 Redis 中,并设置一个较短的过期时间(防止缓存穿透)
public UserQueryRespDTO queryUserByUserId(Long userId) {
UserDO userDO = distributedCache.safeGet(
USER_INFO_KEY + userId,
UserDO.class,
() -> {
LambdaQueryWrapper<UserDO> queryWrapper = Wrappers.lambdaQuery(UserDO.class)
.eq(UserDO::getUserId, userId);
List<UserDO> userDOList = userMapper.selectList(queryWrapper);
return CollUtil.isNotEmpty(userDOList) ? userDOList.get(0) : null;
},
30,
TimeUnit.MINUTES,
null,
null,
key -> {
// 缓存空对象,解决缓存穿透。也可以使用布隆过滤器。
distributedCache.put(key, new UserDO(), 5, TimeUnit.MINUTES);
});
UserQueryRespDTO userQueryRespDTO = new UserQueryRespDTO();
BeanUtil.convert(userDO, userQueryRespDTO);
return userQueryRespDTO;
}
第二行distributedCache.safeGet方法
public <T> T safeGet(String key, Class<T> clazz, CacheLoader<T> cacheLoader, long timeout, TimeUnit timeUnit,
RBloomFilter<String> bloomFilter, CacheGetFilter<String> cacheGetFilter, CacheGetIfAbsent<String> cacheGetIfAbsent) {
T result = get(key, clazz);
// 缓存结果不等于空或空字符串直接返回;通过函数判断是否返回空,为了适配布隆过滤器无法删除的场景;两者都不成立,判断布隆过滤器是否存在,不存在返回空
if (!CacheUtil.isNullOrBlank(result)
|| Optional.ofNullable(cacheGetFilter).map(each -> each.filter(key)).orElse(false)
|| Optional.ofNullable(bloomFilter).map(each -> !each.contains(key)).orElse(false)) {
return result;
}
RLock lock = redissonClient.getLock(SAFE_GET_DISTRIBUTED_LOCK_KEY_PREFIX + key);
lock.lock();
try {
// 双重判定锁,减轻获得分布式锁后线程访问数据库压力
if (CacheUtil.isNullOrBlank(result = get(key, clazz))) {
// 如果访问 cacheLoader 加载数据为空,执行后置函数操作
if (CacheUtil.isNullOrBlank(result = loadAndSet(key, cacheLoader, timeout, timeUnit, true, bloomFilter))) {
Optional.ofNullable(cacheGetIfAbsent).ifPresent(each -> each.execute(key));
}
}
} finally {
lock.unlock();
}
return result;
}
流程图: