RabbitMQ 消费者

news2025/1/12 18:39:58

  RabbitMQ的消费模式分两种:推模式和拉模式,推模式采用Basic.Consume进行消费,拉模式则是调用Basic.Get进行消费。
  消费者通过订阅队列从RabbitMQ中获取消息进行消费,为避免消息丢失可采用消费确认机制

消费者

  • 拉模式
    • 拉模式的实现
  • 推模式
  • 消费确认与拒绝
    • 消息确认的实现
    • 消息拒绝的实现
    • basicRecover
  • basicQos 限制消费
  • 总结

拉模式

  顾名思义,拉模式就是消费者主动的从RabbitMQ中获取数据,通过拉模式每次获取数据只能获取一条。拉模式的时序图如下图所示。
在这里插入图片描述
  RabbitMQ每次接收到Get请求后会将队列中即将被消费的消息发送给消费者,消费者接收处理消息后向RabbitMQ发送消费应答,然后该消息将从队列中移除。
  需要注意的是拉模式普遍仅适用用从RabbitMQ中获取一条数据的场景,如果以循环的方式获取批量数据将影响RabbitMQ的性能。

拉模式的实现

  拉模式通过以下方法实现:

/**
* queue 队列名称
* autoAck 是否开启自动应答
*/
GetResponse basicGet(String queue,boolean autoAck)

  如上述代码所示channel.basicGet方法返回的是一个GetResponse,在GetResponse对象中包含了一条消息内容,消费者可以获取该消息并进行处理。

