2 使用RabbitMQ实现消息队列
2.1 修改\hm-dianping\pom.xml文件
添加RabbitMQ的环境
<!-- RabbitMQ support via the Spring Boot AMQP starter. No version is given here; presumably it is managed by the project's Spring Boot parent/BOM (confirm in the parent pom). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.2 修改Resource下的application.yaml文件
添加RabbitMQ的配置信息
# RabbitMQ connection and listener settings.
# The nesting matters: every key below must sit under spring.rabbitmq,
# otherwise Spring Boot does not bind it.
spring:
  rabbitmq:
    host: 127.0.0.1 # broker address
    port: 5672 # broker port
    username: hmdianping # login user
    password: 123456 # login password
    listener:
      simple:
        concurrency: 1 # minimum number of listener threads
        max-concurrency: 1 # maximum number of listener threads
        acknowledge-mode: manual # listener code must ack/nack every delivery itself
        prefetch: 1 # at most one unacknowledged message per consumer
主启动类标注@EnableRabbit开启消息队列的监听功能
2.3 配置RabbitMQ,创建交换机和消息队列,将二者绑定
创建direct类型的交换机hmdianping.direct以及一个名为direct.seckill.queue的消息队列,然后将二者绑定。之后发往该交换机且路由键为direct.seckill的消息都会被转发到direct.seckill.queue
具体而言,在hm-dianping.src.main.java.com.hmdp.config下新建RabbitMQConfig文件
文件内容如下
package com.hmdp.config;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * RabbitMQ configuration: registers a JSON message converter and a listener
 * that declares and binds the queue {@code direct.seckill.queue} to the direct
 * exchange {@code hmdianping.direct} with routing key {@code direct.seckill}.
 */
@Configuration
public class RabbitMQConfig {

    @Resource
    IVoucherOrderService voucherOrderService;

    /**
     * Supplies the converter used to (de)serialize message bodies as JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
        return jsonConverter;
    }

    /**
     * Smoke-test listener: prints whatever arrives on the bound queue.
     *
     * NOTE(review): this method never acknowledges the delivery. The
     * application.yaml shown in this document sets acknowledge-mode to manual,
     * so a message received here presumably stays unacknowledged - confirm
     * before reusing this version beyond a connectivity test.
     *
     * @param message the converted message payload
     */
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT),
            value = @Queue(name = "direct.seckill.queue"),
            key = "direct.seckill"
    ))
    public void recieveMessage(Object message){
        String line = "监听到了" + message;
        System.out.println(line);
    }
}
2.3.1 测试监听消息
在Test中添加发送消息的方法,指定交换机为hmdianping.direct,路由键为direct.seckill。
@Resource
RabbitTemplate rabbitTemplate;

/**
 * Publishes a single plain-text message to the exchange and routing key that
 * the listener in RabbitMQConfig is bound to.
 */
@Test
public void testSendMessage(){
    final String exchange = "hmdianping.direct";
    final String routingKey = "direct.seckill";
    final String payload = "测试发送消息";
    rabbitTemplate.convertAndSend(exchange, routingKey, payload);
}
先运行Test方法
之后运行启动类HmDianPingApplication
发现监听到了,说明连接成功。
2.3.2 修改秒杀下单业务(VoucherOrderServiceImpl中的seckillVoucher方法)
- 注入RabbitTemplate类
- 在认定有抢购资格后,直接向hmdianping.direct交换机发送消息(路由键为direct.seckill),内容包含voucherId、userId、orderId
@Resource
RabbitTemplate rabbitTemplate;

/**
 * Handles a seckill request: checks eligibility with the Lua script and, when
 * eligible, publishes the order to RabbitMQ for asynchronous persistence.
 *
 * @param voucherId id of the voucher being purchased
 * @return a failure result when the script returns non-zero, otherwise a
 *         success result carrying the generated order id
 */
