接上一篇part1的内容
RabbitMQ通配符模式_编写消费者
接下来我们编写通配符模式的消费者:
// 站内信消费者
public class Customer_Station {
public static void main(String[] args)
throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory connectionFactory
= new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
10
connectionFactory.setVirtualHost("/");// 默
认虚拟机
11
12 //2.创建连接
13 Connection conn =
connectionFactory.newConnection();
14
15 //3.建立信道
16 Channel channel =
conn.createChannel();
17
18 // 4.监听队列
19
channel.basicConsume("SEND_STATION3", true,
new DefaultConsumer(channel) {
20 @Override
21 public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
22 String message = new
String(body, "utf-8");
23 System.out.println("发送站内
信:"+message);
24 }
25 });
26 }
27 }
// 邮件消费者
30 public class Customer_Mail {
31 public static void main(String[] args)
throws IOException, TimeoutException {
32 // 1.创建连接工厂
33 ConnectionFactory connectionFactory
= new ConnectionFactory();
34
connectionFactory.setHost("192.168.0.162");
35 connectionFactory.setPort(5672);
36
connectionFactory.setUsername("itbaizhan");
37
connectionFactory.setPassword("itbaizhan");
38
connectionFactory.setVirtualHost("/");// 默
认虚拟机
39
40 //2.创建连接
41 Connection conn =
connectionFactory.newConnection();
42
43 //3.建立信道
44 Channel channel =
conn.createChannel();
45
46 // 4.监听队列
47 channel.basicConsume("SEND_MAIL3",true, new DefaultConsumer(channel) {
48 @Override
public void
handleDelivery(String consumerTag, Envelope
envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
50 String message = newString(body, "utf-8");
51 System.out.println("发送邮件:"+message);
52 }
53 });
54 }
55
56 }
57
58 // 短信消费者
59 public class Customer_Message {
60 public static void main(String[] args)throws IOException, TimeoutException {
61 // 1.创建连接工厂
62 ConnectionFactory connectionFactory= new ConnectionFactory();
63 connectionFactory.setHost("192.168.0.162");
64 connectionFactory.setPort(5672);
65 connectionFactory.setUsername("itbaizhan");
66 connectionFactory.setPassword("itbaizhan");
67 connectionFactory.setVirtualHost("/");// 默认虚拟机
//2.创建连接
Connection conn =connectionFactory.newConnection();
//3.建立信道
Channel channel =conn.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE3", true,
new DefaultConsumer(channel) {
@Override
public void
handleDelivery(String consumerTag, Envelopeenvelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = newString(body, "utf-8");
System.out.println("发送短信:"+message);
}
});
}
}
SpringBoot整合RabbitMQ_项目搭建
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。
1.创建SpringBoot项目,引入RabbitMQ起步依赖
<!-- RabbitMQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupI
d>
<artifactId>spring-boot-starteramqp</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'
SpringBoot整合RabbitMQ_创建对列和交换机
SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:
@Configuration
2 public class RabbitConfig {
3 private final String EXCHANGE_NAME ="boot_topic_exchange";
4 private final String QUEUE_NAME ="boot_queue";
5
6 // 创建交换机
7 @Bean("bootExchange")
8 public Exchange getExchange() {
9 return ExchangeBuilder
10 .topicExchange(EXCHANGE_NAME) // 交换机类型
11 .durable(true) // 是否持久化
12 .build();
}
14
15 // 创建队列
16 @Bean("bootQueue")
17 public Queue getMessageQueue() {
18 return new Queue(QUEUE_NAME); // 队列名
19 }
20
21 // 交换机绑定队列
22 @Bean
23 public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue) {
24 return BindingBuilder
25 .bind(queue)
26 .to(exchange)
27 .with("#.message.#")
28 .noargs();
29 }
30 }
SpringBoot整合RabbitMQ_编写生产者
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息
@SpringBootTest
public class TestProducer {
// 注入RabbitTemplate工具类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage(){
/**
* 发送消息
* 参数1:交换机
* * 参数2:路由key
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
}
}
运行生产者代码,即可看到消息发送到了RabbitMQ中
SpringBoot整合RabbitMQ_编写消费者
我们编写另一个SpringBoot项目作为RabbitMQ的消费者
1.创建SpringBoot项目,引入RabbitMQ起步依赖
<!-- rabbitmq起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupI
d>
<artifactId>spring-boot-starteramqp</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'
3.编写消费者,监听队列
@Component
public class Consumer {
// 监听队列
@RabbitListener(queues = "boot_queue")
public void listen_message(String
message){
System.out.println("发送短
信:"+message);
}
}
4.启动项目,可以看到消费者会消费队列中的消息
消息的可靠性投递_概念
RabbitMQ消息投递的路径为:生产者 —> 交换机 —> 队列 —> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?
- 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
- 退回模式(return)可以监听消息是否从交换机成功传递到队列。
- 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'
在生产者的配置类创建交换机和队列
@Configuration
public class RabbitConfig {
private final String
EXCHANGE_NAME="my_topic_exchange";
private final String
QUEUE_NAME="my_queue";
// 1.创建交换机
@Bean("bootExchange")
public Exchange getExchange(){
return ExchangeBuilder
10 .topicExchange(EXCHANGE_NAME) // 交换机类型
11 .durable(true) // 是否持久化
12 .build();
13 }
14
15 // 2.创建队列
16 @Bean("bootQueue")
17 public Queue getMessageQueue(){
18 return QueueBuilder
19 .durable(QUEUE_NAME) // 队列持久化
20 .build();
21 }
22
23 // 3.将队列绑定到交换机
24 @Bean
25 public Binding bindMessageQueue(@Qualifier("bootExchange")Exchange exchange, @Qualifier("bootQueue")Queue queue){
26 return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
27 }
28 }
消息的可靠性投递_确认模式
确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下
1.生产者配置文件开启确认模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
2.生产者定义确认模式的回调方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm(){
8 // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
9 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
10 /**
11 * 被调用的回调方法
12 * @param correlationData 相关配置信息
13 * @param ack 交换机是否成功收到了消息
14 * @param cause 失败原因
15 */
16 @Override
17 public void confirm(CorrelationData correlationData,boolean ack, String cause) {
18 if (ack){
19 System.out.println("confirm接受成功!");
20 }else{
21
System.out.println("confirm接受失败,原因为:"+cause);
22 // 做一些处理。
23 }
24 }
25 });
26
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
}
}
消息的可靠性投递_退回模式
退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:
1 生产者配置文件开启退回模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
# 开启回退模式
publisher-returns: true
2.生产者定义退回模式的回调方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
5
6 @Test
7 public void testReturn(){
8 // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
9 rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
10 /**
11 * @param returned 失败后将失败信息封装到参数中
12 */
13 @Override
14 public void returnedMessage(ReturnedMessage returned)
{
15 System.out.println("消息对象:"+returned.getMessage());
16 System.out.println("错误码:"+returned.getReplyCode());
17 System.out.println("错误信息:"+returned.getReplyText());
18 System.out.println("交换机:"+returned.getExchange());
19 System.out.println("路由键:"+returned.getRoutingKey());
20 // 处理消息...
21 }
22 });
rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
24 }
25 }
消息的可靠性投递_Ack
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。
- 自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
- 手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”
1 消费者配置开启手动签收
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启手动签收
listener:
simple:
acknowledge-mode: manual
2.消费者处理消息时定义手动签收和拒绝签收的情况
@Component
public class AckConsumer {
@RabbitListener(queues = "my_queue")
public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
// 消息投递序号,消息每次投递该值都会+1
long deliveryTag =message.getMessageProperties().getDelivery
Tag();
try {
int i = 1/0; //模拟处理消息出现bug
System.out.println("成功接受到消息:"+message);
// 签收消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以签收多条消息
* */
15 channel.basicAck(deliveryTag,true);
16 }catch (Exception e){
17 System.out.println("消息消费失败!");
18 Thread.sleep(2000);
19 // 拒签消息
20 /**
21 * 参数1:消息投递序号
22 * 参数2:是否一次可以拒签多条消息
23 * 参数3:拒签后消息是否重回队列
24 */
25 channel.basicNack(deliveryTag,true,true);
26 }
27 }
28 }
RabbitMQ高级特性_消费端限流
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
1 生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
}
}
2.消费端配置限流机制
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
prefetch: 5
3.消费者监听队列
@Component
2 public class QosConsumer{
3 @RabbitListener(queues = "my_queue")
4 public void listenMessage(Message
message, Channel channel) throws IOException, InterruptedException {
5 // 1.获取消息
6 System.out.println(new String(message.getBody()));
7 // 2.模拟业务处理
8 Thread.sleep(3000);
9 // 3.签收消息
10 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
11 }
12 }
RabbitMQ高级特性_利用限流实现不公平分发
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
使用方法如下:
1 生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_e
xchange", "my_routing", "send
message..."+i);
}
}
2.消费端配置不公平分发
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取1条消息消费,这样谁处理
的快谁拉取下一条消息,实现了不公平分发
prefetch: 1
3.编写两个消费者
@Component
public class UnfairConsumer {
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws Exception
{
//1.获取消息
System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(500); // 消费者1处理快
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
12 }
13
14 // 消费者2
15 @RabbitListener(queues = "my_queue")
16 public void listenMessage2(Message message, Channel channel) throws Exception{
17 //1.获取消息
18 System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
19 //2. 处理业务逻辑
20 Thread.sleep(3000);// 消费者2处理慢
21 //3. 手动签收
22 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
23 }
24 }
RabbitMQ高级特性_消息存活时间
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
设置队列所有消息存活时间
1 在创建队列时设置其存活时间:
@Configuration
public class RabbitConfig2 {
private final String
EXCHANGE_NAME="my_topic_exchange2";
private final String
QUEUE_NAME="my_queue2";
// 1.创建交换机
@Bean("bootExchange2")
public Exchange getExchange2(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean("bootQueue2")
public Queue getMessageQueue2(){
return QueueBuilder
.durable(QUEUE_NAME)
.ttl(10000) //队列的每条消息存活10s
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding
bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange,
@Qualifier("bootQueue2") Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2.生产者批量生产消息,测试存活时间
@Test
2 public void testSendBatch2() throws InterruptedException {
3 // 发送十条消息
4 for (int i = 0; i < 10; i++) {
5 rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "sendmessage..."+i);
6 }
7 }
设置单条消息存活时间
@Test
public void testSendMessage() {
//设置消息属性
MessageProperties messageProperties =
new MessageProperties();
//设置存活时间
messageProperties.setExpiration("10000");
// 创建消息对象
Message message = new Message("send
message...".getBytes(StandardCharsets.UTF_8)
, messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("my_topic_exc
hange", "my_routing", message);
}
注意:
1 如果设置了单条消息的存活时间,也设置了队列的存活时
间,以时间短的为准。
2 消息过期后,并不会马上移除消息,只有消息消费到队列顶
端时,才会移除该消息。
@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties
messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration("10000");
// 3.创建消息对象
Message message = new Message(("send message..." +i).getBytes(), messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..." + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有
马上被移除,但该消息已经不会被消费了,当它到达队列顶
端时会被移除。
RabbitMQ高级特性_优先级队列
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
优先级队列用法如下:
1.创建队列和交换机
@Configuration
public class RabbitConfig3 {
private final String
EXCHANGE_NAME="priority_exchange";
private final String
QUEUE_NAME="priority_queue";
// 1.创建交换机
@Bean(EXCHANGE_NAME)
public Exchange priorityExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true).
build();
}
// 2.创建队列
@Bean(QUEUE_NAME)
public Queue priorityQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding
bindPriority(@Qualifier(EXCHANGE_NAME)
Exchange exchange, @Qualifier(QUEUE_NAME)
Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
}
}
2.编写生产者
@Test
public void testPriority() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// i为5时消息的优先级较高
MessageProperties
messageProperties = new MessageProperties();
messageProperties.setPriority(9);
Message message = new Message(("send message..." +
i).getBytes(StandardCharsets.UTF_8),
messageProperties);
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..."+ i);
}
}
}
3.编写消费者
@Component
2 public class PriorityConsumer {
3 @RabbitListener(queues ="priority_queue")
4 public void listenMessage(Message message, Channel channel) throws Exception
{
5 //获取消息
6 System.out.println(new String(message.getBody()));
7 //手动签收
8 channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
9 }
10 }
RabbitMQ死信队列_概念
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
- 队列消息长度到达限制。
- 消费者拒签消息,并且不把消息重新放入原队列。
- 消息到达存活时间未被消费。
RabbitMQ死信队列_代码实现
@Configuration
public class RabbitConfig4 {
private final String DEAD_EXCHANGE =
"dead_exchange";
private final String DEAD_QUEUE =
"dead_queue";
private final String NORMAL_EXCHANGE =
"normal_exchange";
private final String NORMAL_QUEUE =
"normal_queue";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding
bindDeadQueue(@Qualifier(DEAD_EXCHANGE)
Exchange
exchange,@Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
53 .ttl(10000) // 消息存活10s
54 .maxLength(10) // 队列最大长度
为10
55 .build();
56 }
57
58 // 普通交换机绑定普通队列
59 @Bean
60 public Binding
bindNormalQueue(@Qualifier(NORMAL_EXCHANGE)
Exchange
exchange,@Qualifier(NORMAL_QUEUE)Queue
queue){
61 return BindingBuilder
62 .bind(queue)
63 .to(exchange)
64 .with("my_routing")
65 .noargs();
66 }
67 }
测试死信队列
1.生产者发送消息
@Test
public void testDlx(){
// 存活时间过期后变成死信
//
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 超过队列长度后变成死信
// for (int i = 0; i < 20; i++)
{
//
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
2.消费者拒收消息
@Component
public class DlxConsumer {
@RabbitListener(queues ="normal_queue")
public void listenMessage(Message
message, Channel channel) throws IOException {
// 拒签消息
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
RabbitMQ延迟队列_概念
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
RabbitMQ延迟队列_死信队列实现
接下来我们使用死信队列实现延迟队列
1 创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。
<dependency>
<groupId>org.springframework.boot</groupI
d>
<artifactId>spring-boot-starteramqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupI
d>
<artifactId>spring-boot-starterweb</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
2.编写配置文件
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS}
%clr(%-5level) --- [%-15thread]
%cyan(%-50logger{50}):%msg%n'
3.创建队列和交换机
@Configuration
public class RabbitConfig {
// 订单交换机和队列
private final String ORDER_EXCHANGE =
"order_exchange";
private final String ORDER_QUEUE =
"order_queue";
// 过期订单交换机和队列
private final String EXPIRE_EXCHANGE =
"expire_exchange";
private final String EXPIRE_QUEUE =
"expire_queue";
// 过期订单交换机
@Bean(EXPIRE_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
14
.topicExchange(EXPIRE_EXCHANGE)
15 .durable(true)
16 .build();
17 }
18 // 过期订单队列
19 @Bean(EXPIRE_QUEUE)
20 public Queue deadQueue(){
21 return QueueBuilder
22 .durable(EXPIRE_QUEUE)
23 .build();
24 }
25 // 将过期订单队列绑定到交换机
26 @Bean
27 public Binding
bindDeadQueue(@Qualifier(EXPIRE_EXCHANGE)
Exchange exchange,@Qualifier(EXPIRE_QUEUE)
Queue queue){
28 return BindingBuilder
29 .bind(queue)
30 .to(exchange)
31 .with("expire_routing")
32 .noargs();
33 }
34
35 // 订单交换机
36 @Bean(ORDER_EXCHANGE)
37 public Exchange normalExchange(){
38 return ExchangeBuilder
39
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
// 订单队列
@Bean(ORDER_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(ORDER_QUEUE)
.ttl(10000) // 存活时间为10s,模拟30min
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
.build();
}
// 将订单队列绑定到交换机
@Bean
public Binding
bindNormalQueue(@Qualifier(ORDER_EXCHANGE)
Exchange exchange,@Qualifier(ORDER_QUEUE)
Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
4.编写下单的控制器方法,下单后向订单交换机发送消息
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
//下单
@GetMapping("/place/{orderId}")
public String placeOrder(@PathVariable
String orderId){
System.out.println("处理订单数据...");
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exch
ange", "order_routing", orderId);
return "下单成功,修改库存";
}
}
5.编写监听死信队列的消费者
// 过期订单消费者
@Component
public class ExpireOrderConsumer {
// 监听队列
@RabbitListener(queues ="expire_queue")
public void listenMessage(String
orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
}
RabbitMQ延迟队列_插件实现
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
安装延迟队列插件
1 使用rz将插件上传至虚拟机
2 安装插件
# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-
3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable
rabbitmq_delayed_message_exchange
3.重启RabbitMQ服务
#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached
此时登录管控台可以看到交换机类型多了延迟消息