推模式

  推模式是指RabbitMQ将消息主动推送给订阅监听队列的消费者。在RabbitMQ推送消息的过程中其并不关心该消费者是否完成上一条消息的消费,只要队列中存在消息则向消费者推送,当然推送消息的个数会受Basic.Qos的限制。Basic.Qos指定了某个消费者可以保持的未应答的消息数量。

    /**
     * Start a consumer. Calls the consumer's {@link Consumer#handleConsumeOk}
     * method.
     * Provide access to <code>basic.deliver(Broker推送消息)</code>, <code>basic.cancel</code>
     * and shutdown signal callbacks (which is sufficient
     * for most cases). See methods with a {@link Consumer} argument
     * to have access to all the application callbacks.
     * @param queue 队列名称
     * @param autoAck 是否自动确认
     * @param consumerTag 消费者标签,消费者的唯一标识符
     * @param noLocal 是否可以接收同Connection中生产者的消息(true不能接收)
     * @param exclusive 是否设置排他
     * @param arguments 其他参数
     * @param deliverCallback 消息接收回调
     * @param cancelCallback 消费取消回调
     * @param shutdownSignalCallback 连接或者信道关闭回调
     * @return the consumerTag associated with the new consumer
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException;

    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

  可以通过上述两种方法(设置参数最多的)实现声明消费者。其中Consumer的定义如下:


public interface Consumer {
    /**
     * 消费者通过basicConsume被注册后调用
     */
    void handleConsumeOk(String consumerTag);

    /**
     * 消费者通过basicCancel取消时调用
     */
    void handleCancelOk(String consumerTag);

    /**
     * 消费者不通过basicCancel取消时调用
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * 通道或者连接关闭时调用
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * 接收重新发送的未被确认的消息时调用
     */
    void handleRecoverOk(String consumerTag);

    /**
     * 接收消息时调用
     * @param consumerTag 消费者标签
     * @param envelope 打包消息的数据
     * @param properties 消息的内容标头数据
     * @param body 消息内容
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

消费确认与拒绝

  为了保障消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数,当autoAck为true时RabbitMQ会自动的把发送出去的消息设置为确认,然后从队列中删除;当autoACK为false时RabbitMQ会等待消费者显式回复确认信号后才从内存中移去消息(先标记再删除)。
  autoAck参数意为自动应答,但是如果该参数为true时则rabbitMQ将自动将发送的消息标记确认,无需消费者进行应答。

  当autoAck参数为false时,对于RabbitMQ服务器而言,队列中的消息分成两部分:一部分时等待投递给消费者的消息;一部分时已经投递给消费者,但是还未收到消费者确认消息的消息
  RabbitMQ不会为未确认的消息设置过期时间,如果一个消息一直未被消费者确认,那么这个消息再RabbitMQ中将一直保存为投递未确认状态,指导消费者确认或者消费者断开连接,如果消费者断开连接,则该消费者接收但未确认的消息将重新入队。

消息确认的实现

  消息的显式确认需要消费者再声明的过程中设置autoAck=false。然后该消费者消费的消息可以显式的进行确认应答。确认应答方法如下:

	 /**
     * @param 消息的标签,可通过Delivery.getEnvelope().getDeliveryTag()获取
     * @param 如果为true则将发送给该消费者的该消息之前的所有未应答的消息进行应答,如果为false则仅应答一条消息
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

  当进行消息的批量确认时,将所有发送给该消费者未确认的消息进行确认,而针对监听同一队列的其他消费者的未确认消息并不进行处理。

消息拒绝的实现

  RabbitMQ提供了两种消息拒绝的方法:Basic.Reject和Basic.Nack命令;其两者的区别时Nack可以进行批量拒绝。

    /**
     * @param deliveryTag 消息标签
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

    /**
     * @param deliveryTag 消息标签
     * @param multiple 如果为true则批量拒绝自该消息之前所有未确认的发送给该消费者的消息
     * @param requeue 为true时被拒绝的消息重新入队,否则将成为死信
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

basicRecover

该方法可以将某个消费者未应答(确认或者拒绝)的消息重新入队,该方法会导致:

  • 投递而未被应答的消息可以重新发送给消费者进行处理
  • 消费者的消息队列被清空,可以重新接收到其他消息
    /**
     * <p>
     *  Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * </p>
     * Equivalent to calling <code>basicRecover(true)</code>, messages
     * will be requeued and possibly delivered to a different consumer.
     * @see #basicRecover(boolean)
     */
     Basic.RecoverOk basicRecover() throws IOException;

    /**
     * Ask the broker to resend unacknowledged messages.  In 0-8
     * basic.recover is asynchronous; in 0-9-1 it is synchronous, and
     * the new, deprecated method basic.recover_async is asynchronous.
     * @param requeue If true, messages will be requeued and possibly
     * delivered to a different consumer. If false, messages will be
     * redelivered to the same consumer.
     */
    Basic.RecoverOk basicRecover(boolean requeue) throws IOException;

basicQos 限制消费

  默认情况下,消费者对于接收的消息数量并未限制,也就是说,一旦RabbitMQ中接收到消息并且存在消费者,则RabbitMQ将把消息发送到相关的消费者中,并不关心消费者是否消息完信息。
  轮询的默认消息分发机制会导致消费者资源不能合理利用、消费者消息积压导致内存溢出等问题。为解决上述问题可以使用basicQos方法实现限制信道上消费者所能保持的最大未确认消息数量。该方法如下:

    /**
     * @param prefetchSize 消息大小
     * @param prefetchCount 消息数量
     * @param global 是否全局
     * @throws java.io.IOException if an error is encountered
     */
    void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

针对global参数需要注意一下内容:

  • 当global=true时信道上所有的消费者都需要遵从消息数量限定值(某个信道上所有消费者未确认消息数量<=prefetchCount)
  • 等global=false时新的消费者需要遵从消息数量的限定值。
  • 可以调用两次basicQos方法,并使用不同的global参数,这种情况下两次配置都可以生效。

