RabbitMQ简单模式和工作模式

news2025/1/11 0:01:22

RabbitMQ 是一个消息队列中间件,用于在分布式系统中进行消息传递。在 RabbitMQ 中,有几种工作模式,其中简单模式和工作模式是其中两种基本的模式之一。

  1. 简单模式(Simple Mode):

    • 在简单模式中,有一个生产者(Producer)将消息发送到一个队列(Queue)中,然后有一个消费者(Consumer)从队列中接收并处理消息。
    • 这是最基本的消息队列模式,适用于单个生产者和单个消费者的场景。
    • 生产者将消息发送到队列,而消费者从队列中接收并处理消息,消息的传递是单向的。
  2. 工作模式(Work Queue Mode):

    • 工作模式也被称为竞争消费者模式。在这种模式下,有多个消费者监听同一个队列,但每条消息只能被其中一个消费者接收和处理。
    • 当消息被发送到队列时,它将被发送给下一个空闲的消费者,从而实现消息的分发和并发处理。
    • 这种模式对于处理大量工作的情况很有用,可以通过增加消费者的数量来提高消息处理的速度。

在 RabbitMQ 中,简单模式和工作模式的实现通常使用一些基本的概念,包括生产者、消费者、队列和消息。生产者负责发送消息到队列,而消费者则负责从队列中接收和处理消息。

下面是一个使用 RabbitMQ 和 Node.js(使用 amqplib 库)以及 TypeScript 实现工作模式的简单示例。在这个例子中,我们将使用 amqplib 库来连接 RabbitMQ 服务器,并使用 TypeScript 来编写代码。

首先,确保你已经安装了 amqplib 库。可以使用以下命令进行安装:

npm install amqplib

接下来,创建一个生产者和一个消费者的 TypeScript 文件。以下是示例代码:

producer.ts:

import * as amqp from 'amqplib';

async function produce() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });

  for (let i = 0; i < 10; i++) {
    const message = `Message ${i}`;
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    console.log(` [x] Sent '${message}'`);
  }

  setTimeout(() => {
    connection.close();
    process.exit(0);
  }, 500);
}

produce();

consumer.ts:

import * as amqp from 'amqplib';

async function consume() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();
  const queue = 'work_queue';

  await channel.assertQueue(queue, { durable: true });
   // 设置每次只处理一个消息[平均分配的概念,不会让一个work太忙和太闲]
    //这一行代码的作用是告诉 RabbitMQ 不要在消费者未确认(ack)之前向其发送新的消息
  await channel.prefetch(1);

  console.log(' [*] Waiting for messages. To exit press CTRL+C');

  await channel.consume(queue, async (msg) => {
    if (msg !== null) {
      const message = msg.content.toString();
      console.log(` [x] Received ${message}`);

      // Simulate some work
      await new Promise(resolve => setTimeout(resolve, 1000));

      console.log(' [x] Done');
      channel.ack(msg);
    }
  });
}

consume();

这个示例中,生产者将消息发送到名为 work_queue 的队列中,而消费者则监听该队列并处理消息。消费者使用 channel.prefetch(1) 来确保一次只接收一个消息,从而实现竞争消费者模式。

记得在运行前启动 RabbitMQ 服务器,并确保 TypeScript 文件已编译成 JavaScript。你可以使用以下命令进行编译:

tsc producer.ts
tsc consumer.ts

然后,分别运行 producer.jsconsumer.js。这样你就可以在 RabbitMQ 中看到消息的生产和消费过程。

RabbitMQ消息持久化和手动应答

在 RabbitMQ 中,消息持久化和手动应答是两个关键的概念,它们可以帮助确保消息的可靠传递和处理。下面简要介绍这两个概念:

  1. 消息持久化(Message Durability):

    • 默认情况下,RabbitMQ 中的消息是瞬时的,也就是说,如果 RabbitMQ 服务器停止或崩溃,所有未处理的消息都会丢失。
    • 通过将消息标记为持久化,你可以确保消息在 RabbitMQ 服务器重启后仍然可用。要实现消息持久化,需要在发送消息时设置消息的 deliveryMode 属性为 2persistent)。
    • 例如,在生产者端设置消息为持久化:
    channel.sendToQueue(queue, Buffer.from(message), { persistent: true });
    
    • 在消费者端,你需要确保队列和消息都被声明为持久化:
    channel.assertQueue(queue, { durable: true });
    

    这样,即使 RabbitMQ 服务器重启,持久化的消息也不会丢失。

  2. 手动应答(Manual Acknowledgment):

    • 默认情况下,RabbitMQ 使用自动应答(auto-acknowledgment)模式,即一旦消息被传递给消费者,RabbitMQ 就将其标记为已处理。
    • 在某些情况下,你可能需要更细粒度的控制,以确保消息在被消费者完全处理之后才被标记为已处理。这就是手动应答的用途。
    • 在消费者端,需要将 noAck 设置为 false,表示手动应答模式:
channel.consume(queueName, async (msg: Message | null) => {
  if (msg) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    console.log('Processing mail task:', msg.content.toString());
    try {
      //模拟邮件发送
      await new Promise(resolve => setTimeout(resolve, 1000));
      console.log(' [x] Done');
      channel.ack(msg);
    } catch (error) {
      console.log('error:', data);
      // 处理消息失败,判断是否需要进行重试
      if (canRetry(msg)) {
        // 重新入队,进行下一次尝试
        channel.reject(msg, true);
      } else {
        // 不进行重试,将消息从队列中移除
        channel.ack(msg);
      }
    }
  }
}, { noAck: false });//默认false 



  • 在这种情况下,消费者需要在处理完消息后显式调用 channel.ack(msg) 来确认消息已被处理。如果消费者崩溃或在处理消息时发生错误,消息将保持在队列中,直到被明确确认。
  • 在 RabbitMQ 中,channel.reject 方法用于拒绝一条消息。它的参数如下channel.reject(msg, requeue);
    msg: 要拒绝的消息对象。
    requeue: 如果设置为 true,则被拒绝的消息将被重新排队,即重新放回队列。如果设置为 false,则消息将被删除。默认为 true。

综合使用消息持久化和手动应答,可以确保在面对不同情况时,消息的可靠传递和处理。

重试间隔和次数

在这里插入图片描述
在这里插入图片描述

  1. 重新投递消息并设置头部信息:

    • 在处理消息失败时,将消息重新投递到队列,并设置一个头部信息,例如 x-redelivered-count,用来记录消息的重试次数。
    • 在消费者端,根据这个头部信息来判断是否达到重试次数的上限,如果是,则不再重新投递,可能将消息放入死信交换机。
  2. 使用外部存储记录重试次数:

    • 每次消息处理失败时,将消息的唯一标识(例如 UUID)和重试次数记录到外部存储中(例如 Redis、Memcache、MySQL)。
    • 在消费者端,在每次重新处理时,从外部存储中获取当前重试次数,并判断是否达到重试次数的上限。
  3. 自定义插件:

    • 编写一个 RabbitMQ 插件,实现自定义的消息重试逻辑,包括记录重试次数、判断是否重新投递等。
    • 这样可以更灵活地控制消息的处理流程。

需要注意的是,这些方法都是基于 RabbitMQ 不直接提供重试次数限制的情况下的一些自定义实践。在回答中也提到了关于 quorum queues 的更新,以及支持通过策略(policy)来限制重投递次数的可能性。因此,具体的实现方式可能会随着 RabbitMQ 版本的更新而有所变化。

await channel.consume(queueName, async (msg: Message | null) => {
  if (msg) {
    const data: EmailTask = JSON.parse(msg.content.toString());
    let retryCount = msg.properties.headers['x-retry-count'] || 0;
    console.log('Processing message:', data);
    console.log('Retry count:', retryCount);
    try {
      if (data.to.includes('recipient1@example.com')) {
        throw new Error('邮件发送失败...');
      }
      //发送邮件
      await new Promise(resolve => setTimeout(resolve, 1000));
      channel.ack(msg);
    } catch (error) {
      console.log('error:', data);
      // 增加重试次数
      retryCount++;
      // 判断是否达到最大重试次数
      if (retryCount < maxRetryAttempts) {
        // 重新发送消息到队列
        channel.sendToQueue(queueName, msg.content, {
          persistent: true,
          headers: {
            'x-retry-count': retryCount,
          },
        });
      } else {
        // 不进行重试,将消息从队列中移除
        channel.ack(msg);
      }
    }
  }
});

