安装延迟插件
根据 RabbitMQ 的版本下载对应版本的插件
# 延迟队列插件下载地址
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
# 将本地下载好的插件复制到docker里
# docker cp rabbitmq_delayed_message_exchange-3.9.0.ez 容器名:/plugins
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
# 开启延迟队列插件(需先进入容器再执行,例如:docker exec -it rabbitmq /bin/bash)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 查看插件
rabbitmq-plugins list
config配置
/**
 * Names of the RabbitMQ exchange, queue and routing key used for delayed messaging.
 *
 * <p>This class only holds constants, so it is final and cannot be instantiated.
 */
public final class RabbitMqInfo {
    /** Name of the exchange. */
    public static final String EXCHANGE_NAME = "myDelayedExchange";

    /** Name of the queue. */
    public static final String QUEUE_NAME = "delayed_queue";

    /** Routing key used when binding and publishing. */
    public static final String ROUTING_KEY = "delayed.routing.key";

    private RabbitMqInfo() {
        // Constants holder: no instances.
    }
}
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.*;
/**
 * Spring configuration declaring the RabbitMQ connection factory, the delayed-message
 * exchange, the queue, and the binding that ties the queue to the exchange.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * Creates the connection factory pointing at the broker on 127.0.0.1:5672, virtual host "/".
     *
     * <p>NOTE(review): host and credentials are hard-coded here; consider moving them to
     * external configuration.
     *
     * @return the configured connection factory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");
        return factory;
    }

    /**
     * Declares the exchange of type {@code x-delayed-message}.
     *
     * <p>The {@code x-delayed-type} argument is the same entry that would be typed into the
     * "Arguments" field when creating the exchange by hand.
     *
     * @return the durable, non-auto-delete custom exchange
     */
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");

        final String exchangeType = "x-delayed-message";
        final boolean durable = true;
        final boolean autoDelete = false;
        return new CustomExchange(
                RabbitMqInfo.EXCHANGE_NAME, exchangeType, durable, autoDelete, arguments);
    }

    /**
     * Declares the queue.
     *
     * <p>Constructor arguments, in order:
     * <ol>
     *   <li>queue name;</li>
     *   <li>durable: whether the queue is persisted (default false);</li>
     *   <li>exclusive: usable only by the connection that created it and deleted once that
     *       connection closes; takes precedence over durable (default false);</li>
     *   <li>auto-delete: the queue is removed automatically when no producer or consumer
     *       uses it any more.</li>
     * </ol>
     * Usually only durability is switched on and the other two stay at their default, false.
     *
     * @return the durable queue
     */
    @Bean
    public Queue queue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(RabbitMqInfo.QUEUE_NAME, durable, exclusive, autoDelete);
    }

    /**
     * Binds the queue to the delayed exchange under the configured routing key, with no
     * extra binding arguments.
     *
     * @return the binding
     */
    @Bean
    public Binding binding() {
        Queue boundQueue = queue();
        CustomExchange exchange = delayedExchange();
        return BindingBuilder.bind(boundQueue)
                .to(exchange)
                .with(RabbitMqInfo.ROUTING_KEY)
                .noargs();
    }
}
生产者测试类
import com.example.config.RabbitMqInfo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.time.LocalDateTime;
import java.util.UUID;
/**
 * Spring Boot test that publishes messages to the delayed exchange through
 * {@link RabbitTemplate}.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProduceTest {
    private static final Logger log = org.slf4j.LoggerFactory.getLogger(ProduceTest.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a random UUID string with an {@code x-delay} header of 10000 and message ID "100".
     */
    @Test
    public void sendProduce() {
        String content = UUID.randomUUID().toString();

        MessagePostProcessor delayAndTag = message -> {
            MessageProperties props = message.getMessageProperties();
            // Delay before consumption, in milliseconds: 1000 = 1 second, 10000 = 10 seconds.
            props.setHeader("x-delay", 10000);
            // Message ID is a string.
            props.setMessageId("100");
            return message;
        };

        log.info("生产者发送消息,发送时间:{} ,发送内容{}", LocalDateTime.now(), content);
        // Arguments: exchange, routing key, payload, post-processor.
        rabbitTemplate.convertAndSend(
                RabbitMqInfo.EXCHANGE_NAME, RabbitMqInfo.ROUTING_KEY, content, delayAndTag);
    }

    /**
     * Publishes the plain string "100" to the same exchange and routing key, without any
     * post-processor (so no {@code x-delay} header and no message ID are set here).
     *
     * <p>NOTE(review): the original comment describes this as cancelling the delayed message
     * whose messageId is "100". Nothing in this code establishes that effect; confirm
     * against the delayed-message plugin documentation.
     */
    @Test
    public void clearProduce() {
        final String payload = "100";
        rabbitTemplate.convertAndSend(
                RabbitMqInfo.EXCHANGE_NAME, RabbitMqInfo.ROUTING_KEY, payload);
    }
}
消费者
import com.example.config.RabbitMqInfo;
import org.slf4j.Logger;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
 * Consumer component that listens on the queue named by {@code RabbitMqInfo.QUEUE_NAME}
 * and logs every message it receives.
 */
@Component
public class RabbitmqConsumer {
    private static final Logger log = org.slf4j.LoggerFactory.getLogger(RabbitmqConsumer.class);

    /**
     * Logs the current time together with the received message.
     *
     * @param message the message payload as a string
     */
    @RabbitListener(queues = RabbitMqInfo.QUEUE_NAME)
    public void OnMessage(String message) {
        final LocalDateTime receivedAt = LocalDateTime.now();
        log.info("消费着接受消息,接受时间:{} ,接受内容{}", receivedAt, message);
    }
}