Reids实战—黑马点评(三)秒杀篇
来自黑马的redis课程的笔记
【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】
目录
- Reids实战—黑马点评(三)秒杀篇
- 一、全局唯一ID
- 小结
- 二、实现优惠券秒杀下单
- 三、超卖问题
- 3.1 问题描述
- 3.2 乐观锁和悲观锁
- 3.3 乐观锁实现
- 3.3.1 版本号法
- 3.3.2 CAS(Compare And Swap)
- 3.4 小结
- 四、一人一单
- 4.1 添加功能
- 4.2 并发问题
- 4.3 字符串问题
- 4.4 Spring事务问题
- 4.4.1 锁在事务内
- 4.4.2 事务不生效
- 4.5 集群模式下的一人一单问题
- 五、分布式锁
- 5.1 分布式锁概述
- 5.2 基于redis实现分布式锁
- 5.2.1 锁误删
- 5.2.2 Lua脚本保证命令原子性
- 5.3 小结
- 六、Redisson
- 6.1 使用Redisson分布式锁
- 6.2 Redisson分布式锁原理
- 6.2.1 可重入原理
- 6.2.2 可重试原理
- 6.2.3 解决超时释放
- 6.2.4 保证主从一致
- 6.2.5 小结
- 七、redis优化秒杀
- 八、Redis消息队列实现异步秒杀
- 8.1 基于List结构模拟消息队列
- 8.2 PubSub发布订阅模式
- 8.3 基于Stream结构的消息队列
- 8.3.1 基本使用
- 8.3.2 消费者组
- 8.3.3 改造秒杀业务
- 8.3.4 小结
一、全局唯一ID
每一个订单都需要不同的ID,如何做到ID唯一?
如果似乎用自增:
- id规律明显(安全问题)
- 受单表数据量限制
为了解决这些问题,我们有了全局ID生成器,这是一种生成全局唯一ID(或者叫分布式唯一ID)的工具,所生成的ID满足以下特征:
- 唯一性
- 高可用
- 高性能
- 递增性
- 安全性(复杂递增)
唯一性和递增性我们可以用redis自增来实现,恰好,redis本身就满足高可用、高性能,安全性如何解决?
方案:
用Long(数值效率比字符串高)类型,一个Long类型占64位(Java中),我们可以在这些bit位上做文章。
首先:从左数第一位,是符号位,我们的id永远为正数,所以让它永远为0。接下来的31位,用来存时间戳,保证安全性的同时,让id基本不可能重复。最后的32位,我们存普通的递增值。这个方案,理论上可以保证68年都不会有id重复。
为了避免最后的自增值超出redis自增限制,我们可以每天新建一个key用于自增(例如,yyyy:MM:dd),不仅解决了问题,还方便统计。
实现:
@Component
public class RedisIdWorker {
private StringRedisTemplate stringRedisTemplate;
// 起始的时间戳值
private static final Long BEGIN_TIMESTAMP = 1640995200L;
public RedisIdWorker (StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
/**
* @param “业务key前缀“
* */
public long nextId(String keyPrefix) {
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + data);
// 位运算提高效率
return timestamp << 32 | count;
}
}
起始的时间戳的值可以这样计算:
@Test
void timeTest() {
LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
System.out.println(epochSecond);
}
测试:
ExecutorService executorService = Executors.newFixedThreadPool(500);
@Test
void idTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(300);
Runnable task = () -> {
for (int i = 0; i < 100; i++) {
redisIdWorker.nextId("order");
}
latch.countDown();
};
long begin = System.currentTimeMillis();
for (int i = 0; i < 300; i++) {
executorService.submit(task);
}
// 必须阻塞 如果不阻塞,则程序结束时,我们的任务还没有结束
latch.await();
long end = System.currentTimeMillis();
System.out.println(end - begin);
}
收获的小技巧:
-
LocalDateTime的使用
// 使用of可以快速获取指定时间的LocalDateTime对象 LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0); // 使用toEpochSecond可以快速获取相应时间的时间戳 long epochSecond = time.toEpochSecond(ZoneOffset.UTC); // 使用format方法快速格式化时间 LocalDateTime now = LocalDateTime.now(); String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
-
并发包的小技巧
// 快速创建线程池,但是阿里的Java开发规范不建议这样做 ExecutorService executorService = Executors.newFixedThreadPool(500); // 快速创建一个任务,使用lambda表达式 Runnable task = () -> { …… }; // 提交任务 executorService.submit(task);
-
位运算
更好的利用bit位来处理信息
小结
uuid效率较低,且不满足自增性。
Redis自增,就是刚刚的方案,很常用。
雪花算法,类似于刚刚的方案,区别在于雪花算法的最后32位是以机器时间为基准,Redis是自增。
数据库自增,也就是数据库版本的Redis方案,效率不如Redis。
二、实现优惠券秒杀下单
按照流程图,实现非常简单,但会出现非常多的问题(详见后文)。
小收获:
使用lambdaUpdate
// 减库存操作
lambdaUpdate().setSql("stock = stock -1")
.eq(SeckillVoucher::getVoucherId, voucherId)
.update();
三、超卖问题
3.1 问题描述
在刚刚的流程中,是有很多问题隐患的,比如,经典的超卖问题。
在并发量较高的情况下,假如剩余最后一张票,在购买最后一张票的线程还未扣减库存时,其他线程进入,查询库存,都会查到还有余票的结果,此时再进行库存判断,并扣减库存,就会发生超卖。
如何解决超卖问题?
最简单的方法就是上锁,同一时间只让一个线程去执行查库存->扣减库存的操作。
3.2 乐观锁和悲观锁
加锁的话有两种锁可以加:
悲观锁:比较悲观,认为线程安全问题一定会发生,因此操作数据前先获取锁,确保线程串行执行。(如synchronized、Lock)
乐观锁:比较乐观,认为线程安全问题不一定发生,所以不加锁,而是在更新数据时去判断数据有没有被其他线程修改。若没有则更新,反之重试或异常。
3.3 乐观锁实现
悲观锁的解决方案较为简单,即在可能出问题的代码上加synchronized或Lock锁住。我们介绍一下乐观锁的解决方案:
常见两种方法:
3.3.1 版本号法
每次更新数据时,更新版本号,并判断版本号是否被修改。
3.3.2 CAS(Compare And Swap)
以数据本身为版本号
3.4 小结
秒杀场景下,悲观锁性能较差,乐观锁成功率较低。
小优化:更新库存时,不用判断库存是否和查询到的库存相同,只需要库存大于零即可,这样成功率大大提高。
四、一人一单
4.1 添加功能
某些业务我们需要用户只能购买一单,于是我们多加一个用户是否已经购买的判断:
从ThreadLocal中取出当前用户的id,根据用户和商品id查询用户是否已经有订单存在,若存在则失败。
4.2 并发问题
按照刚刚的业务流程,正常用户单线程操作的情况下是没有问题的,但是若是恶意用户,同时开多个线程访问我们的接口,则可能出现这样一种情况:
若多个线程同时进入该方法,在任一线程未保存订单前,都进行查询用户是否下单,得到的count都是0,拿到这个0后,再去判断是否一人一单则都会成功,都会下单,造成了一人多单的问题。
解决:给该方法加锁,因为涉及两个表的写操作,我们添加事务。
写完过后以看,这不成了悲观锁嘛!里面的乐观锁还拿来干嘛呢?
经过思考:我们只需要限制单个用户的并发操作,只需要锁住userId即可,于是我们不再锁整个方法。
该方法改进为:
@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString()) {
......
return Result.ok(voucherOrder.getId());
}
}
4.3 字符串问题
接着我们会发现锁不住,因为我们toString方法会创建一个新的字符串,不同的线程的userid.toString()出来的字符串不是同一个对象。
解决:使用intern方法
@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
Long userid = UserHolder.getUser().getId();
synchronized(userid.toString().intern()) {
......
return Result.ok(voucherOrder.getId());
}
}
使用intern方法,该方法会从JVM常量池中找equals为true的字符串,意味着不同的线程都是同一个字符串。
4.4 Spring事务问题
spring的声明式事务是基于AOP实现的,意味着方法结束后才会提交事务。
4.4.1 锁在事务内
当synchronized在该方法内时,会出现这么一种情况:锁释放了,但事务未提交,事务未提交时,下一个线程进来,读到的是数据库未修改的快照,仍然会发生一人多单的问题,所以我们要在调用该方法处添加锁,或是使用编程式事务。
synchronized (userid.toString().intern()) {
createVoucherOrder(voucherId);
}
4.4.2 事务不生效
正是因为spring事务是通过aop实现,而aop是通过动态代理实现。当我们直接调用该方法时,默认是this.createVoucherOrder(voucherId),使用的是this调用,而不是使用代理对象来调用,只有通过代理对象来调用,事务才会生效。
-
引入依赖
<dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> </dependency>
-
暴露代理对象
启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解
@MapperScan("com.hmdp.mapper") @EnableAspectJAutoProxy(exposeProxy = true) @SpringBootApplication public class HmDianPingApplication { public static void main(String[] args) { SpringApplication.run(HmDianPingApplication.class, args); } }
-
获取代理对象,并使用代理对象调用方法
synchronized (userid.toString().intern()) { // 确保事务生效 IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy(); proxy.createVoucherOrder(voucherId); }
4.5 集群模式下的一人一单问题
通过刚刚的几波操作,可以说是基本解决了单机部署情况下的一人一单问题,若是集群部署,仍然会有问题。
可以开两个服务debug一下:
(小坑:Intellij Idea 2022.3.1版本,断点调试打断点后,需要等待断点上的“√”出现才能启动服务,不然断点无效,多个服务建议多选并同时启动,不然要等上一个服务启动到“√”出现才能启动下一个服务。)
我们的 锁 锁的是userid,锁的是jvm常量池中的userid字符串。若是多个jvm,不是同一个常量池,自然就锁不住了。
五、分布式锁
解决集群情况下的并发问题,则需要一个多个进程能都使用的锁。
5.1 分布式锁概述
分布式锁:满足分布式系统或集群模式下多进程可见并互斥的锁。
分布式锁可以借助第三方中间件简单的实现:
5.2 基于redis实现分布式锁
通过redis的setnx命令可以轻松实现简单的互斥锁,因为redis是单线程的,不会存在并发问题。
通过set key value nx ex 10 来保证原子性,防止在设置ttl值前宕机,发生死锁。这是一种非阻塞的锁,拿不到锁立即返回false,没有重试机制。
5.2.1 锁误删
若只是简单的使用setnx和del来获取和释放锁,则会出现一些问题,例如,拿到锁的线程业务阻塞了,它的锁超时释放了,此时另一个线程拿到锁,在另一个线程还未释放锁时,它的业务完成,并删掉了另一个线程的锁。这样一来,第三个线程又能拿到锁。
所以我们需要有一种机制,防止锁的误删:
在释放锁时,再判断一下锁是否是自己的。
具体方案:在设置该锁的value时,我们使用threadid,因为在多个JVM中,threadid有可能相同,于是我们在threadid前拼接一个UUID。这样就保证了误删锁的情况不会再发生。
5.2.2 Lua脚本保证命令原子性
避免锁的误删后,我们的分布式锁就比较健壮了。但在某些极端场景下,仍会出现误删问题:
在释放锁时,会先判断是否是当前的锁(在redis中获取该锁的value),此时,因为一些原因(如GC)阻塞了,下一个线程拿到锁,此时阻塞结束,由于已经做过判断,上一个线程会把下一个线程拿到的锁误删,此时第三个线程又能拿到锁了。
解决方案:将判断和释放锁封装成一个原子操作。
使用Lua脚本可以完美解决这个问题。
在lua脚本中,可以直接使用redis.call()方法来编写redis命令,在redis中使用EVAL调用。有如下好处:
- 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延。
- 原子操作。Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。
- 复用。客户端发送的脚本会永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。
注意:lua语言中,数组下标是从1开始。
了解lua脚本后,我们就可以开始编写lua脚本了。
根据需求:
简写:
接下来就是使用Java的redis客户端嗲用Lua脚本:
使用execute方法:
于是,我们写出了简单且健壮的分布式锁:
package com.hmdp.utils;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
private StringRedisTemplate stringRedisTemplate;
// Lua脚本
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
// 锁的名字 key一般为业务名
private String key;
// 锁前缀
private static final String KEY_PREFIX = "lock:";
// 该区分不同的JVM
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "";
// 初始化Lua脚本
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}
// 初始化key
public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.key = KEY_PREFIX + name;
}
/**
* @param timeoutSec 锁持有的时长 过期自动释放
* @return
*/
@Override
public boolean tryLock(long timeoutSec) {
String threadId = ID_PREFIX + Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId + "", timeoutSec, TimeUnit.SECONDS);
return BooleanUtil.isTrue(success);
}
/**
* 释放锁
*/
@Override
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(key),
ID_PREFIX + Thread.currentThread().getId()
);
}
}
5.3 小结
编码收获:
- 读取文件,若该文件是不变的,则在static代码块中初始化一次就好了。
- ClassPathResource()类快速获取Resource目录下的资源。
六、Redisson
现在我们基于String类型的setnx实现的分布式锁已经足够健壮,但仍有不足:
要解决这些问题,我们的基础版redis分布式锁就不好发力了。
但Redisson轻松解决:
Redisson中不仅有分布式锁,分布式锁只是它的一个子集 。
6.1 使用Redisson分布式锁
-
引依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.19.3</version> </dependency>
-
配置
如果使用yaml来配置,则会将redis的配置覆盖。我们使用redisson只是作为分布式锁使用,所以单独配置:
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); // 单点配置,也可以用useClusterServers添加集群地址 config.useSingleServer().setAddress("redis://192.168.0.122:6379").setPassword("123456"); return Redisson.create(config); } }
-
使用
private void handleVoucherOrder(VoucherOrder voucherOrder) { Long userid = voucherOrder.getUserId(); // 创建锁对象 RLock lock = redissonClient.getLock("order:" + userid); // 尝试获取锁 boolean isLock = lock.tryLock(); // 是否拿到锁 if (!isLock) { log.error("不能重复下单!"); } try { proxy.createVoucherOrder(voucherOrder); } finally { lock.unlock(); } }
非常简单。
也可以携带参数
6.2 Redisson分布式锁原理
redisson的分布式锁是如何解决这四个问题?
6.2.1 可重入原理
我们的基础版锁是不可重入的
redisson使用Hash结构轻松解决了这个问题
每重入一次,value+1,每次释放锁,value-1,value为0,则释放锁。非常巧妙
其底层都是lua脚本,保证命令原子性:
获取锁:
释放锁:
源码中lua脚本是写死了放在代码中的,可以去查看,和这些大差不差,释放锁的时候会发布一个释放信号(后文)。
6.2.2 可重试原理
我们基础版的redis是一个非阻塞式的锁,没有任何重试机制。redisson用PubSub模式解决了这个问题。
我们从redis源码中探究:
/**
* 参数分别是等待时间,锁ttl,时间单位。我们不指定ttl时,默认为-1。
**/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 执行获取锁的方法,会返回锁的ttl,获取锁失败会返回null
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 成功获取锁
if (ttl == null) {
return true;
}
// 获取失败,准备重试,先检查重试等待时间是否还有剩余
time -= System.currentTimeMillis() - current;
// 过期,获取锁失败,返回false
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 还可以重试,准备重试
current = System.currentTimeMillis();
// 订阅锁的释放信号
CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
try {
// 等待锁释放,获取信号
subscribeFuture.get(time, TimeUnit.MILLISECONDS);
// 超时 失败
} catch (TimeoutException e) {
if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
"Unable to acquire subscription lock after " + time + "ms. " +
"Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
subscribeFuture.whenComplete((res, ex) -> {
if (ex == null) {
unsubscribe(res, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
// 异常 失败
} catch (ExecutionException e) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 获取锁释放信号成功
try {
// 再次判断等待时间是否过期
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 没有过期 尝试获取锁
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 获取锁成功
if (ttl == null) {
return true;
}
// 失败 检查等待时间是否过期
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 未过期 准备重试
currentTime = System.currentTimeMillis();
// 等待释放锁的信号
if (ttl >= 0 && ttl < time) {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 获得锁释放信号 再次检查等待时间
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 无论获取锁成功或失败,都要解除订阅
unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
相比我们基础版redis分布式锁,有很多优点,例如:
- 重试并不是无休止的重试,而是使用发布订阅模式,等待一个锁的释放信号再去重试,节约内存,性能。
- 非常严谨,每次获取锁前,都要先判断重试等待时间是否过期。
6.2.3 解决超时释放
虽然锁超时释放可以解决业务异常带来的死锁问题,但是业务超时引发的自动释放也会产生线程安全问题。redisson使用看门狗机制解决了这个问题。
从redis源码中探究:
首先看获取锁的源码
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 参数和之前一致
RFuture<Long> ttlRemainingFuture;
if (leaseTime > 0) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
// 默认情况 leaseTime为 -1 ,这个时候,redisson使用了一个变量internalLockLeaseTime,实际上是30 * 1000毫秒,也就是30秒
// 该方法会执行lua脚本,尝试获取锁,若获取成功,返回null,获取失败,返回ttl
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
// 判断是否获取成功
if (ttlRemaining == null) {
// 获取成功 刚才提到 默认情况下,leaseTime为 -1 所以这里走else分支
if (leaseTime > 0) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
// 续约 该方法就是解决超时释放的关键
scheduleExpirationRenewal(threadId);
}
}
// 返回lua脚本返回的ttl或是nil
return ttlRemaining;
});
return new CompletableFutureWrapper<>(f);
}
再来看最关键的续约的方法
protected void scheduleExpirationRenewal(long threadId) {
// 用来存线程对应的锁
ExpirationEntry entry = new ExpirationEntry();
// 如果是重入的锁,则无法放入这个map,放入这个map的锁才有资格续约,会获取一个旧的entry,意思就是每一把锁在这个map中有且仅有一条记录
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
// 判断是否是重入的锁
if (oldEntry != null) {
// 是重入的锁,只做记录
oldEntry.addThreadId(threadId);
} else {
// 不是重入的锁,记录并开始续约任务
entry.addThreadId(threadId);
try {
// 续约 核心方法
renewExpiration();
} finally {
// 线程终止,取消续约任务
if (Thread.currentThread().isInterrupted()) {
cancelExpirationRenewal(threadId);
}
}
}
}
看看redisson是如何巧妙地续约的:
private void renewExpiration() {
// 获取当前锁的信息
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
// 没有记录 不用续约
if (ee == null) {
return;
}
// 定时任务续约
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 检查是否需要续约
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// 执行lua脚本续约,每次执行,都会重置有效期为30秒,续约成功返回ture 续约失败返回false
CompletionStage<Boolean> future = renewExpirationAsync(threadId);
future.whenComplete((res, e) -> {
// 异常 移除续约map
if (e != null) {
log.error("Can't update lock {} expiration", getRawName(), e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
// 续约成功 继续续约
if (res) {
renewExpiration();
} else {
// 失败 取消定时任务 可以通过EXPIRATION_RENEWAL_MAP.get(getEntryName())快速获取定时任务并取消掉
cancelExpirationRenewal(null);
}
});
}
// 每隔10秒重置一次
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
redisson相比我们的基础版redis,利用看门狗机制,解决了因为业务阻塞而导致锁超时释放的安全问题。有以下优点:
- 维护一个可以存锁信息和定时任务的EXPIRATION_RENEWAL_MAP,可以快速的找到对应的锁,进行续约或是取消续约操作。
- 每10秒钟重置一次,若业务没有完成就无限重置,不会因为业务阻塞而超时释放,若是业务异常或宕机,则会马上取消掉定时任务,让锁超时释放,避免导致死锁。
6.2.4 保证主从一致
普通redis分布式锁:
假如在未同步时,主节点故障,从节点成为主节点后,数据不一致,锁丢失了,有线程安全隐患。
相比普通的redis分布式锁,redisson是如何保证主从一致性的呢?
redisson让每一个redis都是主节点 ,并在这些节点后面跟上从节点
即使某个节点宕机,造成了数据不一致,锁也不会失效,因为redisson的连锁,需要在每一个节点上都能获取锁,才算成功。
6.2.5 小结
单体redisson原理:
七、redis优化秒杀
之前的业务流程,我们是按顺序同步执行的,其中包含了很多数据库的读写操作,并且还花了很多时间来保证线程安全,现在我们使用lua脚本保证一部分的线程安全(例如超卖问题就不用考虑了),再用java的阻塞队列来让耗时操作异步执行。
redis执行lua脚本是单线程的,所以线程安全。
我们需要做几件事:
- 新增秒杀优惠券的同时,将优惠券库存存入redis。
- 利用lua脚本判断库存,一人一单。
- 如果有购买资格,则生成订单id返回,并将用户id等订单信息传入阻塞队列。
- 开启线程任务,不断从阻塞队列中获取信息,操作数据库,保存订单。
做完后,会有一些问题:
由于阻塞队列是javautil包下的,占用的是jvm的内存,会受jvm内存的限制。
当阻塞队列中还有未处理订单,或是处理订单异常等订单丢失的情况时,会导致数据不一致。
八、Redis消息队列实现异步秒杀
为了解决以上问题,我们引入第三方的消息队列。好处就是不会受JVM内存限制。当然redis消息队列不好玩,一般都是用专门的mq中间件(rabbitmq,rocketmq,kafka等)。
redis提供了三种不同的实现消息队列的方式:
- list:基于list结构模拟消息队列。
- PubSub:发布订阅模式,基本的点对点消息模型。
- stream:比较完善的消息队列模型。(功能强大但复杂的一批)
8.1 基于List结构模拟消息队列
就是利用LPUSH、RPOP这些命令,只要出入口不一致即可。但要模拟阻塞队列,就需要LPUSH、BRPOP这些阻塞的命令了。
8.2 PubSub发布订阅模式
8.3 基于Stream结构的消息队列
8.3.1 基本使用
发消息:
读消息:
阻塞地读消息:
这就是stream结构地基本使用。
问题:
我们是使用ID来指定读取地消息,当ID为0时,代表从第一个消息开始读,当ID为“$”时,代表读取最新消息,若在处理某条消息时,有n(n>1)条消息进入队列,当那条消息处理完,下次读取只会读取最新地一条,会漏掉中间地消息,造成消息漏读。
8.3.2 消费者组
消费者组不仅可以加快消息地处理速度,还有一定的数据保护措施。但不得不吐槽,真的麻烦,想要保证数据安全,有时候不得不手动ACK。
创建消费者组:
读消息:
这里比较特别的是ID可以取值为“>”。
8.3.3 改造秒杀业务
// 在类初始化完毕后,就开始监听队列
@PostConstruct
public void init() {
Runnable task = () -> {
String queueName = "stream.orders";
while (true) {
try {
// 1. 获取消息队列中的订单消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
// 2. 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 2.1 获取失败 下一轮循环
continue;
}
// 获取消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
// 3. 获取成功 下单
handleVoucherOrder(voucherOrder);
// 4. ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理消息异常", e);
// 异常,从pending-list中取出消息重试
handlePendingList();
}
}
};
SECKILL_ORDER_EXECUTOR.submit(task);
}
异常消息重试:
private void handlePendingList() {
String queueName = "stream.orders";
while (true) {
try {
// 1. 获取pending-list中的订单消息
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
// 2. 判断消息是否获取成功
if (list == null || list.isEmpty()) {
// 2.1 获取失败 下一轮循环
break;
}
// 获取消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue();
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
// 3. 获取成功 下单
handleVoucherOrder(voucherOrder);
// 4. ACK确认
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("处理pending-list消息异常", e);
}
}
}