插件rabbitmq_delayed_message_exchange是RabbitMQ官方提供的一种用于实现延迟消息的解决方案。该插件将交换机类型扩展至x-delayed-message,这种类型的交换机能够将消息暂时挂起,直到设定的延迟时间到达,才将消息投递到绑定的队列中。这一特性使得RabbitMQ能够轻松处理延迟消息的场景,无需额外的业务逻辑来定时检查和触发消息的投递。
插件需要在服务端安装并开启后使用。
消息发送:生产者向一个x-delayed-message类型的交换机发送消息,同时在消息属性中设置x-delay头,表示消息应延迟的时间(单位:毫秒)。
延迟处理:交换机接收到消息后,不会立即投递给队列,而是将其挂起,等待设定的延迟时间。在此期间,消息处于未投递状态。
消息投递:一旦达到延迟时间,交换机会将消息投递给与之绑定的队列。此时,消息的行为就像普通消息一样,可以被消费者消费。
消息消费:消费者从队列中拉取消息,执行相应的业务逻辑。
1、生产者:在service文件夹下建立rabbitmq.service.ts文件,通过调用sendDelayOrderToExchange方法发送消息,x-delay 设置延时时间 单位ms
import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config, Inject } from '@midwayjs/decorator';
import * as amqp from 'amqp-connection-manager';
import { ChannelWrapper, AmqpConnectionManager } from 'amqp-connection-manager';
import * as dayjs from 'dayjs';
const OPTIONS = { durable: true, autoDelete: true }; // 队列opts
const EXCHANGE_CHARGE_DELAY = 'exchange.charge.delay'; // 延时订单
const QUEUE_CHARGE_DELAY = 'queue.charege.delay';
@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {
private connection: AmqpConnectionManager;
private channelWrapper: ChannelWrapper;
@Config('rabbitmq')
mqConfig;
@Inject()
logger;
@Init()
async connect() {
// 创建连接,你可以把配置放在 Config 中,然后注入进来
this.connection = await amqp.connect(this.mqConfig);
// 创建 channel
this.channelWrapper = await this.connection.createChannel({
json: true,
setup: function (channel) {
return Promise.all([
// 延时Exchange
channel.assertExchange(EXCHANGE_CHARGE_DELAY, 'x-delayed-message', {
durable: true,
autoDelete: true,
arguments: {
'x-delayed-type': 'direct',
},
}),
channel.assertQueue(QUEUE_CHARGE_DELAY, OPTIONS),// 队列
channel.bindQueue(QUEUE_CHARGE_DELAY, EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER'),// 绑定交换机
]);
},
});
}
// 发送预约订单
public async sendDelayOrderToExchange(message: string) {
this.logger.info(`发送延时订单:${message} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);
await this.channelWrapper.publish(EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER', message, {
headers: { 'x-delay': 10 * 1000 },// 延时时间 单位毫秒
});
}
@Destroy()
async close() {
await this.channelWrapper.close();
await this.connection.close();
}
}
2、消费者:在consumer文件夹下新建mq.consumer.ts,通过监听延时队列接受消息
import { Consumer, MSListenerType, RabbitMQListener, Inject } from '@midwayjs/decorator';
import { ConsumeMessage } from 'amqplib';
import { Context } from '@midwayjs/rabbitmq';
import * as dayjs from 'dayjs';
const QUEUE_CHARGE_DELAY = 'queue.charege.delay';
@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
@Inject()
ctx: Context;
@Inject()
logger;
@RabbitMQListener(QUEUE_CHARGE_DELAY, {
durable: true,
autoDelete: true,
})
async delayOrder(msg: ConsumeMessage) {
if (msg && msg.content) {
const id = msg.content.toString('utf-8');
this.logger.info(`预约订单号:${id} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);
}
}
}
在configuration.ts文件中调用测试
import { Configuration, App } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
.....
.....
.....
export class ContainerLifeCycle {
@App()
app: koa.Application;
@Inject()
rabbitmqService: RabbitmqService;
async onReady() {
await this.rabbitmqService.sendDelayOrderToExchange('123456789');
}
}