typescript 实现RabbitMQ死信队列和延迟队列 订单10分钟未付归还库存

news2024/9/28 5:24:33

Manjaro安装RabbitMQ

安装

sudo pacman -S rabbitmq rabbitmqadmin

启动管理模块

sudo rabbitmq-plugins enable rabbitmq_management

sudo rabbitmq-server

管理界面
http://127.0.0.1:15672/
默认用户名和密码都是guest。
要使用 rabbitmqctl 命令添加用户并分配权限,您可以按照以下步骤进行操作:

  1. 添加用户
rabbitmqctl add_user mingcai password

请将 password 替换为您想要设置的实际密码。

  1. 分配权限
rabbitmqctl set_permissions -p / mingcai ".*" ".*" ".*"

这个命令将用户 mingcai 授予对所有虚拟主机的所有资源的读、写和管理权限。如果您只想给予特定权限,请适当调整正则表达式 ".*",以授予适当的权限。例如,如果您只想给予读取权限,可以使用 "^amq\."

  1. 可选步骤:设置用户角色

您可以将用户分配给不同的角色,以便更好地管理权限。例如,您可以将用户添加到 administrator 角色以获取管理员权限:

rabbitmqctl set_user_tags mingcai administrator

这样,用户 mingcai 就被赋予了管理员权限。

请确保您具有适当的权限来执行这些操作,并确保替换示例中的用户名和密码为您自己的实际值。

死信队列

在这里插入图片描述

在这里插入图片描述
标题:利用RabbitMQ死信队列处理消息的三种情况

在消息队列的应用中,处理异常情况和消息的延迟成为了一项重要的任务。RabbitMQ作为一款流行的消息队列服务,提供了死信队列(Dead Letter Exchange)功能,能够有效地处理消息被拒绝、消息过期以及队列达到最大长度等情况。本文将介绍如何利用RabbitMQ的死信队列来处理这三种情况,并提供了TypeScript示例代码。

1. 消息被拒绝

当消费者无法处理某条消息时,可以选择将其标记为“被拒绝”。这种情况下,我们可以配置RabbitMQ,将被拒绝的消息发送到一个死信队列,以后再处理。

// 引入amqplib库
import * as amqp from 'amqplib';

// 连接到RabbitMQ服务器
const connection = await amqp.connect('amqp://localhost');

// 创建Channel
const channel = await connection.createChannel();

// 定义队列
const queueName = 'my_queue';
await channel.assertQueue(queueName, {
  // 设置死信交换机
  deadLetterExchange: 'my_dead_letter_exchange'
});

// 消费消息
channel.consume(queueName, (msg) => {
  // 处理消息
  if (msg) {
    // 处理失败,拒绝消息并将其重新放回队列
    // channel.reject(msg, true); // 第二个参数设为 true 表示将消息重新放回队列
    // 处理失败,拒绝消息
    channel.reject(msg, false); // 第二个参数设为 false 表示将消息投递到死信队列
     // or 处理失败,拒绝消息并将其重新放回死信队列
    channel.nack(msg, false, false); // 第二个参数设为 false 表示不将消息重新放回原队列,第三个参数设为 false 表示不拒绝当前和之前所有未确认的消息
  }
});

2. 消息过期

有时候我们希望消息在一定时间内被处理,如果超过了这个时间,就认为消息已经过期。RabbitMQ允许我们设置消息的过期时间,并在消息过期后将其发送到死信队列。

// 发布消息
await channel.sendToQueue(queueName, Buffer.from('Hello'), {
  expiration: '60000' // 设置过期时间为60秒
});

3. 队列达到最大长度

为了避免队列过载,我们可以限制队列的最大长度。当队列达到最大长度时,新的消息将被拒绝,并发送到死信队列。

// 定义队列
await channel.assertQueue(queueName, {
  maxLength: 100, // 设置最大队列长度为100
  deadLetterExchange: 'my_dead_letter_exchange'
});

通过以上配置,我们可以利用RabbitMQ的死信队列来处理消息被拒绝、消息过期以及队列达到最大长度等情况,保证消息系统的稳定性和可靠性。

以上是利用TypeScript示例代码演示了如何在RabbitMQ中使用死信队列。希望这篇文章对你有所帮助!

延时队列