总结

  消费者就是针对某个队列进行消息监听和消息消费的。消费者消费消息存在拉模式和推模式,推模式的是使用场景相对比较多。
  为确保消息被合法的消费,RabbitMQ提供了消费确认机制,投递的消息并不能被理解完成了消费,仅消费者确认消费该消息才会被移除队列。
  默认的消息投递机制时轮询,轮询的消息分发并会关系消费者的性能以及消息积压的问题,因此需要限制每个消费者所能保持的最大未确认的消息数量。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/915419.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChatGPT应用于高职教育的四大潜在风险

目前&#xff0c;ChatGPT还是一种仍未成熟的技术&#xff0c;当其介入高职教育生态后&#xff0c;高职院校师生在享受ChatGPT带来的便利的同时&#xff0c;也应该明白ChatGPT引发的风险也会随之进入高职教育领域&#xff0c;如存在知识信息、伦理意识与学生主体方面的风险与挑战…

轻松正确使用代理IP

Hey&#xff0c;亲爱的程序员小伙伴们&#xff01;在进行爬虫时&#xff0c;你是否曾使用过别人的代理IP&#xff1f;是否因此慌乱&#xff0c;担心涉及违法问题&#xff1f;不要惊慌&#xff01;今天我将和你一起揭开法律迷雾&#xff0c;为你的爬虫之路保驾护航。快跟上我的节…

C++核心编程——类和对象(二)、友元、多态、文件操作

C对象模型和this指针 4.3.1 成员变量和成员函数分开存储 在C中&#xff0c;类内的成员变量和成员函数分开存储 只有非静态成员变量才属于类的对象上 空类&#xff08;类里面是空的&#xff09;&#xff0c;空对象占用内存空间为&#xff1a;1字节。 静态成员变量&#xff0…

简单屏幕共享 通过web screego windows 生成证书

生成证书用 linux 生成&#xff0c;在 windows 下使用 windows 生成证书 https://juejin.cn/post/6925006735933440014 下载地址 https://github.com/screego/server/releases 修改完配置后&#xff0c;运行 screego serve 需要修改的几个地方 # 局域网 ip 或公网 ip&…

PHP请求API接口对接电商平台亚马逊国际站按关键字搜索商品案例

关键词搜索商品API接口的用途主要包括以下几个方面&#xff1a; 实现商品搜索&#xff1a;通过关键词搜索商品API接口&#xff0c;电商平台可以为消费者提供一个简单、快捷的商品搜索功能。用户只需输入关键词&#xff0c;就可以得到与该关键词相关的商品列表。 提供便捷的商…

vue 转盘抽奖功能,可控制抽奖概率

实现逻辑&#xff1a; 思路&#xff1a;首先需要一个转盘&#xff0c;然后需要一个抽奖按钮定位在中间&#xff0c;图片提前设计或者用背景颜色代替&#xff08;这里用的是图片&#xff0c;然后计算概率&#xff09;&#xff0c;使用css完成转动效果&#xff0c;每次转动完成之…

谈谈收音机的发展

目录 1.什么是收音机 2.收音机的工作原理 3.收音机的发展历史 4.收音机的历史作用 1.什么是收音机 收音机是一种电子设备&#xff0c;用于接收和播放广播电台的无线电信号。它是人们获取各种音乐、新闻、娱乐和其他广播节目的常用设备。 收音机通常由以下几个部分组成&…

无涯教程-PHP - 简介

PHP 7是最期待的&#xff0c;它是PHP编程语言的主要功能版本。 PHP 7于2015年12月3日发布。本教程将以简单直观的方式教您PHP 7的新功能及其用法。 无涯教程假设您已经了解旧版本的PHP&#xff0c;现在就可以开始学习PHP 7的新功能。 使用下面的示例- <html><head&…

【学会动态规划】摆动序列(27)

目录 动态规划怎么学&#xff1f; 1. 题目解析 2. 算法原理 1. 状态表示 2. 状态转移方程 3. 初始化 4. 填表顺序 5. 返回值 3. 代码编写 写在最后&#xff1a; 动态规划怎么学&#xff1f; 学习一个算法没有捷径&#xff0c;更何况是学习动态规划&#xff0c; 跟我…

