本文章通过MQ队列来实现秒杀场景
整体的设计如下图,整个流程中对于发送发MQ失败和发送到死信队列的数据未做后续处理
1、首先先创建MQ的配置文件
@Configuration
public class RabbitConfig {
public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.#";
public static final String DEAD_LETTER_QUEUEA_NAME = "deadQueue";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("seckill_topic",true,false);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
@Bean("seckillQueue")
public Queue seckillQueue(){
Map<String,Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable("seckillQueue").withArguments(args).build();
}
@Bean("deadQueue")
public Queue binding(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
@Bean
public Binding bindingExchange(){
return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with("seckill.#");
}
// 声明死信队列绑定关系
@Bean
public Binding deadLetterBinding(@Qualifier("deadQueue") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
//配置会覆盖yml的重试次数
//RabbitMQ监听容器
/*@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//设置并发
factory.setConcurrentConsumers(1);
SimpleMessageListenerContainer s=new SimpleMessageListenerContainer();
//最大并发
factory.setMaxConcurrentConsumers(1);
//消息接收——手动确认
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置超时
factory.setReceiveTimeout(2000L);
//设置重试间隔
factory.setFailedDeclarationRetryInterval(3000L);
//监听自定义格式转换
//factory.setMessageConverter(jsonMessageConverter);
return factory;
}*/
}
2、配置yml文件
spring:
redis:
database: 0
host: xxx
port: 6379
password: xxx
timeout: 60
jedis:
pool:
max-active: 8
max-wait: -1
max-idle: 8
min-idle: 0
rabbitmq:
username: admin
password: admin
virtual-host: /
host: xxxx
port: 12345
publisher-confirms: true
publisher-returns: true
template:
mandatory: true
listener:
simple:
concurrency: 1
max-concurrency: 3
# 消费者预取1条数据到内存,默认为250条
prefetch: 1
# 确定机制
acknowledge-mode: manual
retry:
enabled: true #是否支持重试
max-attempts: 2
# 重试间隔(ms)
initial-interval: 5000
这里有一点需要注意的是在做死信队列的时候如果Config文件中配置了监听容器,在yml文件中的一些属性要在容器里面进行配置,当时测试重试的时候发现没有在Config文件中配置,只在yml文件中配置了重试次数,结果会无限期的重试,MQ的默认方式就是无限期的重试,所以这点很容易踩坑
3、实现交换机的ACK,实现 RabbitTemplate.ConfirmCallback接口
@Component
public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitMessageMapper rabbitMessageMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
//PostConstruct注解会在Component、Autowired注解完成后再执行
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(!ack){
RabbitMessage rabbitMessage = new RabbitMessage();
rabbitMessage.setUniqueKey(correlationData.getId().toString());
rabbitMessage.setSuccessFlag("N");
rabbitMessageMapper.updateSuccessFlag(rabbitMessage);
System.out.println("失败原因:"+cause);
}
}
}
4、实现队列的ACK,实现 RabbitTemplate.ReturnCallback
@Component
public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
//注入
//PostConstruct注解会在Component、Autowired注解完成后再执行
@PostConstruct
public void init(){
rabbitTemplate.setReturnCallback(this);
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息主体 message:"+message);
System.out.println("应答码 replyCode: :"+replyCode);
System.out.println("原因描述 replyText:"+replyText);
System.out.println("交换机 exchange:"+exchange);
System.out.println("消息使用的路由键 routingKey:"+routingKey);
}
}
5、消费者方面,实现 ChannelAwareMessageListener 接口
@Component
public class AckListener implements ChannelAwareMessageListener {
@Autowired
private RabbitMqService rabbitMqService;
@RabbitListener(queues = "seckillQueue")
@Override
public void onMessage(Message messagex, Channel channel) throws Exception {
try {
String result = new String(messagex.getBody(),"utf-8");
rabbitMqService.receive(result);
channel.basicAck(messagex.getMessageProperties().getDeliveryTag(), false);
}catch (Exception exception){
channel.basicNack(messagex.getMessageProperties().getDeliveryTag(), false, false);
}
}
}