什么是延时队列?顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
延时队列在项目中的应用还是比较多的,尤其像电商类平台:
1、订单成功后,在30分钟内没有支付,自动取消订单
2、外卖平台发送订餐通知,下单成功后60s给用户推送短信。
3、如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
4、淘宝新建商户一个月内还没上传商品信息,将冻结商铺等

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

npm install amqplib --save
npm install @types/amqplib --save-dev

在这里插入图片描述

总结

在这里插入图片描述

rabbitmqadmin 使用入门

rabbitmqadmin 是 RabbitMQ 的命令行管理工具,可以用于执行各种管理任务,如创建队列、交换机,查看队列状态等。以下是一些基本的用法示例:

export RABBITMQ_SERVER=127.0.0.1
export RABBITMQ_PORT=5672   
export RABBITMQ_USER=mingcai     
export RABBITMQ_PASSWORD=password

rabbitmqadmin list exchanges
  1. 查看 RabbitMQ 服务器信息
rabbitmqadmin status
  1. 列出所有交换机
rabbitmqadmin list exchanges
  1. 列出所有队列
rabbitmqadmin list queues
  1. 创建一个交换机
rabbitmqadmin declare exchange name=my_exchange type=direct
  1. 创建一个队列
rabbitmqadmin declare queue name=my_queue
  1. 绑定队列到交换机
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
  1. 发送消息到指定交换机
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello, RabbitMQ!"
  1. 获取队列消息
rabbitmqadmin get queue=my_queue

这些命令只是一些基本用法示例,rabbitmqadmin 工具支持更多功能和选项。你可以通过运行 rabbitmqadmin help 命令来获取更详细的帮助信息,或者查看官方文档以了解更多选项和使用方法。

延时3秒和8秒全部代码

// delayProducer.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    const exchange = 'routing_exchange';
    // 定义 dlx-exchange
    const dlxExchangeName = 'dlx-exchange';


    // 声明交换机
    await channel.assertExchange(exchange, 'direct', {durable: true});
    await channel.assertExchange(dlxExchangeName, 'direct', { durable: true });//消息防止丢失
    const dlxqueueBindings= [
        {
            dlxQueueName: 'dlx-3_second_queue', routingKey: 'fast',
        },
        {
            dlxQueueName: 'dlx-8_second_queue', routingKey: 'slow'
        }
    ];
    for (const binding of dlxqueueBindings) {

        // 绑定延迟死信队列
        await channel.assertQueue(binding.dlxQueueName );
        //死信交换机和死信队列绑定 Routing key fast 的消息
        await channel.bindQueue(binding.dlxQueueName, dlxExchangeName, binding.routingKey); // 将 dlx-queue 绑定到死信交换机
    }


    // 定义队列和路由键的映射
    const queueBindings = [
        {
            queue: '3_second_queue', routingKey: 'fast', arguments: {
                'x-message-ttl': 3000, // TTL 设置为 3 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        },
        {
            queue: '8_second_queue', routingKey: 'slow', arguments: {
                'x-message-ttl': 8000, // TTL 设置为 8 秒 消息被拒绝或过期时将重新发布到的交换器的可选名称。
                'x-dead-letter-exchange': 'dlx-exchange'//消息被拒绝或过期时将重新发布到的交换器的可选名称
            }
        }
    ];

    // 声明队列,并将队列绑定到交换机上
    for (const binding of queueBindings) {
        await channel.assertQueue(binding.queue, {durable: true, arguments: binding.arguments});
        await channel.bindQueue(binding.queue, exchange, binding.routingKey);
    }


    for (let i = 0; i < 10; i++) {
        await new Promise((resolve) => {
            setTimeout(() => {
                resolve(1)
            }, 1000)
        })
        const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
        console.log('当前中国时间:', chinaTime);
        // 发送消息到交换机,并设置不同的路由键
        await sendMessage(channel, exchange, 'fast', `[${i}] ${chinaTime} Message for the fast queue`);
        await sendMessage(channel, exchange, 'slow', `[${i}] ${chinaTime} Message for the slow queue`);

    }

    // 关闭连接
    setTimeout(async () => {
        await channel.close();
        await connection.close();
    }, 10000); //10 秒后关闭连接
}


async function sendMessage(channel: amqp.Channel, exchange: string, routingKey: string, message: string) {
    channel.publish(exchange, routingKey, Buffer.from(message));
    console.log(`Sent message '${message}' with routing key '${routingKey}'`);
}

setupRouting().catch(console.error);

//消费者 dlx-3_second_queue.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    let queue = 'dlx-3_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });


}


