RabbitMQ延迟队列
- 1、延迟队列
- 1.1、延迟队列使用场景
- 1.2、延迟队列实现原理
- 2、使用rabbitmq-delayed-message-exchange 延迟插件
- 2.1、下载
- 2.2、安装
- 2.2.1、解压
- 2.2.2、启用插件
- 2.2.3、查询安装情况
- 2.4、示例
- 2.4.1、RabbitConfig配置类(关键代码)
- 2.4.2、发送消息(关键代码)
- 2.4.3、application.yml配置类
- 2.4.4、接收消息
- 2.4.5、pom.xml
- 2.4.6、测试
1、延迟队列
https://blog.csdn.net/weixin_42942786/article/details/139940269
1.1、延迟队列使用场景
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;
1.2、延迟队列实现原理
RabbitMQ本身不支持延迟队列,可以使用TTL(过期消息)结合DLX(死信交换机)的方式来实现消息的延迟投递,即把DLX(死信交换机)跟某个队列绑定,到了指定时间,消息过期后,就会从DLX(死信交换机)路由到这个队列,消费者可以从这个队列取走消息。
2、使用rabbitmq-delayed-message-exchange 延迟插件
2.1、下载
选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,
下载地址:http://www.rabbitmq.com/community-plugins.html
2.2、安装
插件拷贝到 RabbitMQ 服务器plugins目录下
2.2.1、解压
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
2.2.2、启用插件
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件;
2.2.3、查询安装情况
./rabbitmq-plugins list 查询安装的所有插件;
重启rabbitmq使其生效;(此处也可以不重启)
消息发送后不会直接投递到队列,
而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题。
2.4、示例
2.4.1、RabbitConfig配置类(关键代码)
package com.power.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
@Value("${my.exchangeName}")
private String exchangeName;
@Value("${my.queueDelayName}")
private String queueDelayName;
//创建自定义交换机
@Bean
public CustomExchange customExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type","direct");
return new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);
}
//创建队列
@Bean
public Queue queueNormal() {
//建造者模式创建队列
return QueueBuilder
.durable(queueDelayName)//队列名称
.build();
}
//队列绑定交换机
@Bean
public Binding binding(CustomExchange customExchange, Queue queueNormal) {
return BindingBuilder.bind(queueNormal).to(customExchange).with("plugin").noargs();
}
}
2.4.2、发送消息(关键代码)
package com.power.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Date;
@Service
@Slf4j
public class MessageService {
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public void sendMsg(){
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay",25000);//第一条消息设置过期时间25秒
Message message = MessageBuilder.withBody("hello world 01".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);
log.info("消息order发送完毕,发送时间是:{}",new Date());
}
{
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("x-delay",15000);//第二条消息设置过期时间15秒
Message message = MessageBuilder.withBody("hello world 02".getBytes())
.andProperties(messageProperties)
.build();
rabbitTemplate.convertAndSend("exchange.delay.04","plugin",message);
log.info("消息pay发送完毕,发送时间是:{}",new Date());
}
}
}
2.4.3、application.yml配置类
server:
port: 8080
spring:
application:
name: delay-plugins-test01
rabbitmq:
host: 你的服务器IP
port: 5672
username: 你的账号
password: 你的密码
virtual-host: power
my:
exchangeName: exchange.delay.04 # 交换机
queueDelayName: queue.delay.04 # 正常队列
2.4.4、接收消息
package com.power.message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
@Slf4j
public class ReceiveMessage {
@RabbitListener(queues = "queue.delay.04")
public void receiveMsg(Message message){
String body = new String(message.getBody());
log.info("接收到的消息为:{},接收时间为:{}",body,new Date());
}
}
2.4.5、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.power</groupId>
<artifactId>rabbit_07_delay04_plugins</artifactId>
<version>1.0-SNAPSHOT</version>
<name>rabbit_07_delay04_plugins</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.13</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>