Redis学习Day3——黑马点评项目工程开发`-CSDN博客
问题发现及描述
在黑马点评项目中,进行到使用Redis提供的Stream消息队列优化异步秒杀问题时,我在进行jmeter测试时遇到了重大的错误
发现无论怎么测试,一定会进入到catch中,又由于消息队列是个循环读的过程,所以ERROR 33016错误就会不断的发生。
观察一下报错信息
java.lang.NullPointerException: Cannot invoke "com.hmdp.service.IVoucherOrderService.createVoucherOrder(com.hmdp.entity.VoucherOrder)" because the return value of "com.hmdp.service.impl.VoucherOrderServiceImpl.access$400(com.hmdp.service.impl.VoucherOrderServiceImpl)" is null
意思是
java.lang.NullPointerException 错误表明你的代码中有一个地方尝试调用了 null 对象的方法或访问了其属性。在你的具体错误信息中,问题出现在尝试调用 com.hmdp.service.IVoucherOrderService.createVoucherOrder(com.hmdp.entity.VoucherOrder) 方法时,但这个方法的调用是通过 com.hmdp.service.impl.VoucherOrderServiceImpl.access$400(com.hmdp.service.impl.VoucherOrderServiceImpl) 返回的对象进行的,而这个返回值为 null。
问题排除
既然明白了问题缘由是空对象导致出来的,那我们就根据报错的栈信息去处理:
定位位置
at com.hmdp.service.impl.VoucherOrderServiceImpl$VoucherOrderHandler.handleVocherOrder(VoucherOrderServiceImpl.java:406) ~[classes/:na]
at com.hmdp.service.impl.VoucherOrderServiceImpl$VoucherOrderHandler.handlePendingList(VoucherOrderServiceImpl.java:438) ~[classes/:na]
at com.hmdp.service.impl.VoucherOrderServiceImpl$VoucherOrderHandler.run(VoucherOrderServiceImpl.java:385) ~[classes/:na]
发现定位出现问题的是 执行订单创建方法 handleVocherOrder()
跟进去看看,proxy代理对象也是一个报错提示点
结论
哦,这么一来问题就解决啦!原来是由于handleVocherOrder()需要使用到代理对象进行订单创建,那他必须不能写在线程任务了,要不然是没有办法获取到代理对象的,也就是null。就是因为这个空,才导致了我们的程序一致在报错。
错误代码说明
一开始,为了代码逻辑的顺畅可懂,我将方法进行编号,并统一写入了线程任务VoucherOrderHandler方法中,在我看来handleVocherOrder()创建订单方法 和 handlePendingList()执行异常方法 对应着两者情况,本身的地位是一致的,于是乎将其都写在了线程的内部。
但是没注意到的是,handleVocherOrder()需要调用在主线程提供的代理对象,这样一来就没理由将它写在异步线程任务中了。
//3. 创建线程任务用于接收消息队列的信息
private class VoucherOrderHandler implements Runnable{
// 消息队列名称
private String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try{
//1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes >
// 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2. 判断消息获取是否成功
if( list == null || list.isEmpty()){
//2.1 获取失败 说明没有消息 ---->继续循环
continue;
}
// 解析消息中的订单信息
MapRecord<String,Object,Object> record = list.get(0);
// 获取键值对集合
Map<Object,Object> values = record.getValue();
// 获取订单信息
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3. 获取成功,执行订单创建
handleVocherOrder(voucherOrder);
//4. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e) {
// 消息没有被ACK确认 进入Pending List
log.error("订单处理出现异常",e);
handlePendingList();
}
}
}
// 4. 取到了订单—————创建订单
private void handleVocherOrder(VoucherOrder voucherOrder){
// 获取用户
Long userId = voucherOrder.getUserId();
// 1. 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//2. 尝试获取锁
boolean isLock = lock.tryLock();
// 3. 判断锁是否获取成功
if(! isLock){
log.error("不允许重复下单");
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
// 4. 释放锁
lock.unlock();
}
}
// 5.取不到订单————— 处理Pending List中的订单信息
private void handlePendingList(){
while (true) {
try {
//1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.oredes 0
// 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
//2.1 获取失败 说明Pending List没有消息 ---->结束循环
break;
}
// 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
// 获取键值对集合
Map<Object, Object> values = record.getValue();
// 获取订单信息
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3. 获取成功,执行订单创建
handleVocherOrder(voucherOrder);
//4. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("Pending List订单处理出现异常", e);
try {
Thread.sleep(20);
}catch (InterruptedException interruptedException){
interruptedException.printStackTrace();
}
}
}
}
}
正确代码展示
/** 方案二、三公共代码
* 预加载lua脚本
*/
private static DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
// 这是第二种方案需要执行的lua脚本
// SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/seckill.lua"));
// 这是第三种方案需要执行的lua脚本
SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/streamSeckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
/*-----------------------------第三种方案: 使用Redis的stream消息队列 + redis + lua脚本判断秒杀资格添加消息队列 的方案-------------------------------------------------------------*/
// 1,创建-- 秒杀线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//2. 初始化方法 一初始化就执行
@PostConstruct
public void init(){
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}
//3. 创建线程任务用于接收消息队列的信息
private class VoucherOrderHandler implements Runnable{
// 消息队列名称
private String queueName = "stream.orders";
@Override
public void run() {
while (true) {
try{
//1. 获取队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS stream.oredes >
// 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
List<MapRecord<String,Object,Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create(queueName, ReadOffset.lastConsumed())
);
//2. 判断消息获取是否成功
if( list == null || list.isEmpty()){
//2.1 获取失败 说明没有消息 ---->继续循环
continue;
}
// 解析消息中的订单信息
MapRecord<String,Object,Object> record = list.get(0);
// 获取键值对集合
Map<Object,Object> values = record.getValue();
// 获取订单信息
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3. 获取成功,执行订单创建
handleVocherOrder(voucherOrder);
//4. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName,"g1",record.getId());
}catch (Exception e) {
// 消息没有被ACK确认 进入Pending List
log.error("订单处理出现异常",e);
handlePendingList();
}
}
}
// 5.取不到订单————— 处理Pending List中的订单信息
private void handlePendingList(){
while (true) {
try {
//1. 获取Pending List中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 STREAMS stream.oredes 0
// 指定队列名称,组名称,消费者名称,读取模式,读取数量,阻塞时间,队列名称,读取位置
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create(queueName, ReadOffset.from("0"))
);
//2. 判断消息获取是否成功
if (list == null || list.isEmpty()) {
//2.1 获取失败 说明Pending List没有消息 ---->结束循环
break;
}
// 解析消息中的订单信息
MapRecord<String, Object, Object> record = list.get(0);
// 获取键值对集合
Map<Object, Object> values = record.getValue();
// 获取订单信息
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
//3. 获取成功,执行订单创建
handleVocherOrder(voucherOrder);
//4. ACK确认 SACK stream.orders g1 id
stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
} catch (Exception e) {
log.error("Pending List订单处理出现异常", e);
try {
Thread.sleep(20);
}catch (InterruptedException interruptedException){
interruptedException.printStackTrace();
}
}
}
}
}
// 4. 取到了订单—————创建订单
private void handleVocherOrder(VoucherOrder voucherOrder){
// 获取用户
Long userId = voucherOrder.getUserId();
// 1. 创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//2. 尝试获取锁
boolean isLock = lock.tryLock();
// 3. 判断锁是否获取成功
if(! isLock){
log.error("不允许重复下单");
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
// 4. 释放锁
lock.unlock();
}
}
/**
* 秒杀优惠券下单------秒杀优化代码----lua脚本---主线程---使用Redis stream的消息队列完成的
*/
private IVoucherOrderService proxy;
@Override
public Result seckillVoucher(Long voucherId) {
// 获取用户
Long userId = UserHolder.getUser().getId();
// 获取订单id
long orderId = redisIdWorker.nextId("order");
//1.执行Lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(),userId.toString(),String.valueOf(orderId)
);
//2.判断结果是否为0
int r = result.intValue();
if(r != 0){
//3.不为0,代表没有购买资格
return Result.fail(r == 1 ? "库存不足!" : "不能重复下单!");
}
//提前 获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
//5.返回订单id
return Result.ok(orderId);
}
/**
* 秒杀优惠券下单------秒杀优化代码----创建订单
* @param voucherOrder
*/
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//4. 限制一人一单【悲观锁方案】
Long userId = voucherOrder.getUserId();
//4.1 查询订单
int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
//4.2 判断订单是否存在
// 是 -----> 返回异常信息---->结束
if (count > 0) {
log.error("用户已经购买了一次了");
}
//5. 扣减库存——解决超卖问题【乐观锁方案】
boolean success = seckillVoucherService.update()
.setSql("stock = stock-1")
.eq("voucher_id", voucherOrder.getVoucherId()).gt("stock", 0) // 库存大于0就行了
.update();
if (!success) {
log.error("库存不足");
}
//6. 创建订单
save(voucherOrder);
}
}
总结
以前在遇到bug时,我总喜欢做的事是将别人写的代码复制回来。但是随着学习的深入发现,其实调代码是一件正常不过的事情,为此,锻炼自己发现问题、定位问题、解决问题能力十分重要,不断地刨根问底,才能愈发印象深刻。