目录
拦截器设置
集群的session共享问题
基于redis实现共享session登录
创建bean对象技巧
什么是缓存
使用缓存来处理对象
使用String类型缓存来处理集合
缓存更新策略
主动更新策略
缓存穿透
空串""和null的区别
缓存null值解决穿透问题
缓存雪崩
缓存击穿
互斥锁和逻辑过期介绍
基于互斥锁解决缓存穿透问题
编辑
下载JMeter模拟线程测试
redis缓存工具类封装
优惠卷秒杀
全局唯一id
优惠券添加
优惠券秒杀下单
JMeter线程测试遇到401错误
超卖问题分析
乐观锁
版本号法
CAS法
一人一单的并发安全问题
分布式锁
编辑 什么是分布式锁
分布式锁的实现
基于Redis实现分布式锁的初级版本
线程存在问题
线程阻塞超时自动删除后,线程完成释放别的线程的锁
改进分布式锁(判断线程和存的是否一致)
有并发安全分析
Redis的Lua脚本
基于Redis的分布式锁实现思路
还存在的问题
Redission实现分布式锁
Redisson可重入锁原理
获取锁的Lua脚本
释放锁的Lua脚本
Redisson分布式锁的原理
Redis秒杀优化(暂未实现)
达人探店
点赞功能
实现点赞排行榜功能
关注和取关
查看共同关注
关注推送
feed流模式
Redis最佳实践
Redis键值设计
拦截器设置
第一步,定义拦截器
package com.hmdp.utils;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.User;
import org.springframework.web.servlet.HandlerInterceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
public class LoginInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
//1、获取session
HttpSession session = request.getSession();
//2、获取session中的用户
Object user = session.getAttribute("user");
//3、判断用户是否存在
if (user == null) {
//4、不存在,拦截,返回401状态码
response.setStatus(401);
return false;
}
//5、存在,保存用户信息到ThreadLocal
UserHolder.saveUser((UserDTO) user);
//6、放行
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
//移除用户,避免内存泄漏
UserHolder.removeUser();
}
}
userhold类下
package com.hmdp.utils;
import com.hmdp.dto.UserDTO;
public class UserHolder {
private static final ThreadLocal<UserDTO> tl = new ThreadLocal<>();
public static void saveUser(UserDTO user){
tl.set(user);
}
public static UserDTO getUser(){
return tl.get();
}
public static void removeUser(){
tl.remove();
}
}
第二步:配置文件(让拦截器生效)
package com.hmdp.config;
import com.hmdp.utils.LoginInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
public class MvcConfig implements WebMvcConfigurer {
//配置,添加拦截器,让之前的拦截器生效
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(new LoginInterceptor())
//排除掉不需要拦截的路径
.excludePathPatterns(
"/shop/**",
"/voucher/**",
"/shop-type/**",
"/upload/**",
"/blog/hot",
"/usr/code",
"/usr/login"
);
}
}
集群的session共享问题
session共享问题:多态Tomcat并不共享session存储空间,当请求切换到不同tomcat服务是导致数据丢失问题
session的代替为redis,满足数据共享,内存存储key、value结构
基于redis实现共享session登录
保存登录的用户信息,可以使用String结构,以JSON字符串来保存,比较直观:
Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD,并且内存占用更少:
创建bean对象技巧
自己创建的类,无法直接用@Autowire方式注入,因为他不属于 spring容器管理的。
需要在创建的加入一个构造方法,然后在由其他由spring管理的类调用,然后在注入传入这个属性即可
@Configuration
public class MvcConfig implements WebMvcConfigurer {
//由spring管理的类注册
@Resource
private StringRedisTemplate stringRedisTemplate;
//配置,添加拦截器,让之前的拦截器生效
@Override
public void addInterceptors(InterceptorRegistry registry) {
//调用时传入这个即可
registry.addInterceptor(new LoginInterceptor(stringRedisTemplate))
}
public class LoginInterceptor implements HandlerInterceptor {
private StringRedisTemplate stringRedisTemplate;
public LoginInterceptor(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
}
什么是缓存
缓存就是数据交换的缓冲区(称作Cache,是存储数据的临时笛梵,一般读写性能较高)
使用缓存来处理对象
//1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在
if (StrUtil.isNotBlank(shopJson)){
//3、存在,直接返回缓存中的
//是json数据,则需要通过JSONUtil返回指定的对象即可
Shop shop= JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4、缓存中不存在,根据id查询数据库
Shop shop = this.getById(id);
//5、数据库中不存在,返回错误
if (shop==null){
return Result.fail("商铺不存在");
}
//6、存在,写入缓存
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop));
//7、返回
使用String类型缓存来处理集合
@Resource
private StringRedisTemplate stringRedisTemplate;
@Override
public Result shopTypeList() {
String key="shop_Type_List";
//查询缓存
String shopTypeJson = stringRedisTemplate.opsForValue().get(key);
if (StrUtil.isNotBlank(shopTypeJson)){
//查到了,直接返回,json转list集合
List<ShopType> shopTypes = JSONUtil.toList(shopTypeJson, ShopType.class);
return Result.ok(shopTypes);
}
//缓存没查到,查数据库
List<ShopType> typeList = this.query().orderByAsc("sort").list();
//数据库没查到,返回错误
if (typeList==null){
return Result.fail("没有列表信息");
}
//数据库查到,缓存下
//list集合转json
String json = JSONUtil.toJsonStr(typeList);
stringRedisTemplate.opsForValue().set(key, json);
缓存更新策略
业务场景
低一致性需求:使用内存淘汰机制,例如店铺类型的查询
高一致性需求;主动更新,并以超时剔除作为兜底方案,例如店铺的详细信息
主动更新策略
操作缓存和数据库的三个问题需要考虑:
1,删除缓存还是更新缓存?
- 更新缓存:每次更新数据都会更新缓存,无效写操作较多(×)
- 删除缓存:更新数据库时让缓存失效,查询时在更新缓存(√)
2、如何保证缓存与数据库的操作同时成功或者失败?
- 单体系统,将缓存与数据库操作放在一个事务
- 分布式系统,利用TCC操作等分布式事务方案
3、先操作缓存还是先操作数据库
- 先操作数据库,在删除缓存
缓存更新策略的最佳实践方案:
1、低一致性需求:使用Redis自带的内存淘汰机制
2、高一致性需求:主动更新,并以超时提出作为兜底方案
读操作:
- 缓存命中则直接返回
- 缓存未命中则查询数据库,并写入缓存,设置超时时间
写操作::
- 先写数据库,然后在删除缓存
- 要确保数据库与缓存操作的原子性
缓存穿透
我们使用Redis大部分情况都是通过Key查询对应的值,假如发送的请求传进来的key是不存在Redis中的,那么就查不到缓存,查不到缓存就会去数据库查询。假如有大量这样的请求,这些请求像“穿透”了缓存一样直接打在数据库上,这种现象就叫做缓存穿透。
解决方案:
1、缓存空对象(把无效的Key存进Redis中)。如果Redis查不到数据,数据库也查不到,我们把这个Key值保存进Redis,设置value="null",当下次再通过这个Key查询时就不需要再查询数据库。这种处理方式肯定是有问题的,假如传进来的这个不存在的Key值每次都是随机的,那存进Redis也没有意义,会占用redis的内存空间,所以设置过期时间是有必要的,其次,当这个值一开始没有内容,我们查询数据库后 ,将null赋值给这个值,并存在redis中,而之后我们数据库新增了这个值,但是缓存中还是为null,这就会导致短期数据不一致,可以使用更新数据库删除那个缓存就可以解决。
2、使用布隆过滤器。布隆过滤器的作用是某个 key 不存在,那么就一定不存在,它说某个 key 存在,那么很大可能是存在(存在一定的误判率)。于是我们可以在缓存之前再加一层布隆过滤器,在查询的时候先去布隆过滤器查询 key 是否存在,如果不存在就直接返回,这个布隆过滤器也存在一定的穿透风险。
项目中解决缓存穿透的思路
空串""和null的区别
null表示的是一个对象的值,而非一个字符串。例如声明一个对象的引用,String aaa = null ;
""表示的是一个长度为0的空字符串。例如声明一个字符串String bbb = "" ;
所以:null不指向任何对象,相当于没有任何值;而""代表一个长度为0的字符串。
缓存null值解决穿透问题
public Result queryById(Long id) {
String key=CACHE_SHOP_KEY+id;
//1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在
if (StrUtil.isNotBlank(shopJson)){
//3、存在,直接返回缓存中的
//是json数据,则需要通过JSONUtil返回指定的对象即可
Shop shop= JSONUtil.toBean(shopJson, Shop.class);
return Result.ok(shop);
}
//4、缓存中不存在,判断是否命中的为空串""
if (shopJson!=null){
return Result.fail("商铺不存在");
}
Shop shop = this.getById(id);
//5、数据库中不存在,并返回错误
if (shop==null){
//插入一个空串,设置过期时间
stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
return Result.fail("商铺不存在");
}
//6、存在,写入缓存
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
//7、返回
return Result.ok(shop);
}
缓存穿透产生的原因是什么?
用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请求给数据库带来巨大压力
缓存穿透的解决方案有?
- 缓存null值
- 布隆过滤
- 增强id的复杂度,避免被猜测id规律
- 做好数据的基础格式校验
- 加强用户权限校验
- 做好热点参数的限流
缓存雪崩
当某一个时刻出现大规模的redis缓存失效的情况,就会导致大量的请求直接打在数据库上面,导致数据库压力巨大,如果在高并发的情况下,可能瞬间就会导致数据库宕机。这时候如果运维马上又重启数据库,马上又会有新的流量把数据库打死。这就是缓存雪崩。缓存雪崩是指在同一时段大量的缓存key同时失效或者Redis服务宕机,导致大量请求到达数据库,带来巨大压力。
解决方案:
- 给不同的Key的TTL添加随机值
- 利用Redis集群提高服务的可用性
- 给缓存业务添加降级限流策略
- 给业务添加多级缓存
1、在原有的失效时间上加上一个随机值,这样就避免了因为采用相同的过期时间导致的缓存雪崩。
如果真的发生了缓存雪崩,有没有什么兜底的措施?
2、使用熔断机制。当流量到达一定的阈值时,就直接返回“系统拥挤”之类的提示,防止过多的请求打在数据库上。至少能保证一部分用户是可以正常使用,其他用户多刷新几次也能得到结果。
3、提高数据库的容灾能力,可以使用分库分表,读写分离的策略。
4、为了防止Redis宕机导致缓存雪崩的问题,可以搭建Redis集群,提高Redis的容灾性
缓存击穿
其实跟缓存雪崩有点类似,缓存雪崩是大规模的key失效,而缓存击穿是一个热点的Key,有大并发集中对其进行访问,突然间这个Key失效了,导致大并发全部打在数据库上,导致数据库压力剧增。这种现象就叫做缓存击穿。
解决方案:
- 互斥锁
- 逻辑过期(不过期)
1、业务允许的话,对于热点的key可以设置永不过期的key。
2、使用互斥锁。如果缓存失效的情况,只有拿到锁才可以查询数据库,降低了在同一时刻打在数据库上的请求,防止数据库打死。当然这样会导致系统的性能变差。
多条线程同时访问数据库
互斥锁和逻辑过期介绍
基于互斥锁解决缓存穿透问题
修改id查询店铺 ,基于互斥锁来解决缓存击穿问题
/**
* 互斥锁解决缓存击穿
* @param id
* @return
*/
public Shop queryWithMutex(Long id){
String key=CACHE_SHOP_KEY+id;
//1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在
if (StrUtil.isNotBlank(shopJson)){
//3、存在,直接返回缓存中的
//是json数据,则需要通过JSONUtil返回指定的对象即可
Shop shop= JSONUtil.toBean(shopJson, Shop.class);
return shop;
}
//4、缓存中不存在,判断是否命中的为空串""
if (shopJson!=null){
return null;
}
//4实现缓存重建
//4.1获取互斥锁
String lockKey=LOCK_SHOP_KEY+id;
Shop shop = null;
try {
boolean isLock = tryLock(lockKey);
//4.2判断是否获取成功
if (!isLock){
//4.3失败,则休眠并重试
Thread.sleep(99);
return queryWithMutex(id);
}
//4.4成功,根据id查询数据库
shop = this.getById(id);
//模拟重建延时
Thread.sleep(366);
//5、数据库中不存在,并返回错误
if (shop==null){
//插入一个空串,设置过期时间
stringRedisTemplate.opsForValue().set(key,"", CACHE_NULL_TTL, TimeUnit.MINUTES);
return null;
}
//6、存在,写入缓存
stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop), CACHE_SHOP_TTL, TimeUnit.MINUTES);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
//7、释放互斥锁
unLock(lockKey);
}
//8、返回
return shop;
}
基于逻辑过期解决缓存击穿问题
/**
* 逻辑辑过期解决缓存击穿
* @param id
* @return
*/
//创建一个线程池
private static final ExecutorService CACHE_REBUILD_EXECUTOR= Executors.newFixedThreadPool(10);
public Shop queryWithLogicalExpire(Long id){
String key=CACHE_SHOP_KEY+id;
//1、从redis查询商铺缓存,使用opsForValue接受对象,返回的是json串
String shopJson = stringRedisTemplate.opsForValue().get(key);
//2、判断是否存在
if (StrUtil.isBlank(shopJson)){
//3不存在,直接返回null
return null;
}
//4命中,需要把json反序列化为对象
RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
JSONObject data = (JSONObject) redisData.getData();
Shop shop = JSONUtil.toBean(data, Shop.class);
LocalDateTime expireTime = redisData.getExpireTime();
//5、判断是否过期,是否在当前时间之后
if (expireTime.isAfter(LocalDateTime.now())){
//5。1没过期。直接返回店铺信息
return shop;
}
//5.2已过期,需要缓存
//6、缓存重建
//6/1获取互斥锁
String lockKey= LOCK_SHOP_KEY+id;
boolean isLock = tryLock(lockKey);
//6.2判断是否获取锁成功
if (isLock){
//6.3成功,开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(()->{
try {
//重建缓存
this.saveShop2Redis(id,20L);
} catch (Exception e) {
e.printStackTrace();
} finally {
//释放锁
unLock(lockKey);
}
});
}
return shop;
}
下载JMeter模拟线程测试
修改数据据库中的一点信息,会发现某一时刻重建前是旧数据,完成后是新数据
控制台中只有一数据重建,一次是查询旧数据,一次为新数据重建
redis缓存工具类封装
基于stringRedisTemplate封装一个缓存工具类,满足下列需求:
方法1:将任意va对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
方法2:将任意/ava对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存击穿问题
方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
package com.hmdp.utils;
import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.hmdp.utils.RedisConstants.LOCK_SHOP_KEY;
@Slf4j
@Component
/**
* 缓存工具类1
*/
public class CacheClient {
private final StringRedisTemplate stringRedisTemplate;
public CacheClient(StringRedisTemplate stringRedisTemplate){
this.stringRedisTemplate = stringRedisTemplate;
}
public void set(String key, Object value, Long time, TimeUnit unit){
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,unit);
}
public void setWithLogicalExpire(String key, Object value, Long time, TimeUnit unit){
//设置逻辑过期
RedisData redisData = new RedisData();
redisData.setData(value);
redisData.setExpireTime(LocalDateTime.now().plusSeconds(unit.toSeconds(time)));
//写入Redis
stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(redisData));
}
public <R,ID> R queryWithPassThrough(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit){
String key = keyPrefix + id;
//1.从redis查询商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isNotBlank(json)){
//3.存在,直接返回
return JSONUtil.toBean(json, type);
}
//判断命中的是否是空值
if (json != null){
//返回一个错误信息
return null;
}
//4.不存在,根据id查询数据库
R r = dbFallback.apply(id);
//5.不存在,返回错误
if (r == null){
//将空值写入redis
stringRedisTemplate.opsForValue().set(key,"",2,TimeUnit.MINUTES);
//返回错误信息
return null;
}
//6.存在写入redis
this.set(key,r,time,unit);
return r;
}
/**
* 逻辑删除解决缓存击穿
*/
private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);
public <R,ID> R queryWithLogicalExpire(
String keyPrefix, ID id, Class<R> type, Function<ID,R> dbFallback, Long time, TimeUnit unit) {
String key = keyPrefix + id;
//1.从redis查商铺缓存
String json = stringRedisTemplate.opsForValue().get(key);
//2.判断是否存在
if (StrUtil.isBlank(json)) {
//3.存在,缓存中存的null
return null;
}
//4.命中,先把json反序列化为对象
RedisData redisData = JSONUtil.toBean(json, RedisData.class);
R r = JSONUtil.toBean((JSONObject) redisData.getData(), type);
LocalDateTime expireTime = redisData.getExpireTime();
//5.判断是否过期
if (expireTime.isAfter(LocalDateTime.now())) {
//5.1未过期,直接返回店铺信息
return r;
}
//5.2已过期,需要缓存重建
//6.缓存重建
//6.1获取互斥锁
String lockKey = LOCK_SHOP_KEY + id;
boolean isLock = tryLock(lockKey);
//6.2判断是否获取锁成功
if (isLock) {
//6.3 成功,再进行二次判断,查看缓存中是否有数据,因为有可能是别人刚刚重建完释放锁,刚好获取到了
//6.4 开启独立线程,实现缓存重建
CACHE_REBUILD_EXECUTOR.submit(() -> {
try {
//查询数据库
R r1 = dbFallback.apply(id);
//写入redis
this.setWithLogicalExpire(key, r1, time, unit);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
//释放锁
unlock(lockKey);
}
});
}
//6.6返回过期的商铺信息
return r;
}
//获取锁和开锁
private boolean tryLock(String key){
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key){
stringRedisTemplate.delete(key);
}
}
优惠卷秒杀
全局唯一id
全局id生成器
全局ID生成器,是一种在分布式系统下用来生成全局唯一id的工具,一般满足:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性
为了增加ID的安全性,我们可以不直接使用Redis自增的数值,而是拼接一些其它信息:
ID的组成部分:
符号位:1bit,永远为0
时间戳:31bit,以秒为单位,可以使用69年
序列号:32bit,秒内的计数器,支持每秒产生2^32个不同ID
@Component
public class RedisWorker {
private static final long BEGIN_TIMESTAMP=1640995000L;
private static final long COUNT_BITS=32;
@Resource
private StringRedisTemplate stringRedisTemplate;
public long nextId(String keyPrefix){
//1、生成时间戳
LocalDateTime now=LocalDateTime.now();
long nowSecond= now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2、生成序列号
//2、1获取当前时间精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2、2自增长
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//拼接并返回(先向左移把右边空出,然后在或,相当于加上)
return timestamp<<COUNT_BITS|count;
}
}
测试类中
private ExecutorService es= Executors.newFixedThreadPool(500);
@Test
void testIdWordker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task=()->{
for (int i=0;i<100;i++){
long id = redisWorker.nextId("order");
System.out.println("id="+id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i <100 ; i++) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time="+(end-begin));
}
全局唯一id生成策略
- UUID
- redis自增
- snowflake算法
- 数据库自增
redis自增id策略
- 每天一个key,方便统计订单量
- id构造是 时间戳+计数器
优惠券添加
没有后台只能通过postman添加
{
"shopId": 1,
"title": "200元代金券",
"subTitle": "周六周末可用",
"rules": "全场通用\\n可以叠加\\n仅限制堂食",
"payValue": 16000,
"actualValue": 20000,
"type": 1,
"stock": 100,
"beginTime": "2023-08-01T00:00:00",
"endTime": "2024-08-01T00:00:00"
}
优惠券秒杀下单
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisWorker redisWorker;
/**
* 优惠券下单
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1、查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2、判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀还未开始");
}
//3、判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4、判断库存是否充足
if (voucher.getStock()<1){
//库存不足
return Result.fail("已经被抢完");
}
//5、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock=stock-1")
.eq("voucher_id", voucherId).update();
if (!success)return Result.fail("库存不足");
//6创建秒杀券订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//添加订单
save(voucherOrder);
//7返回订单id
return Result.ok(orderId);
}
}
JMeter线程测试遇到401错误
这是未授权问题
用 F12 打开开发者工具
在network网络 里寻找相关信息:
添加一个信息管理器
订单出现了超卖
超卖问题分析
超卖问题是多线程安全问题,即在一个线程还没执行完,其他线程抢先执行,对同一个数据进行修改
乐观锁
乐观锁的关键是判断之前查询到的数据是否被修改,常见的方式有
版本号法
在修改之前查询一次版本号,若版本号不变则说明没有被其他线程修改,则正常进行数据修改,并让版本加一,若是版本号不一致,则不会执行
CAS法
直接比较数据是否发生了改变,若不变则说明安全
如果弄数据是否和之前一致,会导致成功率低
设置200个线程只卖出23个
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisWorker redisWorker;
/**
* 优惠券下单
* @param voucherId
* @return
*/
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
//1、查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2、判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀还未开始!");
}
//3、判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4、判断库存是否充足
if (voucher.getStock()<1){
//库存不足
return Result.fail("已经被抢完");
}
//5、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock=stock-1") //set stock=stock-1
.eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
.update();
if (!success)return Result.fail("库存不足");
//6创建秒杀券订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//添加订单
save(voucherOrder);
//7返回订单id
return Result.ok(orderId);
}
}
超卖这样的线程安全问题,解决方案有哪些?
1.悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
乐观锁:不加锁,在更新时判断是否有其它线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
给整个this对象上锁
优点是简单安全
缺点是性能低,因为这样所有用户都被锁上了,我们的初衷是,单个用户中,只能单卖,这样就会导致其他用户也会受到影响
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisWorker redisWorker;
/**
* 优惠券下单
* @param voucherId
* @return
*/
@Override
public Result seckillVoucher(Long voucherId) {
//1、查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2、判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀还未开始!");
}
//3、判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4、判断库存是否充足
if (voucher.getStock()<1){
//库存不足
return Result.fail("已经被抢完");
}
//7返回订单id
return createVoucherOrder(voucherId);
}
@Transactional
public synchronized Result createVoucherOrder(Long voucherId){
//5、一人一单
Long userId = UserHolder.getUser().getId();
//5.1查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//判断是否已经存在
if (count>0){
//用户已经购买过
return Result.fail("你已经购买过这个券了");
}
//6、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock=stock-1") //set stock=stock-1
.eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
.update();
if (!success)return Result.fail("库存不足");
//6创建秒杀券订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//添加订单
save(voucherOrder);
//返回订单id
return Result.ok(orderId);
}
}
悲观锁升级后
@Override
public Result seckillVoucher(Long voucherId) {
//1、查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2、判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
//尚未开始
return Result.fail("秒杀还未开始!");
}
//3、判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())){
//已经结束
return Result.fail("秒杀已经结束");
}
//4、判断库存是否充足
if (voucher.getStock()<1){
//库存不足
return Result.fail("已经被抢完");
}
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()){//intern获取源对象,new多少次都只是从常量池中寻找
//拿到当前对象的代理对象(事务的对象)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);//要想让事务生效,必须要有代理对象
}
}
@Transactional
public Result createVoucherOrder(Long voucherId){
//5、一人一单
Long userId = UserHolder.getUser().getId();
//5.1查询订单
Integer count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
//判断是否已经存在
if (count>0){
//用户已经购买过
return Result.fail("你已经购买过这个券了");
}
//6、扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock=stock-1") //set stock=stock-1
.eq("voucher_id", voucherId).gt("stock", 0)//where id=? and stock=?只要大于0即可
.update();
if (!success)return Result.fail("库存不足");
//6创建秒杀券订单
VoucherOrder voucherOrder = new VoucherOrder();
//6.1订单id
long orderId = redisWorker.nextId("order");
voucherOrder.setId(orderId);
//6.2用户id
voucherOrder.setUserId(userId);
//6.3代金券id
voucherOrder.setVoucherId(voucherId);
//添加订单
save(voucherOrder);
//返回订单id
return Result.ok(orderId);
}
一人一单的并发安全问题
通过加锁可以解决在单机情况下的一人一单安全问题,但是在集群模式下就不行了。
1.我们将服务启动两份,端口分别为8081和8082:
2.然后修改nginx的conf目录下的nginx.conf文件,配置反向代理和负载均衡
nginx配置实现了反代理和负载均衡
发现线程不安全
集群下的锁监听器tomcat等不是同一个
分布式锁
什么是分布式锁
分布式锁:满足分布式系统或集群模式下多进程可见并且互斥的锁
分布式锁的实现
分布式锁的核心是是实现多进程之间互斥,常见的有三种
实现分布式锁时需要实现两个基本方法
获取锁:
- 互斥:确保只能有一个线程获取锁
- 非阻塞:尝试一次,成功返回true,失败返回false
添加锁,nx是互斥(当存在则不执行),ex是设置超时时间
local是键 thread1是值
释放锁:
- 手动释放
- 超时释放:获取锁时添加一个超时时间
释放锁,直接删除即可
业务流程
基于Redis实现分布式锁的初级版本
锁的类
public class SimpleRedisLock implements ILock{
private StringRedisTemplate stringRedisTemplate;
private String name;
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}
private static final String KEY_PREFIX="lock:";
//获取锁
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
long threadId = Thread.currentThread().getId();
//获取锁,这里的返回值是一个布尔型的包装类,直接返回有时会出现空指针异常
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId + "", timeoutSec, TimeUnit.SECONDS);
//和真比较,就不会出现异常
return Boolean.TRUE.equals(success) ;
}
//释放锁
@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX+name);
}
}
Long userId = UserHolder.getUser().getId();
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
//获取锁
boolean isLock = lock.tryLock(12);
//判断是否获取锁成功
if (!isLock){
//获取锁失败
return Result.fail("一个人只能下一单");
}
try {
//拿到当前对象的代理对象(事务的对象)
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);//要想让事务生效,必须要有代理对象
} finally {
//释放锁
lock.unlock();
}
线程存在问题
线程阻塞超时自动删除后,线程完成释放别的线程的锁
存在的线程阻塞超时自动删除后,线程释放别的线程的锁
改进分布式锁(判断线程和存的是否一致)
需求:修改之前的分布式锁实现,满足:
1.在获取锁时存入线程标示(可以用UUID表示)
2.在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致如果一致则释放锁
如果不一致则不释放锁
public void unlock() {
//获取线程标识
String threadId= ID_PREFIX+Thread.currentThread().getId();
//获取锁中的标识
String id=stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
//判断两个是否一致,从而判断是否为同一线程
if (threadId.equals(id)){
stringRedisTemplate.delete(KEY_PREFIX+name);
}
有并发安全分析
这里有个并发问题,即当判断完相同时,发生了阻塞,没来得及删除锁,被redis超时释放后,下一个线程来获取后,之前那个线程阻塞完成,就会释放掉锁,但是此时这把锁的拥有者不是他。
所以我们改进的是时候,应该保证,判断和删除在同一条语句中,即使用lua脚本可以保证原子性
Redis的Lua脚本
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种编程语言,它的基本语法大家可以参考网站: https://www.runoob.com/lua/lua-tutorial.html语法如下:
如,我们要执行set name jack则脚本是这样
列如。我们要先执行set name Rose,在执行get name ,则脚本如下
需要用Redis命令来调用脚本,调用脚本的常见命令如下:
例如,我们要执行 redis.call('set','name','jack') 这个脚本,语法如下:
脚本中可以从KEYS和ARGV数组获取这些参数:
如果脚本中的key、value不想写死,可以作为参数传递。key类型参数会放入KEYS数组,其它参数会放入ARGV数组,在
resoure下创建unlock.lua
--比较线程标识与锁中是否一致
if (redis.call('get',KEYS[1])==ARGV[1]) then
-- 释放锁 del key
return redis.call('del',KEYS[1])
end
return 0
private static final String KEY_PREFIX = "lock:";
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
//获取锁
@Override
public boolean tryLock(long timeoutSec) {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁,这里的返回值是一个布尔型的包装类,直接返回有时会出现空指针异常
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
//和真比较,就不会出现异常
return Boolean.TRUE.equals(success);
}
//释放锁
@Override
public void unlock() {
//调用Lia
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX+name),
ID_PREFIX+Thread.currentThread().getId());
}
基于Redis的分布式锁实现思路
- 利用set nxex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用setnx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
还存在的问题
基于setnx实现的分布式锁还存在下面的问题
不可重入:同一个线程无法多次获取同一把锁
不可重试:获取锁只尝试一次就返回false,没有重试机制
超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
主从一致性:如果Redis提供了主从集群主从同步存在延迟,当主宕机时,如果从并同步主中的
锁数据,则会出现锁实现
Redission实现分布式锁
Redisson是一个在Redis的基础上实现的ava驻内存数据网格 (In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。
官网地址: https://redisson.org
GitHub地址: https://github.com/redisson/redisso
Redisson入门
导入maven地址
<!--redisson-->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
创建配置文件
package com.hmdp.config;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Bean
public RedissonClient redissonClient(){
//配置
Config config=new Config();
config.useSingleServer().setAddress("redis://localhost:6379");
//创建RedissonClient对象
return Redisson.create(config);
}
}
//创建锁对象
// SimpleRedisLock lock = new SimpleRedisLock(stringRedisTemplate, "order:" + userId);
RLock lock = redissonClient.getLock("lock:order:" + userId);
//获取锁,数量分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);
if(isLock){
try {
System.out.println("执行业务");}
finally {
// 释放锁
lock.unlock();}
Redisson可重入锁原理
获取锁的Lua脚本
释放锁的Lua脚本
Redisson分布式锁的原理
Redisson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时续约:用于watchDog.每隔一段时间(releaseTime/3),重置超时时间
总结
1)不可重入Redis分布式锁
原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判断线程标示
缺陷:不可重入、无法重试、锁超时失效
2)可重入的Redis分布式锁:
原理:利用hash结构,记录线程标示和重入次数;利用watchDog延续锁时间;利用信号量控制锁重试等待
缺陷: redis宕机引起锁失效问题
3)Redisson的multiLock:
原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
缺陷:运维成本高、实现复杂
Redis秒杀优化(暂未实现)
达人探店
点赞功能
/**
* 博客点赞
*
* @param id
* @return
*/
@Override
public Result likeBlog(Long id) {
//获取blog实体
Blog blog = getById(id);
//获取博客id
Long blogId = blog.getId();
//获取登录当前用户id
Long userId = UserHolder.getUser().getId();
//拼接key
String key = BLOG_LIKED_KEY + blogId;
//去redis中看有没有点赞过,查询这个key是否存在
Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
//不存在,即没点赞过
if (BooleanUtil.isFalse(isMember)) {
//修改数据库,让liked加1
boolean isSuccess = update().setSql("liked=liked+1").eq("id", id).update();
if (isSuccess) {
//修改成功后,将自定义的blog的isLike(是否点赞过)改为true
// blog.setIsLike(true);不能,因为这不是数据库中的字段存不了
//将这个点赞信息加到redis缓存中
stringRedisTemplate.opsForSet().add(key, userId.toString());
}
} else {
//存在,即点赞过,则取消赞
boolean isSuccess = update().setSql("liked=liked-1").eq("id", id).update();
if (isSuccess) {
//修改,删除
stringRedisTemplate.opsForSet().remove(key, userId.toString());
}
}
return Result.ok();
}
实现点赞排行榜功能
/**
* 博客点赞排序
* @param id
* @return
*/
@Override
public Result queryBlogLikes(long id) {
String key =BLOG_LIKED_KEY+id;
//1查询top5的点赞用户 zrange key 0 4
Set<String> topRange = stringRedisTemplate.opsForZSet().range(key, 0, 4);
if (topRange==null||topRange.isEmpty()){
return Result.ok(Collections.emptyList());
}
//2解析出其中的用户id
List<Long> ids = topRange.stream().map(Long::valueOf).collect(Collectors.toList());
String idStr = StrUtil.join(",", ids);
//3根据用户id查询用户where id in (5,1) order by field (id,5,1)
List<UserDTO> userDTOS = userService.query().in("id", ids).last("order by field (id," + idStr + ")").list().stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
关注和取关
@Override
public Result follow(Long followUserId, boolean isFollow) {
//1获取当前登录用户
Long userId = UserHolder.getUser().getId();
if (userId==null)return Result.fail("还没有登录");
Follow follow = new Follow();
//2判断为关注还是取关
if (isFollow){
//为关注,新增follow数据
follow.setUserId(userId);
follow.setFollowUserId(followUserId);
save(follow);
}else {
//为取关,删除follow数据
LambdaUpdateWrapper<Follow> updateWrapper=new LambdaUpdateWrapper();
updateWrapper.eq(Follow::getUserId, userId)
.eq(Follow::getFollowUserId,followUserId);
//删除数据
this.remove(updateWrapper);
}
return Result.ok();
}
@Override
public Result isFollow(Long followUserId) {
//1获取当前登录用户
Long userId = UserHolder.getUser().getId();
if (userId==null)return Result.fail("还没有登录");
Integer count = query().eq("user_id", userId).eq("follow_user_id", followUserId).count();
//判断是否有数据
return Result.ok(count>0);
}
查看共同关注
/**
* 被查看的人和我的共同关注
*
* @param checkedUserId
* @return
*/
@Override
public Result commonConcernPerson(Long checkedUserId) {
//1获取当前登录用户
Long userId = UserHolder.getUser().getId();
if (userId == null) return Result.fail("还没有登录");
String key = "follows:" + userId;
//2求交集
String key2 = "follows:" + checkedUserId;
Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key, key2);
if (intersect==null||intersect.isEmpty()){
//没有交集
return Result.ok(Collections.emptyList());
}
//解析出id集合
List<Long> ids = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
//4、根据id查询用户,转为userDto
List<UserDTO> userDTOS = userService.listByIds(ids)
.stream()
.map(user -> BeanUtil.copyProperties(user, UserDTO.class ))
.collect(Collectors.toList());
return Result.ok(userDTOS);
}
关注推送
关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
feed流模式
Feed流产品有两种常见模式:
Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈优点:信息全面,不会有缺失。并且实现也相对简单
缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户> 优点: 投喂用户感兴趣信息,用户粘度很高,容易沉迷
缺点:如果算法不精准,可能起到反作用
对于普通人:直接发送到他的粉丝下
对于大v:活跃粉丝直接给他推送,不活跃粉丝放在收件箱,等他要读的时候推
feed流实现方案
需求
修改新增探店笔记的业务,在保存blog到数据库的同时,推送到粉丝的收件箱
收件箱满足可以根据时间戳排序,必须用Redis的数据结构实现
查询收件箱数据时,可以实现分页查询
不能使用传统的分页,得使用滚动分页
@GetMapping("/of/follow")
public Result queryBlogOfFollow(
@RequestParam("lastId") Long max,@RequestParam(value = "offset",defaultValue = "0") Integer offset){
return blogService.queryBlogOfFollow(max,offset);
}
@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
//1获取当前用户
Long userId = UserHolder.getUser().getId();
//2查询收件箱 ZREVRANGEBYSCORE key Max Min LIMiT offset count
String key =FEED_KEY+userId;
Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
.reverseRangeByScoreWithScores(key, 0, max, offset, 2);
//3判断是否为空
if (typedTuples==null||typedTuples.isEmpty()){
return Result.ok();
}
//4解析数据:blogId,minTime(时间戳),offset
List<Long> ids=new ArrayList<>(typedTuples.size());
long minTime=0;
int offNum=1;
//统计有多次offNum(和最小的相同的个数)
for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
//获取id
ids.add(Long.valueOf(tuple.getValue()));
//获取分数(时间戳)
long time = tuple.getScore().longValue();
if (time==minTime){
offNum++;
}else {
minTime=time;
offNum=1;
}
}
//5,根据id查询blog
String idStr =StrUtil.join(",", ids);
List<Blog> blogs = query().in("id", ids).last("order by field (id," + idStr + ")").list();
//把博客相关信息点赞填充
for (Blog blog : blogs) {
//查询blog有关用户
queryBlogUser(blog);
isLikeBlog(blog);
}
//5封装并返回
ScrollResult scrollResult = new ScrollResult();
scrollResult.setList(blogs);
scrollResult.setOffset(offNum);
scrollResult.setMinTime(minTime);
return Result.ok(scrollResult);
}
Redis最佳实践
Redis键值设计
优雅的key结构
Redis的key虽然可以自定义,但最好遵循下面结构最佳实践的约定:
- 遵循基本格式:[业务名称]:[数据名]:[id]
- 长度不超过44字节
- 不包含特殊字符
例如:在登录业务,保存用户信息,key是这样:login:user:1
优点:
- 可读强
- 避免key冲突
- 方便管理
更节省内存:key是string类型,底层编码包含int、embstr、和raw三种,embstr在小于44字节使用,采用连续的空间,内存占用更小