setupRouting().catch(console.error);
//dlx-8_second_queue.ts
import * as amqp from 'amqplib';

async function setupRouting() {
    const connection = await amqp.connect('amqp://mingcai:password@127.0.0.1');
    const channel = await connection.createChannel();

    let queue = 'dlx-8_second_queue'
    // 定义队列和路由键的映射
    await channel.consume(queue, (msg) => {
        if (msg !== null) {
            const chinaTime = new Date().toLocaleString('zh-CN', { timeZone: 'Asia/Shanghai' });
            console.log(`Received message ${chinaTime}'${msg.content.toString()}' from queue '${queue}'`);
            channel.ack(msg); // 确认消息已被处理
        }
    });


}


setupRouting().catch(console.error);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1551050.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【保姆级讲解如何构建Python虚拟环境】

&#x1f525;博主&#xff1a;程序员不想YY啊&#x1f525; &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家&#x1f4ab; &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 &#x1f308;希望本文对您有所裨益&#xff0c;如有…

清华镜像源设置

1、找官方地址没问题 https://mirrors.tuna.tsinghua.edu.cn/ 2、怎么设置的&#xff1f; 3、怎么就知道如何设置&#xff0c;或者我换成Ubuntu怎么设置&#xff1f; https://mirrors.tuna.tsinghua.edu.cn/help 在左侧列表找到要设置的系统就可以了 我什么都不知道&#xff…

实力上榜 | 创新微MinewSemi再获“物联之星”年度企业投资价值50强

近日&#xff0c;由深圳市物联传媒有限公司、AIoT星图研究院、IOTE组委会、深圳市物联网产业协会主办的“物联之星”2023中国物联网行业年度榜单评选结果正式公布。经过层层筛选&#xff0c;创新微MinewSemi获评2023年度“中国物联网企业投资价值50强”&#xff0c;连续两年实力…

【保姆级讲解如何Chrome安装Vue-devtools的操作】

&#x1f308;个人主页:程序员不想敲代码啊&#x1f308; &#x1f3c6;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家&#x1f3c6; &#x1f44d;点赞⭐评论⭐收藏 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提…

SQL/日志监控框架log4jdbc

系列文章目录 文章目录 系列文章目录前言 前言 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站&#xff0c;这篇文章男女通用&#xff0c;看懂了就去分享给你的码吧。 log4jdbc is a Jav…

Linux Centos7安装Docker容器

Docker的简介 Docker 是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中,然后发布到任何流行的Linux机器或Windows 机器上,也可以实现虚拟化,容器是完全使用沙箱机制,相互之间不会有任何接口。 一个完整的Docker有以下几个部分…

百万组通用编码器 L1527芯片产品介绍,重码率很低

L1527 是 CMOS 结构的预烧内码&#xff08;遥控中的地址码&#xff09;通用编码器&#xff0c;内有 20 位可预烧写 100 万组内码组合&#xff0c;使得重码率很低&#xff0c;具有更高安全性。芯片内集成误操作禁止功能&#xff0c;在按键输入有效且状态不变时&#xff0c;芯片连…

【C语言】 gets()puts()fgets()fputs()字符串输入输出函数的用法

文章目录 C语言中的字符串输入输出函数&#xff1a;gets、puts、fgets与fputsgets函数puts函数fgets函数fputs函数 C语言中的字符串输入输出函数知识点总结结语 C语言中的字符串输入输出函数&#xff1a;gets、puts、fgets与fputs 在C语言中&#xff0c;处理字符串的输入和输出…

Git,GitHub,Gitee,GitLab 四者有什么区别?

目录 1. Git 2. GitHub 3. Gitee 4. GitLab 5. 总结概括 1. Git Git 是一个版本管理工具&#xff0c;常应用于本地代码的管理&#xff0c;下载完毕之后&#xff0c;我们可以使用此工具对本地的资料&#xff0c;代码进行版本管理。 下载链接&#xff1a; Git - Downlo…

企业如何申请邓白氏编码(DUNS)呢?