@Override
public Result seckillVoucher(Long voucherId) {
    // 1. Run the Lua script to decide whether the current user may buy.
    Long userId = UserHolder.getUser().getId();
    Long result = stringRedisTemplate.execute(
            SECKILL_SCRIPT,
            Collections.emptyList(),
            voucherId.toString(), userId.toString());
    if (result != 0) {
        // 2. Non-zero means not eligible: 1 is reported as out of stock,
        //    any other value as a duplicate order.
        return Result.fail(result==1?"库存不足":"不能重复下单");
    }
    // 3. Eligible: build the order that will be handed to the message queue.
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    // Reuse the id that was passed to the script instead of looking the user
    // up again, so the stored order always matches the user that was checked.
    voucherOrder.setUserId(userId);
    voucherOrder.setVoucherId(voucherId);
    // Publish for asynchronous consumption by the queue listener.
    rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);
    return Result.ok(orderId);
}
此时VoucherOrderServiceImpl文件内容如下
package com.hmdp.service.impl;
import com.hmdp.dto.Result;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Collections;
import java.util.concurrent.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
 * Voucher order service implementation.
 *
 * <p>Seckill requests are validated with a Redis Lua script and then published
 * to RabbitMQ; the order row is written later by {@link #handleVoucherOrder}.
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    // NOTE(review): not referenced anywhere in this class - confirm whether
    // this self-injection is still needed.
    @Resource
    private IVoucherOrderService iVoucherOrderService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    // NOTE(review): not referenced anywhere in this class.
    @Resource
    private RedissonClient redissonClient;

    @Resource
    RabbitTemplate rabbitTemplate;

    /** Lua script (classpath resource seckill.lua) returning a Long status code. */
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    // The in-memory order queue and the single-thread executor that used to
    // live here were never used in this class (orders now travel through
    // RabbitMQ) and have been removed; the queue alone pre-allocated
    // 1024*1024 slots.

    /**
     * Persists one order taken from the message queue: decrements the voucher
     * stock and, only if that succeeded, saves the order.
     *
     * @param voucherOrder the order carried by the message
     */
    @Transactional
    public void handleVoucherOrder(VoucherOrder voucherOrder) {
        // 1. Everything needed comes from the message entity itself.
        Long voucherId = voucherOrder.getVoucherId();
        // 2. Decrement stock; the "stock > 0" condition keeps it from going negative.
        boolean success = seckillVoucherService.update().setSql("stock=stock-1")
                .eq("voucher_id", voucherId)
                .gt("stock", 0)
                .update();
        // 3. Create the order only when a unit of stock was actually taken.
        if (success) {
            save(voucherOrder);
        }
    }

    /**
     * Handles a seckill request: checks eligibility with the Lua script and,
     * when eligible, publishes the order to RabbitMQ for asynchronous
     * persistence.
     *
     * @param voucherId id of the voucher being purchased
     * @return a failure result when the script returns non-zero, otherwise a
     *         success result carrying the generated order id
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        // 1. Run the Lua script to decide whether the current user may buy.
        Long userId = UserHolder.getUser().getId();
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString());
        if (result != 0) {
            // 2. Non-zero means not eligible: 1 is reported as out of stock,
            //    any other value as a duplicate order.
            return Result.fail(result==1?"库存不足":"不能重复下单");
        }
        // 3. Eligible: build the order that will be handed to the message queue.
        VoucherOrder voucherOrder = new VoucherOrder();
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // Reuse the id that was passed to the script so the stored order
        // always matches the user that was checked.
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        // Publish for asynchronous consumption by the queue listener.
        rabbitTemplate.convertAndSend("hmdianping.direct","direct.seckill",voucherOrder);
        return Result.ok(orderId);
    }

    /**
     * Creates an order with a database-level one-order-per-user check followed
     * by a guarded stock decrement.
     *
     * @param voucherOrder the order to persist; its user id and voucher id
     *                     drive the duplicate check and the stock update
     */
    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder){
        // One order per user: the check must use the buyer's id and the
        // voucher's id (not the order id or the entity itself).
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        if (count > 0) {
            // This user already bought this voucher.
            log.error("用户已经购买过一次!");
            return ;
        }
        // Decrement stock: set stock = stock - 1 where voucher_id = ? and stock > 0
        boolean success = seckillVoucherService.update()
                .setSql("stock= stock -1")
                .eq("voucher_id", voucherId)
                .gt("stock",0)
                .update();
        if (!success) {
            // No row updated: nothing left to sell.
            log.error("库存不足!");
            return ;
        }
        save(voucherOrder);
    }
}
2.3.3 监听秒杀成功订单
- 监听direct.seckill.queue队列的消息并将订单创建到数据库,创建完成后手动ack
RabbitMQConfig中的recieveMessage修改为
/**
 * Consumes one seckill order message, persists it and settles the delivery
 * manually.
 *
 * <p>On success the delivery is acked. If persisting throws, the delivery is
 * nacked so it does not stay unacknowledged: it is requeued on its first
 * delivery and discarded (not requeued) once it has already been redelivered,
 * which avoids an endless redelivery loop. The original exception is rethrown.
 *
 * @param message      raw AMQP message, used for delivery tag and redelivery flag
 * @param channel      channel the message arrived on, used to ack/nack
 * @param voucherOrder order payload converted from the message body
 * @throws RuntimeException if persisting fails, or wrapping an
 *                          {@link IOException} raised while acking
 */