请添加图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1412124.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

找不到d3dx9_43.dll怎么办?有什么方法能解决这个问题

d3dx9_43.dll 是一个与 Microsoft DirectX 9.0c 版本相关的动态链接库&#xff08;DLL&#xff09;文件。DirectX 是由微软开发的一个应用程序接口&#xff08;API&#xff09;&#xff0c;用于处理多媒体相关任务&#xff0c;尤其是在游戏和高性能图形应用中。具体到 d3dx9_43…

我的隐私计算学习——联邦学习(5)

笔记内容来自多本书籍、学术资料、白皮书及ChatGPT等工具&#xff0c;经由自己阅读后整理而成。 &#xff08;七&#xff09;联邦迁移学习 ​ 相关研究表明&#xff0c;联邦迁移学习不需要主服务器作为各参与方间的协调者&#xff0c;旨在让模型具备举一反三能力&#xff0c;在…

C++ 11 多线程库初步学习

在C11标准中&#xff0c;可以简单通过使用thread库&#xff0c;来管理多线程。 thread库可以看做对不同平台多线程API的一层包装&#xff1b;因此使用新标准提供的线程库编写的程序是跨平台的。 使用时需要#include <thread>头文件&#xff1b; #include <iostream&g…

【SpringCloud Nacos】 微服务治理介绍及Nacos引入初体验

文章目录 前言服务治理介绍什么是服务治理1、服务发现2、服务配置3、服务健康检测 常见的注册中心ZookeeperEurekaConsulNacos Nacos 简介Nacos 实战入门搭建nacos环境1、安装nacos2、配置nacos3、访问nacos 将商品微服务注册到 nacos1、在 pom. xml 中添加 nacos 的依赖2、在主…

Pycharm2023.3.2使用conda创建工程

1 conda环境 举个例子&#xff0c;创建一个环境&#xff0c;名叫Pytorch&#xff0c;使用的python版本是3.7 &#xff08;1&#xff09;创建环境 conda create -n Pytorch python3.7&#xff08;2&#xff09;激活环境 conda activate Pytorch&#xff08;3&#xff09;查看…

小型商用机器人,如何做到小而强?

兼顾体型和性能。 体型和性能的矛盾 一直以来&#xff0c;商用清洁机器人的应用场景主要集中在大型商场、超市、写字楼等&#xff0c;为什么1000平米以下的小型商超等中小场景却很少涉足&#xff1f;原因可以说有很多&#xff0c;但核心为两方面&#xff0c;一方面&#xff0…

windows?linux?如何使用JMeter

windows?linux?如何使用JMeter 安装JMeter的步骤以GUI模式启动JMeter如何在非GUI模式下运行JMeter在linux中使用JMeter 安装JMeter的步骤 JMeter 是一个纯 Java应用程序&#xff0c;应该在任何具有兼容Java实现的系统上正确运行。 安装 JMeter 的步骤 步骤1&#xff09;安…

网络安全防御保护实验(一)

目录 一、规划ip地址 二、配ip地址 三、交换机和防火墙的配置 四、进行测试 实验要求&#xff1a;防火墙向下使用子接口连接生产区和办公区&#xff0c;所有分区设备可以ping通网关。 一、规划ip地址 二、配ip地址 三、交换机和防火墙的配置 四、进行测试

IP被封怎么办?访问网站时IP被阻止?解决IP禁令全方法

相信很多人遇到过IP禁令&#xff1a;比如你在访问社交媒体、搜索引擎或电子商务网站时会被限制访问&#xff0c;又或者你的的账号莫名被封&#xff0c;这些由于网络上的种种限制我们经常会遭遇IP被封的情况&#xff0c;导致无法使用继续进行网络行动。在本文中&#xff0c;我们…

Android开发修炼之路——(一)Android App开发基础-1

本文介绍基于Android系统的App开发常识&#xff0c;包括以下几个方面&#xff1a;App开发与其他软件开发有什么不一样&#xff0c;App工程是怎样的组织结构又是怎样配置的&#xff0c;App开发的前后端分离设计是如何运作实现的&#xff0c;App的活动页面是如何创建又是如何跳转…

scoped属性和深度选择器