尤其是食品企业&#xff0c;药品企业在申请美国FDA认证的时候&#xff0c;经常会听到一个名词——“邓白氏编码”&#xff0c;申请邓白氏编码是企业顺利完成FDA注册认证的必要前提&#xff0c;因此都需要提供邓白氏编码。 今天&#xff0c;小编就来为大家详细介绍下邓白氏编码…

Redis命令-String命令

4.3 Redis命令-String命令 String类型&#xff0c;也就是字符串类型&#xff0c;是Redis中最简单的存储类型。 其value是字符串&#xff0c;不过根据字符串的格式不同&#xff0c;又可以分为3类&#xff1a; string&#xff1a;普通字符串int&#xff1a;整数类型&#xff0…

8868体育助力西甲巴塞罗那 运作球员转会

西甲联赛的巴塞罗那俱乐部是8868体育的合作球队之一&#xff0c;近日有消息称&#xff0c;巴萨将出售埃里克-加西亚&#xff0c;球员身价估值1500万欧元。目前巴萨正在明确中卫位置的战略&#xff0c;已经确定要回购在贝蒂斯表现出色的里亚德&#xff0c;巴萨的想法是将埃里克-…

C语言----简单文件处理

当大家学习过动态内存开辟后&#xff0c;那么我们就已经可以把内存的每一个地方使用了。但是大家有没有想过&#xff0c;我们现在是在自己电脑上打代码。以后工作了&#xff0c;自己写代码在自己电脑上&#xff0c;老板要是想要一份代码看看&#xff0c;或者你成为大能了&#…

Vue3气泡卡片(Popover)

效果如下图&#xff1a;在线预览 APIs 参数说明类型默认值必传title卡片标题string | slot‘’falsecontent卡片内容string | slot‘’falsemaxWidth卡片内容最大宽度string | number‘auto’falsetrigger卡片触发方式‘hover’ | ‘click’‘hover’falseoverlayStyle卡片样式…

cesium加载.tif格式文件

最近项目中有需要直接加载三方给的后缀名tif格式的文件 <script src"https://cdn.jsdelivr.net/npm/geotiff"></script> 或者 yarn add geotiff npm install geotiff 新建tifs.js import GeoTIFF, { fromBlob, fromUrl, fromArrayBuffer } from geotif…

智慧公厕四大核心能力,赋能城市公共厕所智能化升级

公共厕所是城市基础设施中不可或缺的一部分&#xff0c;但由于传统的公共厕所在建设与规划上&#xff0c;存在一定的局限性&#xff0c;导致环境卫生差、管理难度大、使用体验不佳等问题&#xff0c;给市民带来了很多不便。而智慧公厕作为城市智能化建设的重要组成部分&#xf…

【直播课】2024年PostgreSQL CM认证实战培训课程于4月27日开课!

课程介绍 了解关注开源技术&#xff0c;学习PG以点带面 Linux/Andriod&#xff08;操作系统&#xff09;、Apache/Tomcat&#xff08;应用服务器&#xff09;、OpenStack/KVM&#xff08;虚拟化&#xff09;、Docker/K8S&#xff08;容器化&#xff09;、Hadoop&#xff08;大…

全志R128 SDK HAL 模块开发指南——GPIO

GPIO 模块介绍 整个 GPIO 控制器由数字部分&#xff08;GPIO 和外设接口&#xff09;以及 IO 模拟部分&#xff08;输出缓冲&#xff0c;双下拉&#xff0c;引脚Pad&#xff09;组成。其中数字部分的输出可以通过 MUX 开关选择&#xff0c;模拟部分可以用来配置上下拉&#x…

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单实战案例 之七 简单闪烁效果

Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单实战案例 之七 简单闪烁效果 目录 Python 基于 OpenCV 视觉图像处理实战 之 OpenCV 简单实战案例 之七 简单闪烁效果 一、简单介绍 二、简单闪烁效果实现原理 三、简单闪烁效果案例实现简单步骤 四、注意事项 一、简单…

C++超市商品管理系统

一、简要介绍 1.本项目为面向对象程序设计的大作业&#xff0c;基于Qt creator进行开发&#xff0c;Qt框架版本6.4.1&#xff0c;编译环境MINGW 11.2.0。 2.项目结构简介&#xff1a;关于系统逻辑部分的代码的头文件在head文件夹中&#xff0c;源文件在s文件夹中。与图形界面…