midwayjs 框架使用 rabbitmq 消息延迟

news2025/1/10 1:44:32

插件rabbitmq_delayed_message_exchange是RabbitMQ官方提供的一种用于实现延迟消息的解决方案。该插件将交换机类型扩展至x-delayed-message,这种类型的交换机能够将消息暂时挂起,直到设定的延迟时间到达,才将消息投递到绑定的队列中。这一特性使得RabbitMQ能够轻松处理延迟消息的场景,无需额外的业务逻辑来定时检查和触发消息的投递。

插件需要在服务端安装并开启后使用。

消息发送:生产者向一个x-delayed-message类型的交换机发送消息,同时在消息属性中设置x-delay头,表示消息应延迟的时间(单位:毫秒)。
延迟处理:交换机接收到消息后,不会立即投递给队列,而是将其挂起,等待设定的延迟时间。在此期间,消息处于未投递状态。
消息投递:一旦达到延迟时间,交换机会将消息投递给与之绑定的队列。此时,消息的行为就像普通消息一样,可以被消费者消费。
消息消费:消费者从队列中拉取消息,执行相应的业务逻辑。

1、生产者:在service文件夹下建立rabbitmq.service.ts文件,通过调用sendDelayOrderToExchange方法发送消息,x-delay 设置延时时间  单位ms

import { Provide, Scope, ScopeEnum, Init, Autoload, Destroy, Config, Inject } from '@midwayjs/decorator';
import * as amqp from 'amqp-connection-manager';
import { ChannelWrapper, AmqpConnectionManager } from 'amqp-connection-manager';

import * as dayjs from 'dayjs';

const OPTIONS = { durable: true, autoDelete: true }; // 队列opts
const EXCHANGE_CHARGE_DELAY = 'exchange.charge.delay'; // 延时订单
const QUEUE_CHARGE_DELAY = 'queue.charege.delay';

@Autoload()
@Provide()
@Scope(ScopeEnum.Singleton) // Singleton 单例,全局唯一(进程级别)
export class RabbitmqService {
    private connection: AmqpConnectionManager;

    private channelWrapper: ChannelWrapper;

    @Config('rabbitmq')
    mqConfig;


    @Inject()
    logger;