优化学习体验是在线培训系统的关键功能

在线培训系统是当今教育领域的一个重要工具&#xff0c;帮助学生和教师提高学习效果和教学质量。一个功能完善的在线培训系统可以提供丰富多样的学习资源和交互方式&#xff0c;以满足不同学生的需求。 个性化学习路径 每个学生的学习需求和进度都不同。通过个性化学习路径功…

【Python机器学习】实验16 卷积、下采样、经典卷积网络

文章目录 卷积、下采样、经典卷积网络1. 对图像进行卷积处理2. 池化3. VGGNET4. 采用预训练的Resnet实现猫狗识别 TensorFlow2.2基本应用5. 使用深度学习进行手写数字识别 卷积、下采样、经典卷积网络 1. 对图像进行卷积处理 import cv2 path data\instance\p67.jpg input_…

AMBA总线协议(7)——AHB(五):传输仲裁

一、前言 在之前的文章中我们讨论了AHB的很多传输细节&#xff0c;主要有控制信号&#xff0c;地址信号的译码&#xff0c;从机的响应等&#xff0c;其中重点介绍了双周期响应&#xff0c;最后介绍了数据总线及端结构&#xff0c;在本文中我们将继续介绍AHB传输的仲裁机制。 仲…

利用大模型反馈故障的解决方案

背景 观测云有两个错误巡检脚本&#xff0c;RUM 错误巡检和 APM 错误巡检&#xff0c;代码均开源。 错误巡检的主要目的是发现新出现的错误消息(error stack)&#xff0c;原有的巡检在上报了相应的事件报告后&#xff0c;只是定位了问题&#xff0c;并没有给出合适的解决方案。…

数据分析实战│价格预测挑战【文末赠书】

文本分析是指对文本信息的表示及特征项的选取&#xff0c;商品文本的描述能够反映特定立场、观点、价值和利益。考虑到网上海量的商品数量&#xff0c;对产品的定价难度很大&#xff0c;因此可以使用商品描述帮助商户定价。比如&#xff0c;服装具有较强的季节性价格趋势&#…

PHP 创业感悟交流平台系统mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP 创业感悟交流平台系统&#xff08;含论坛&#xff09;是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 源码下载&#xff1a; https://download.csdn.…

C++中cin >> str 和 string类的getline(cin, str) 用来读取用户输入的两种不同方式的不同点

C中cin >> str 和 string类的getline(cin, str) 用来读取用户输入的两种不同方式的不同点 在C中&#xff0c;string类是标准库提供的字符串类&#xff0c;它可以帮助我们处理和操作字符串。它在<string>头文件中定义。string类提供了一系列成员函数和操作符&#…

Numpy入门(5)—应用举例

NumPy应用举例 5.1 计算激活函数Sigmoid和ReLU 使用ndarray数组可以很方便的构建数学函数&#xff0c;并利用其底层的矢量计算能力快速实现计算。下面以神经网络中比较常用激活函数Sigmoid和ReLU为例&#xff0c;介绍代码实现过程。 计算Sigmoid激活函数 计算ReLU激活函数 使…

C++ vector模拟实现

建议将vector的模拟实现写在头文件中&#xff0c;测试使用部分写在.cpp文件中 vector是类模板&#xff0c;被封装在命名空间中 部分源码&#xff1a;&#xff08;删除某些内容后&#xff09; vector模拟实现的代码&#xff1a; #include<assert.h> namespace djx {tem…

【Git分支操作---讲解二】

Git分支操作---讲解二 查看分支创建分支切换分支修改分支切换分支合并分支合并分支【冲突】(只会修改主分支不会修改其他分支)什么时候会有冲突&#xff1f; 查看分支 创建分支 切换分支 修改分支 切换分支 合并分支 合并分支【冲突】(只会修改主分支不会修改其他分支) 什么时…

国产精品:讯飞星火最新大模型V2.0

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…