文章目录
- 优惠券秒杀
- 全局ID生成器
- 优惠券秒杀下单
- 超卖问题
- 一人一单
- 分布式锁
- 基于Redis的setnx指令实现分布式锁
- 解决锁误删问题
- 基于Lua脚本实现多条指令原子性
- Redis调用Lua脚本
- Java中使用Lua脚本
- Redisson
- Redisson快速入门
- Redisson可重入锁原理
- Redisson的锁重试和Watchdog机制
- Redisson的multilock
- 秒杀优化
- Redis缓存解耦
- Redis消息队列
- 基于List结构
- 基于PubSub的消息队列
- 基于Stream的消息队列
- 基于Stream的消息队列 - 消费者组
优惠券秒杀
全局ID生成器
- 第一位为符号位,永远为0
- 2-32位为时间戳差值,指定从某一个时刻开始,计算当前的时间戳与起始时间戳的差值,保证了id的自增性,但不一定是连续的。
- 后32位,可以采用分区+序列号的方式。(分布式)
本质上跟mybatis-plus的雪花算法是一样的。
public class RedisIdWorker {
/**
* 开始时间戳
*/
private static final long BEGIN_TIMESTAMP = 1640995200L;
/**
* 序列号的位数
*/
private static final int COUNT_BITS = 32;
private final StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
// 1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
// 2.生成序列号
// 2.1.获取当前日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2.自增长
// 此处的警告可以忽略,因为如果key不存在,会从0开始增长。
// 这里key前缀以日期构建的目的是为了避免自增超出序列号范围,同一天下单数量一定不会超出32位
long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3.拼接并返回
// timestamp 左移 32位,变到高位,那么低32位为0,或上count即可,count一定不会超出32位范围。
return timestamp << COUNT_BITS | count;
}
}
测试:
编写一个runnable 的任务task,循环100次,执行自增id测试。
构建一个固定工作线程数为300的线程池,循环将线程池中提交task。那么最终相当于是自增id 3万(100 * 300)次
使用CountDownLatch来帮助计时,因为我们用到了线程池,线程池的执行是异步的,因此简单使用end - begin,当执行到end时,可能还有未执行完毕的异步线程。
而使用CountDownLatch,则可以帮助我们标记异步线程,latch.await();会等待所有异步线程执行完毕。
@Resource
private RedisIdWorker redisIdWorker;
private final ExecutorService es = Executors.newFixedThreadPool(500);
@Test
public void testIdWorker() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; ++i) {
long id = redisIdWorker.nextId("order");
System.out.println("id:" + id);
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; ++i) {
es.submit(task);
}
latch.await();
long end = System.currentTimeMillis();
System.out.println("time cost:" + (end - begin));
}
优惠券秒杀下单
超卖问题
一个线程查询有库存,尚未扣除库存,另外一个线程也执行了库存查询,由于此外前面的线程还没来得及扣除库存,因此后来的线程也可以执行下单。
使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。
public class IVoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
RedisIdWorker redisIdWorker;
// 因为设计两张表操作,使用事务保证操作连续性
@Transactional
@Override
public Result secKillVoucher(Long voucherId) {
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
// 3. 判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
// 4. 判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
// 5. 扣减库存
// 使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
return Result.fail("库存不足!");
}
// 6. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 6.1 创建订单id,使用全局生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 6.2 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 6.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 7. 返回订单id
return Result.ok(orderId);
}
}
一人一单
需求:修改优惠券秒杀业务,同一个用户只能下一单。
主要问题:
1、为了保证一人一张,需要根据用户id和和优惠券id查询是否已经下单过,该过程需要上锁避免线程安全问题。
2、锁对象可以是用户id的字符串形式,保存在常量池中。
3、锁的范围应该在事务提交之后,因此最好将整个方法上锁。
4、掉用本类中的方法,可能导致事务失效,解决方案是使用代理对象中的方法。
(1)添加依赖:
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>
(2)启动类中暴露代理对象给spring容器:
(3)使用容器中的代理执行方法。
synchronized (userId.toString().intern()) {
// 防止事务失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
完整逻辑如下:
public class IVoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
RedisIdWorker redisIdWorker;
@Override
public Result secKillVoucher(Long voucherId) {
// 1. 查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
// 3. 判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
// 4. 判断库存是否充足
if (voucher.getStock() < 1) {
// 库存不足
return Result.fail("库存不足!");
}
//用户id
Long userId = UserHolder.getUser().getId();
// 使用.intern(),使得从字符串常量池中取得唯一的id对象,作为锁对象
synchronized (userId.toString().intern()) {
// 防止事务失效
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
// 因为设计两张表操作,使用事务保证操作连续性
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5 一人一单
Integer count = query()
.eq("user_id", UserHolder.getUser().getId())
.eq("voucher_id", voucherId)
.count();
if (count > 0) {
// 用户已经秒杀过优惠券
return Result.fail("用户已经购买过一次!");
}
// 6. 扣减库存
// 使用乐观锁解决超卖问题,仅当数据库更新操作时,剩余库存依旧大于0,才执行成功。
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.gt("stock", 0)
.update();
if (!success) {
return Result.fail("库存不足!");
}
// 7. 创建订单
VoucherOrder voucherOrder = new VoucherOrder();
// 7.1 创建订单id,使用全局生成器
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 7.2 用户id
Long userId = UserHolder.getUser().getId();
voucherOrder.setUserId(userId);
// 7.3 代金券id
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
// 8. 返回订单id
return Result.ok(orderId);
}
}
上述方案在集群模式下依旧会有问题,因为锁对象是字符串常量池中的用户id,集群模式下,不同的服务器会有不同的JVM,因此锁对象就不唯一了。
解决方案就是使用分布式锁。
分布式锁
分布式锁:满足分布式系统或集群模型下,多进程可见并且互斥的锁。
常见的分布式锁实现方案有三种:
- 基于MySQL本身的互斥锁机制
- 基于Redis的setnx这样的互斥命令
- 基于Zookeeper利用节点的唯一性和有序性
基于Redis的setnx指令实现分布式锁
假定服务器集群共用一个第三方的Redis,那么就可以在Redis上,使用一个lock
为key,threadid
为值的键值对来表示锁对象。
模拟获取锁:
- 保证互斥,确保只能有一个线程获取到锁
- 非阻塞:尝试一次,成功返回true,失败返回false
- 为了避免释放锁的操作失败,导致后序永远无法获取到锁,应该为锁设置有效期,逾期自动释放。
Redis命令:
set lock threadId nx ex 10
模拟释放锁:
直接删除 lock
即可
del lock
在Java中实现如下,注意点为:
- 准备Redis操作需要的
StringRedisTemplate
,为了不同的业务使用不同的锁,应该在锁对象的key上加上业务名称name
,这两个变量通过构造函数传入。 - 模拟获取锁函数
tryLock()
,返回布尔值,代表是否成功获取到锁,可指定锁的TTL。.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
,指定锁的键值对,key为lock前缀 + 业务名,值为线程id。 - 模拟释放锁
unlock()
,直接根据key删除代表锁的键值对。
public class SimpleRedisLock implements ILock {
private String name;
private StringRedisTemplate stringRedisTemplate;
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
private static final String KEY_PREFIX = "lock:";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate
.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
解决锁误删问题
上边版本的分布式锁实现,有可能出现锁误删的问题,具体情形如下:
- 线程1获取到锁,因为业务阻塞,导致阻塞时间长于锁自动释放时间。
- 线程2在锁自动释放后,获取到锁,执行业务,在执行过程中,线程1完成业务,释放锁,但此时Redis中的锁已经是由线程2创建的锁对象了,而被线程1删除了。
- 线程1删除了锁,因此线程3可以继续获取到锁,那么此时线程2和线程3已经是并行执行了,违反了锁的互斥性!!!。
那么解决办法就是在删除锁字段,即释放锁的时候,检查一下,当前的锁释放是之前自己获取到的锁!!。
主要的修改有两处:
- 获取锁的时候,存入线程唯一标识,由于集群情况下,不同集群的不同线程id可能一样,采用UUID来拼接线程id,构,保证标识唯一性。
- 在释放锁的时候,判断是否与当前线程标识一致,如果不一致,则不释放锁(避免误删别的线程的锁)。
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";
@Override
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate
.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
@Override
public void unlock() {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁中的标示
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
// 判断标示是否一致
if(threadId.equals(id)) {
// 释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}
基于Lua脚本实现多条指令原子性
判断锁标识是否一致和释放锁不是原子性的,在这个间隙,可能再次导致线程安全问题。
解决方法是借助lua脚本来保证指令执行的原子性。
Redis调用Lua脚本
- Redis使用
EVAL
可以用于执行脚本,Lua脚本中使用redis.call()
,可以用于执行Redis指令。 - 使用
EVAL
指令时,可以指定脚本需要操作的key类型的参数个数,后边跟上keys列表和argv列表,这样在脚本中就可以直接使用传入的参数。需要注意的是在Lua脚本中,数组索引下标从1开始,因此KEYS[1]就表示name, 而ARGV[1]就表示Rose
Java中使用Lua脚本
1 、在Resource目录下编写unlock.lua脚本:
2、配置Redis脚本调用对象DefaultRedisScript
,指定脚本路径和返回值类型。
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
// 指定脚本路径
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
// 设置返回类型
UNLOCK_SCRIPT.setResultType(Long.class);
}
3、在unlock中使用stringRedisTemplate执行UNLOCK_SCTRIPT调用lua脚本保证操作的原子性。
@Override
public void unlock() {
// 调用lua脚本
stringRedisTemplate.execute(
UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}
Redisson
基于Lua脚本优化后的Redis分布式锁已经能够满足大部分场景下的业务需求,然而它还是具有一些不足:
- 1、锁不可重入
- 2、获取锁,不可重试
- 3、超时释放虽然避免了死锁,但是业务执行耗时较长,也会导致锁释放,存在安全隐患。
- 4、主从一致性,如果Redis提供了主从集群(读操作,使用从节点,写操作使用主节点),那么主从同步是存在延时的,当主服务器宕机,从节点尚未同步时,则会出现锁互斥失效。
为了实现上述这些高级功能,我们可以借助,Redisson,一个基于Redis的分布式锁框架。
官网地址
Redisson快速入门
- 引入依赖
- 配置Redisson客户端,在配置类中使用@Bean注解,将Redisson客户端类注入到IoC容器,交由Spring管理。
- 使用Redisson的分布式锁
Redisson可重入锁原理
可重入的原理与synchronized这类可重入锁原理类似,在Redis中使用setnx,存放一个hash类型的数据,field为锁的值,value为当前获取锁的次数。
- 首先判断锁是否存在,如果不存在,获取锁并添加线程标识,设置锁的有效期。
- 如果锁已经存在,根据锁标识判断锁是否属于该线程,如果属于将锁计数+1,否则获取锁失败。
- 业务执行完毕时,将锁计数减1,当锁计数减为0时释放锁,否则重置锁的有效期。
- 上述逻辑需要保证原子性,因此,所有的操作应该使用Lua脚本来实现。
Redisson的锁重试和Watchdog机制
- Redisson分布式锁实现了尝试重新获取锁的功能,在尝试获取锁的时候,可以传入最大等待时间
wait_time
和锁自动释放时间lease_time
。 - 尝试获取锁时,如果获取锁成功返回null,否则返回剩余的最大等待时间pttl,以毫秒为时间单位。如果剩余最大等待时间大于0,那么会订阅并等待释放锁的信号。
- 相应的,锁在释放时,会发布锁释放的消息,所有订阅该消息的线程都会接收。接收到后,需要判断此时等待是否超时,如果超时,则锁获取失败,否则重新尝试获取锁。
- 如果锁自动释放时间不为-1, 那么在获取锁成功时,Redisson内部采用了看门狗机制,开启watchDog机制,不停的更新锁的有效期(开启一个任务,在锁释放时间的1/3长后执行,执行的任务为本身,即递归调用,每1/3,重置有效期),这种看门狗机制,也是在锁释放时取消的。
Redisson的multilock
使用多个分布式Redis节点,每个Redis上构建一个锁,每次操作获取锁的时候,需要同时能够从多个Redis节点成功获取到锁,才视为成功获取到锁。
这种方式实际上构成了一个连锁,缺点在于运维成本高,实现复杂。
@BeforeEach是一种在软件开发中常见的测试框架中使用的注解。它通常用于JUnit或其他类似的单元测试框架中,用于标记在每个测试方法之前执行的设置操作。
使用:
秒杀优化
Redis缓存解耦
原始的秒杀业务需求,首先得判断秒杀库存,然后查询订单检验是否符合一人一单,从而锁定秒杀资格,随后再通过操作数据库修改库存,创建订单。
整个流程串联步骤较多,且频繁操作数据库,导致响应较慢。
其实业务可以拆解为两步:锁定秒杀劵和生成秒杀劵。锁定秒杀劵的请求对高并发的要求更严格,可以通过Redis缓存来实现,在锁定秒杀劵后,相当于用于订餐,给了用户一张小票,这张小票的信息会保存在阻塞队列中,开启一个异步线程来消费阻塞队列中的订单,生成相应的订单到数据库中。
具体实施时,可以采用lua脚本实现对Redis的操作,确保代码执行的原子性,异步线程对于阻塞队列的处理可以参照数据库的连接性能来构建。
Redis消息队列
基于阻塞队列来处理Redis生成的优惠券订单,有很大的问题:当高并发、高优惠券发放时,阻塞队列的长度却是有限的,而受限于JVM的内存,阻塞队列设置太大,很有可能导致OOM。
为此,应该使用消息队列在存放Redis生成的优惠券订单消息。
对于大型规模的消息处理场景,可以使用kafka、rabbitMq、rocketMq。
小规模场景,可以使用Redis自带的消息队列服务:
基于List结构
使用BRPOP、BLPOP来实现阻塞效果。
基于List消息队列的优缺点:
优点:
- 利用Redis存储,不受限于JVM内存上限
- 基于Redis持久化机制,数据安全性有保证
- 可以保证消息的有序性
缺点:
- 如果消息处理过程中,出现异常,则消息就丢失了
- 只支持单消费者模式。
基于PubSub的消息队列
相比于List结构的消息队列,基于PubSub的消息队列摩擦,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
基于Stream的消息队列
可以基于阻塞方式和&符号,读取最新的消息。
但是有漏读消息的风险,因为在读取到一条消息,并且消费消息的时候,这期间又来了多条消息,但是只能读取到最后发来的这条。
基于Stream的消息队列 - 消费者组
消费者组:将多个消费者划分到一个组中,监听同一个队列,具备以下特点:
- 消息分流:队列中的消息会分流给组内不同的消费者,而不是重复消费,从而加快消息处理的速度。
- 消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还是会从标识之后开始读取消息,确保每一个消息都会被消费。
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list,当处理完毕后,需要通过XACK确认消息,标记消息为已处理,才会从pending-list移除。