public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
        voucherOrderService.handleVoucherOrder(voucherOrder);
    } catch (RuntimeException e) {
        // Retry once: requeue only if this was the first delivery attempt.
        boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
        try {
            channel.basicNack(deliveryTag, false, requeue);
        } catch (IOException nackFailure) {
            // Keep the processing failure as the primary error.
            e.addSuppressed(nackFailure);
        }
        throw e;
    }
    try {
        channel.basicAck(deliveryTag, false);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
    System.out.println("监听到了"+message);
}
此时,文件内容为。
package com.hmdp.config;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.service.IVoucherOrderService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
import java.io.IOException;
/**
 * RabbitMQ configuration: registers a JSON message converter and the listener
 * that turns seckill order messages into database orders.
 */
@Configuration
public class RabbitMQConfig {

    @Resource
    IVoucherOrderService voucherOrderService;

    /**
     * Consumes one seckill order message from {@code direct.seckill.queue}
     * (bound to {@code hmdianping.direct} with key {@code direct.seckill}),
     * persists it and settles the delivery manually.
     *
     * <p>On success the delivery is acked. If persisting throws, the delivery
     * is nacked so it does not stay unacknowledged: it is requeued on its
     * first delivery and discarded (not requeued) once it has already been
     * redelivered, which avoids an endless redelivery loop. The original
     * exception is rethrown.
     *
     * @param message      raw AMQP message, used for delivery tag and redelivery flag
     * @param channel      channel the message arrived on, used to ack/nack
     * @param voucherOrder order payload converted from the message body
     * @throws RuntimeException if persisting fails, or wrapping an
     *                          {@link IOException} raised while acking
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.seckill.queue"),
            key = "direct.seckill",
            exchange = @Exchange(name = "hmdianping.direct", type = ExchangeTypes.DIRECT)
    ))
    public void recieveMessage(Message message, Channel channel, VoucherOrder voucherOrder){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            voucherOrderService.handleVoucherOrder(voucherOrder);
        } catch (RuntimeException e) {
            // Retry once: requeue only if this was the first delivery attempt.
            boolean requeue = !Boolean.TRUE.equals(message.getMessageProperties().isRedelivered());
            try {
                channel.basicNack(deliveryTag, false, requeue);
            } catch (IOException nackFailure) {
                // Keep the processing failure as the primary error.
                e.addSuppressed(nackFailure);
            }
            throw e;
        }
        try {
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("监听到了"+message);
    }

    /**
     * Supplies the converter used to (de)serialize message bodies as JSON.
     *
     * @return a new {@link Jackson2JsonMessageConverter}
     */
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}
2.3.4 测试
使用Apifox测试
成功