在Vue单文件组件&#xff08;SFC&#xff09;中&#xff0c;为了防止样式全局污染&#xff0c;可以给 所有的scoped的css编译出来都会变成.class[哈希值]的形式 我们只能修改带data-v-0dca3a9a作用域的样式&#xff0c;像是 如果修改el-table的宽度 .el-table {width: 60…

惠友小课堂】拇外翻常见的几个误区,来看看你中了几个?

拇外翻作为常见的足部畸形&#xff0c;在日常生活中困扰着许多人。歪脚趾不仅外观不好看&#xff0c;还会出现疼痛、影响行走运动。但大多数人对于拇外翻的认识都不足常常落入认知误区&#xff0c;快来看看你中了几个&#xff1f; 误区一 Q 我都没穿过高跟鞋&#xff0c;怎么也…

科大讯飞 再次引爆Ai

去年「科大讯飞版ChatGPT」星火大模型刚上线的时候&#xff0c;小编给大家推荐过一波&#xff0c;演示了其强大的功能&#xff0c;不少小伙伴都立马申请体验了一把&#xff0c;有小伙伴还私信我说功能非常强大&#xff0c;工作效率提高不少&#xff0c;支持国产大模型之类赞扬。…

Azure AI - 沉浸式阅读器,阅读障碍用户福音

目录 一、什么是沉浸式阅读器将内容划分开来提高可读性显示常用字词的图片突出显示语音的各个部分朗读内容实时翻译内容将单词拆分为音节 二、沉浸式阅读器如何工作&#xff1f;环境准备创建 Web 应用项目设置身份验证配置身份验证值安装标识客户端 NuGet 包更新控制器以获取令…

Dify学习笔记-工具(七)

1、工具 工具定义 工具可以扩展 LLM 的能力&#xff0c;比如联网搜索、科学计算或绘制图片&#xff0c;赋予并增强了 LLM 连接外部世界的能力。Dify 提供了两种工具类型&#xff1a;第一方工具和自定义工具。 你可以直接使用 Dify 生态提供的第一方内置工具&#xff0c;或者轻…

基于Java的高校运动会管理系统的设计与实现(源码+lw+部署文档+讲解等)

文章目录 前言 详细视频演示 具体实现截图 技术栈 后端框架SpringBoot 前端框架Vue 持久层框架MyBaitsPlus 系统测试 系统测试目的 系统功能测试 系统测试结论 代码参考 数据库参考 源码获取 前言 &#x1f497;博主介绍&#xff1a;✌全网粉丝10W,CSDN特邀作者、…

倍增算法笔记

主要应用场景 RMQ&#xff1a;区间最值问题 LCA&#xff1a;最近公共祖先问题 RMQ问题——区间最值 如果用数组f[N]存储,用数组a[i][j]表示从第i个数起连续 2^j 个数中的最大值,[i,i 2^j - 1],显然a[i][0] f[i],则很容易得到状态转移方程: a[i][j] max(a[i][j - 1], a[i …

免 费 小程序商城搭建之鸿鹄云商 SAAS云产品概述

【SAAS云平台】打造全行业全渠道全场景的SaaS产品&#xff0c;为店铺经营场景提供一体化解决方案&#xff1b;门店经营区域化、网店经营一体化&#xff0c;本地化、全方位、一站式服务&#xff0c;为多门店提供统一运营解决方案&#xff1b;提供丰富多样的营销玩法覆盖所有经营…

Java Web(五)--DOM

介绍 DOM 全称是 Document Object Model 文档对象模型&#xff1b; DOM 是 W3C&#xff08;万维网联盟&#xff09;的标准。 DOM 定义了访问 HTML 和 XML 文档的标准&#xff1a; "W3C 文档对象模型 &#xff08;DOM&#xff09; 是中立于平台和语言的接口&#xff0…

伊恩·斯图尔特《改变世界的17个方程》薛定谔方程笔记

想法是等这学期学到薛定谔方程后再把整份完善下。 它告诉我们什么&#xff1f; 这个方程不是把物质作为粒子&#xff0c;而是作为波&#xff0c;并描述这样的波如何传播。 为什么重要&#xff1f; 薛定谔方程是量子力学的基础&#xff0c;它与广义相对论一起构成了当今最有效的…