全局唯一ID
全局唯一ID生成策略:
- UUID
- Redis自增
- snowflake算法
- 数据库自增
Redis自增ID策略: - 每天一个key,方便统计订单量
- ID构造是 时间戳 + 计数器
@Component
public class RedisIdWorker {
// 2024的第一时刻
private static final long BEGIN_TIMESTAMP = 1704067200L;
private static final int COUNT_BITS = 32;
private final StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix){
// 1. 获取当前时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timeStamp = nowSecond - BEGIN_TIMESTAMP;
// 2. 获取序列号
// 2.1 获取当天日期,精确到天
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
// 2.2 自增
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
// 3. 拼接成ID
return timeStamp << COUNT_BITS | count;
}
}
超卖问题
在处理大量请求时,可能会出现超卖问题。
可以通过加锁解决。
这里采用的是乐观锁。
一人一单
该任务需要每名用户只能抢到一张优惠券。
同时还要考虑到后端部署在多个服务器上可能会出现的异常,此时需要使用分布式锁进行解决。
这里的分布式锁基于Redis实现。
基于Redis的分布式锁实现思路
基于Redis的分布式锁实现思路:
- 利用set nx ex获取锁,并设置过期时间,保存线程标示
- 释放锁时先判断线程标示是否与自己一致,一致则删除锁
特性:
- 利用set nx满足互斥性
- 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
- 利用Redis集群保证高可用和高并发特性
使用Redis优化秒杀
这里将库存判断与一人一单的校验使用Redis完成。
具体流程为:
- 新增秒杀优惠券的同时,将优惠券信息保存到Redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
这里的阻塞队列是基于Stream的消息队列
STREAM类型消息队列的XREADGROUP命令特点: - 消息可回溯
- 可以多消费者争抢消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
新增秒杀优惠券的同时,将优惠券信息保存到Redis中
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
// 保存优惠券
save(voucher);
// 保存秒杀信息
SeckillVoucher seckillVoucher = new SeckillVoucher();
seckillVoucher.setVoucherId(voucher.getId());
seckillVoucher.setStock(voucher.getStock());
seckillVoucher.setBeginTime(voucher.getBeginTime());
seckillVoucher.setEndTime(voucher.getEndTime());
seckillVoucherService.save(seckillVoucher);
// 保存优惠券至Redis
stringRedisTemplate.opsForValue().set(RedisConstants.SECKILL_STOCK_KEY +voucher.getId(), voucher.getStock().toString());
}
基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
-- 1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2. 数据key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId
-- 3. 判断库存是否充足
if tonumber(redis.call('get', stockKey) )< 1 then
-- 库存不足
return 1
end
-- 4. 判断用户是否已经抢购过
if redis.call('sismember', orderKey, userId) == 1 then
-- 已经抢购过
return 2
end
-- 5. 减库存
redis.call('incrby', stockKey, -1)
-- 6. 记录用户抢购信息
redis.call('sadd', orderKey, userId)
redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId,"id", orderId)
return 0
异步下单
LUA脚本
-- 1.参数列表
local voucherId = ARGV[1]
local userId = ARGV[2]
local orderId = ARGV[3]
-- 2. 数据key
local stockKey = "seckill:stock:" .. voucherId
local orderKey = "seckill:order:" .. voucherId
-- 3. 判断库存是否充足
if tonumber(redis.call('get', stockKey) )< 1 then
-- 库存不足
return 1
end
-- 4. 判断用户是否已经抢购过
if redis.call('sismember', orderKey, userId) == 1 then
-- 已经抢购过
return 2
end
-- 5. 减库存
redis.call('incrby', stockKey, -1)
-- 6. 记录用户抢购信息
redis.call('sadd', orderKey, userId)
redis.call('xadd', "stream.orders", "*", "userId", userId, "voucherId", voucherId,"id", orderId)
return 0
从消息队列取出,处理代码
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
private IVoucherOrderService proxy;
// 静态代码块加载Lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
// private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024*1024);
// 线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
@PostConstruct
// 完成类的construct即执行下面的函数
public void init() {
// 交给线程池做
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
String queueName = "stream.orders";
// 消费线程
private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
// 4.1从消息队列中取出订单 xreadgroupnngroup g1 c1 count 1 block 200 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 判断是否成功
// 没有
if (list == null || list.isEmpty()) {
continue;
}
// 有
// 4.2创建订单,解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理失败", e);
handlePendingList();
}
}
}
}
private void handlePendingList() {
while (true) {
try {
// 4.1从消息队列中取出订单 xreadgroupnngroup g1 c1 count 1 block 200 streams streams.order >
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 判断是否成功
// 没有
if (list == null || list.isEmpty()) {
// pendingList 没有消息
break;
}
// 有
// 4.2创建订单,解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
handleVoucherOrder(voucherOrder);
// ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("订单处理失败", e);
try {
Thread.sleep(200);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
}
private void handleVoucherOrder(VoucherOrder voucherOrder) {
// 因为为子线程,userId只能从数据中取
Long userId = voucherOrder.getUserId();
RLock lock = redissonClient.getLock("order:" + userId);
boolean isLock = lock.tryLock();
if (!isLock) {
log.error("重复抢购");
return ;
}
try {
proxy.createVoucherOrder(voucherOrder);
return;
} finally {
lock.unlock();
}
}
创建订单
@Transactional
public Result createVoucherOrder(VoucherOrder voucherOrder) {
// 4.一人一单
Long userId = voucherOrder.getUserId();
// 4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
if (count > 0) {
return Result.fail("每人限购一张");
}
// 4.是
// 4.1扣减库存,基于乐观锁
boolean success = seckillVoucherService.update().setSql("stock=stock-1").eq("voucher_id", voucherOrder.getVoucherId()).
gt("stock", 0).update();
if (!success) {
return Result.fail("库存不足");
}
//4.2创建订单
save(voucherOrder);
//4.3返回订单id
return Result.ok(voucherOrder.getId());
}
@Override
public Result seckillVoucher(Long voucherId) {
Long userId = UserHolder.getUser().getId();
// 0. 生成订单id
long orderId = redisIdWorker.nextId("order");
// 1. 执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString(), String.valueOf(orderId)
);
// 2. 结果是否为0
int r = result.intValue();
// 2.1 不为0
if (r!=0) {
return Result.fail(r == 1 ? "库存不足" : "不能重复抢购");
}
// 注解底层基于aop实现,需要获得代理对象,进行执行
proxy = (IVoucherOrderService)AopContext.currentProxy();
// 3. 返回结果订单id
return Result.ok(orderId);
}