总目录
https://preparedata.blog.csdn.net/article/details/120062997
文章目录
- 总目录
- 一、rabbit延时插件下载
- 二、rabbit插件安装
- 三、项目中配置延时队列
- 四、定义消息通道
- 五、生成消息
- 六、监听消息,进行消费
延时队列的配置是对上片文章的延伸扩展
https://preparedata.blog.csdn.net/article/details/128647139
一、rabbit延时插件下载
查看 RabbitMQ 服务对应的版本号, 示例是3.8.9
访问github,找到对应的插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
下载扩展名为ez
的插件包即可
二、rabbit插件安装
以windows安装为例,将插件包直接放到安装目录plugins
文件夹下
C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.9\plugins
打开dos窗口,直接启动插件
C:\Users\Administrator>rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启动命令:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
然后再重启rabbit服务
可以打开页面的rabbit管理端,Type类型已经多了一个x-delayed-message
,说明插件安装成功
三、项目中配置延时队列
延时队列,新增延时类型声明
,其他配置和即时消费的队列一样,即时消费的队列不需要此配置
cloud:
stream:
rabbit:
bindings:
# 订单-生产者
orderDelayChannelOutput:
producer:
delayed-exchange: true
# 订单-消费者
orderDelayChannelInput:
consumer:
delayed-exchange: true
下面是完整配置
cloud:
stream:
# 绑定消息中间件的
binders:
# rabbit0000 别名, 同时可以绑定多个不同类型的消息中间件 rabbit0001、rabbit0002、kafka0001
rabbit0000:
# 声明类型,是rabbit
type: rabbit
environment:
spring:
rabbitmq:
# guest连接登录时,需要使用 localhost
host: localhost
# 管理端页面端口是:15672, 配置服务时:5672
port: 5672
username: guest
password: guest
# 虚拟主机命名空间,"/", 默认虚拟主机, 可以自定义(例如dev、test、prod),区分环境
virtual-host: /
# 绑定消息通道,生产通道、消费通道
bindings:
# 订单-生产者-延时队列 自定义通道名称
orderDelayChannelOutput:
# 生产者和消费者 连接的桥梁 Queues的名称
destination: shopping.order.create.delay
# 配置组
group: shopping-order
# 订单-消费者-延时队列 自定义通道名称
orderDelayChannelInput:
destination: shopping.order.create.delay
group: shopping-order
rabbit:
bindings:
# 订单-生产者
orderDelayChannelOutput:
producer:
delayed-exchange: true
# 订单-消费者
orderDelayChannelInput:
consumer:
delayed-exchange: true
四、定义消息通道
和即时消费的队列,是相同的
package com.pd.shopping.order.mq;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Channels {
/**
* 订单-消息生产者
*/
String ORDER_DELAY_OUTPUT = "orderDelayChannelOutput";
@Output(ORDER_DELAY_OUTPUT)
MessageChannel orderDelayChannelOutput();
/**
* 订单-消息消费者
*/
String ORDER_DELAY_INPUT = "orderDelayChannelInput";
@Input(ORDER_DELAY_INPUT)
SubscribableChannel orderDelayChannelInput();
}
五、生成消息
package com.pd.shopping.order.controller;
import com.pd.shopping.order.model.bo.OrderMsgBo;
import com.pd.shopping.order.mq.Channels;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.messaging
@RestController
@RequestMapping("/hello")
public class HelloController {
@Autowired
private Channels channels;
@GetMapping("/testDelayedMq")
public void testDelayedMq() {
OrderMsgBo bo = new OrderMsgBo();
bo.setId(1L);
bo.setCode("aaa");
//基础延时时间单位是毫秒
Integer delayTime = 5 * 1000;
Message<OrderMsgBo> message = MessageBuilder
.withPayload(bo)
.setHeader("x-delay", delayTime)
.build();
channels.orderDelayChannelOutput().send(message);
}
}
延时队列需要在消息头中添加"x-delay"
, 并且给定一个延时时间,单位毫秒
。
上面代码逻辑是,发送一条消息成功后,不会立即消费,消息通道的消息停留5秒,然后再去消费
六、监听消息,进行消费
和即时消费的队列,是相同的
package com.pd.shopping.order.mq;
import com.pd.shopping.order.model.bo.OrderMsgBo;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@EnableBinding(Channels.class)
@Component
public class Listener {
@StreamListener(Channels.ORDER_DELAY_INPUT)
public void orderDelayInputListener(Message<OrderMsgBo> message) {
OrderMsgBo orderMsgBo = message.getPayload();
//todo 业务处理
}
}