    @Init()
    async connect() {
        // 创建连接,你可以把配置放在 Config 中,然后注入进来
        this.connection = await amqp.connect(this.mqConfig);

        // 创建 channel
        this.channelWrapper = await this.connection.createChannel({
            json: true,
            setup: function (channel) {
                return Promise.all([
                    // 延时Exchange
                    channel.assertExchange(EXCHANGE_CHARGE_DELAY, 'x-delayed-message', {
                        durable: true,
                        autoDelete: true,
                        arguments: {
                            'x-delayed-type': 'direct',
                        },
                    }),
                    channel.assertQueue(QUEUE_CHARGE_DELAY, OPTIONS),// 队列
                    channel.bindQueue(QUEUE_CHARGE_DELAY, EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER'),// 绑定交换机
                ]);
            },
        });
    }

    // 发送预约订单
    public async sendDelayOrderToExchange(message: string) {
        this.logger.info(`发送延时订单:${message}  当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);
        await this.channelWrapper.publish(EXCHANGE_CHARGE_DELAY, 'DELAY_ORDER', message, {
            headers: { 'x-delay': 10 * 1000 },// 延时时间 单位毫秒
        });
    }

    @Destroy()
    async close() {
        await this.channelWrapper.close();
        await this.connection.close();
    }
}

2、消费者:在consumer文件夹下新建mq.consumer.ts,通过监听延时队列接受消息

import { Consumer, MSListenerType, RabbitMQListener, Inject } from '@midwayjs/decorator';
import { ConsumeMessage } from 'amqplib';
import { Context } from '@midwayjs/rabbitmq';

import * as dayjs from 'dayjs';

const QUEUE_CHARGE_DELAY = 'queue.charege.delay';

@Consumer(MSListenerType.RABBITMQ)
export class UserConsumer {
    @Inject()
    ctx: Context;

    @Inject()
    logger;

    @RabbitMQListener(QUEUE_CHARGE_DELAY, {
        durable: true,
        autoDelete: true,
    })
    async delayOrder(msg: ConsumeMessage) {
        if (msg && msg.content) {
            const id = msg.content.toString('utf-8');
            this.logger.info(`预约订单号:${id} 当前时间:${dayjs().format('YYYY-MM-DD HH:mm:ss')}`);
        }
    }
}

在configuration.ts文件中调用测试

import { Configuration, App } from '@midwayjs/core';
import * as koa from '@midwayjs/koa';
.....
.....
.....
export class ContainerLifeCycle {
    @App()
    app: koa.Application;

    @Inject()
    rabbitmqService: RabbitmqService;


    async onReady() {
        await this.rabbitmqService.sendDelayOrderToExchange('123456789');
 
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2082028.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3插件原理

概览 vue3的生态圈提供了许多方便的插件或者工具,比如pinia、vue-router和Element Plus等,使用插件的写法一般如下: const app createApp(App);app.use(pinia); app.use(router); app.use(ElementPlus, { locale }); // 第二个参数:{locale} 为传给插…

milvus多个Querynode,资源消耗都打在一个节点上

milvus 查询时的原理 当读取数据时,MsgStream对象在以下场景中创建: 在 Milvus 中,数据必须先加载后才能读取。当代理收到数据加载请求时,会将请求发送给查询协调器,查询协调器决定如何将分片分配到不同的查询节点。…

最长回文子串:动态规划推导

最长回文子串:结合图形推导动态规划 题目介绍 本题可以在力扣找到,题号为5。 给你一个字符串 s,找到 s 中最长的 回文子串。 示例 1: 输入:s “babad” 输出:“bab” 解释:“aba” 同样是符…

Composio:开源项目中的AI智能体任务执行利器

目录 一、引言二、Composio 简介三、Composio 的功能特性四、Composio 的应用场景五、Composio 的应用实践1、安装 Composio 核心库2、安装OpenAI3、添加 GitHub 集成4、初始化Composio工具集5、获取预配置的 GitHub 工具6、工具函数配置7、执行工具函数 六、结语 一、引言 在…

可定制化内容具体识别事物,多方位同时监管的智慧快消开源了

智慧快消视频监控平台是一款功能强大且简单易用的实时算法视频监控系统。它的愿景是最底层打通各大芯片厂商相互间的壁垒,省去繁琐重复的适配流程,实现芯片、算法、应用的全流程组合,从而大大减少企业级应用约95%的开发成本。国产化人工智能“…

Vue实现zip压缩下载

1,安装依赖npm //jszip是一个用于创建、读取和编辑.zip文件的JavaScript库 https://stuk.github.io/jszip/ npm install jszip https://www.npmjs.com/package/file-saver npm install file-saver 2,在所需的页面中引入对应包 import JSZip from &…

3.服务注册_服务发现

文章目录 1.服务注册_服务发现1.1服务注册概念及图解介绍2.2 CAP理论2.3 常见的注册中心(了解)2.4 Eureka组件介绍2.4.1.搭建注册中心2.4.2服务注册2.4.3服务发现 大家好,我是晓星航。今天为大家带来的是 服务注册_服务发现 相关的讲解!😀 1…

自然语言常见面试题及答案(41~60)

Reply:面试题 获取资料下载 文章目录 41. 谈谈在自然语言处理中,如何评估模型的性能?42. 什么是语言模型(Language Model)?它在自然语言处理中的作用是什么?43. 如何进行文本分类任务&#xff…

外卖点餐配送系统源码的模块化设计:快速开发与迭代的秘诀

在快速发展的外卖行业中,点餐配送系统的开发需要具备高效、可扩展、易维护的特点。模块化设计能够有效地解决这些问题,通过将系统功能分解为多个独立的模块,使得开发团队可以快速开发和迭代每个模块,减少耦合度,提高系…

SpringBoot-读取配置文件方式

前言 Spring Boot提供了多种灵活的方式来读取配置文件,以适应不同的开发和部署需求,SpringBoot启动的时候,读取配置文件的时候,首先获取的是file:/config/文件下的配置文件,也就是项目下config文件里面的配置文件&…

Leetcode 216.组合总和Ⅲ 回溯+剪枝 C++实现

Leetcode 216.组合总和Ⅲ 问题:找出所有相加之和为 n 的 k 个数的组合,且满足下列条件: 只使用数字 1 到 9每个数字 最多使用一次 返回 所有可能的有效组合的列表 。该列表不能包含相同的组合两次,组合可以以任何顺序返回。 算…

【rancher镜像】修改rancher官方镜像仓库为私有仓库

背景 在使用rancher构建k8s时,由于中国区网络问题经常没法访问docker的官方仓库镜像docker.io,而rancher在构建k8s时,会默认从docker.io去下载镜像,由于网络原因,构建就会存在问题,镜像无法下载&#xff0…

Cxx primer-chap17-Specialized Library Facilities

tuple(元组)是一个模板,类似于pair,但是支持多个member,其主要用于将多个数据合并成一个对象:不像pair访问成员是固定的(first/second),访问tuple的成员需要使用函数模板get:bitset类比位运算方…

PDF秒变Word,你的文档编辑从此开挂!

在现代办公中,PDF和Word是我们最常接触的两种文件格式。PDF因其良好的兼容性和固定的格式而广受欢迎,但在编辑时却常常让人感到束手无策。而Word则因其强大的编辑功能成为文档处理的首选。 那么,如何将PDF转化为Word,让文档编辑更…

Linux多线程——线程的概念和控制

文章目录 线程的概念进程和线程对比 线程的控制创建线程与分配任务线程终止线程等待线程分离 pthread线程库 线程的概念 线程是我们经常听到的一个概念,他和进程有什么关系呢 从操作系统课本里我们可能听说过,线程是一个微缩版的进程,他拥有…

vue将二维码做成名片,并且生成图片保存

效果图 1. 安装html2canvas 首先,你需要在你的Vue项目中安装html2canvas。你可以通过npm或yarn来安装它: npm install html2canvas # 或者 yarn add html2canvas2.组件形式 2.1 创建组件 在你的Vue项目中,创建一个新的Vue组件&#x…

YOLO-World: Real-Time Open-Vocabulary Object Detection:实时开放词汇对象检测

YOLO系列探测器已成为高效实用的工具。然而,它们对预定义和训练的对象类别的依赖限制了它们在开放场景中的适用性。针对这一限制,我们引入了YOLO-World,这是一种创新方法,通过视觉语言建模和大规模数据集的预训练,增强…

深度学习入门-10

基于小土堆学习 池化层学习 池化层(Pooling Layer)是卷积神经网络(CNN)中的一种重要组件,它的主要作用是逐步减小数据的空间尺寸(即高度和宽度),以减少网络中参数的数量和计算量&a…

OpenCV绘图函数(2)绘制圆形函数circle()的使用

操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 绘制一个圆。 cv::circle 函数用于绘制一个给定中心和半径的简单圆或填充圆。 函数原型 void cv::circle (InputOutputArray img,Point cen…

a探索Python中的DOM操作神器:pyquery

文章目录 探索Python中的DOM操作神器:pyquery背景:为什么选择pyquery?pyquery是什么?如何安装pyquery?五个简单的pyquery函数使用方法场景应用:pyquery在实际开发中常见bug及解决方案总结 探索Python中的DO…