推或拉? rabbitMQ 消费模式该如何选择

news2025/1/16 19:09:19

系列文章目录

消息队列选型——为什么选择RabbitMQ
RabbitMQ 五种消息模型
RabbitMQ 能保证消息可靠性吗


文章目录

  • 系列文章目录
  • 前言
  • 一、推拉两种模式的概念
  • 二、推模式的使用及优势
    • 1. 使用
    • 2. 优劣
  • 三、拉模式的使用及优势
    • 1. 使用
    • 2. 优劣
  • 四、消费端Ack模式与Qos
    • 1. Ack模式
    • 2. Qos 服务质量
  • 五、总结


前言

在前面的选型对比中,我们提到了rabbitMQ同时支持推和拉的消息投递方式,那么什么是消息的推和拉?我们又该如何选择呢?今天我们就一起来看下吧


一、推拉两种模式的概念

MQ 是一个非常重要的消息传递架构,它可以实现解耦并且提高系统的可靠性和吞吐量。 在很多MQ组件中,消息可以通过推和拉模式进行传递。

  • 推模式
    在推模式下,当一个生产者发布消息到队列时,队列会立即将这条消息发送给所有订阅了该队列的消费者
  • 拉模式
    在拉模式下,当生产者发布消息到队列时,队列不会立即发送消息给消费者,而是等待消费者请求消息后才发送

二、推模式的使用及优势

1. 使用

代码如下(示例):

package com.zhanfu.springboot.demo.mq;
import com.rabbitmq.client.*;
import java.io.IOException;

public class PushConsumer {
    private final static String QUEUE_NAME = "myqueue";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received '" + message + "'");
            }
        };
        // 开始消费消息,第二个参数为是否自动ack
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

我们定义了一个consumer 消费者,然后把该消费者使用basicConsume方法来订阅某个队列的消息。当有消息到达队列时,consumer 里的handleDelivery方法就会被调用

2. 优劣

  1. 优势:这种方式可以实现实时通信
  2. 劣势:如果消费者的处理能力跟不上生产者的速度,就会在消费者处造成消息堆积

三、拉模式的使用及优势

1. 使用

代码如下(示例):

import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.*;

public class RabbitMQPullConsumer {
    private static final String QUEUE_NAME = "queue_name";
    private static final String HOST_NAME = "host_name";
    private static final String USERNAME = "username";
    private static final String PASSWORD = "password";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建连接和通道
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST_NAME);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);

        // 消费消息
        while (true) {
        	// 手动从队列中获取一条消息,第二个参数为是否自动ack
            GetResponse response = channel.basicGet(QUEUE_NAME, false);
            if (response == null) {
                // 没有消息,则等待一段时间再继续检查
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                String message = new String(response.getBody(), "UTF-8");
                System.out.println("Received message: " + message);
                channel.basicAck(response.getEnvelope().getDeliveryTag(), false); // 手动Ack
            }
        }
    }
}

如果获取的GetResponse对象为null,则表示队列中没有消息。我们将等待一段时间(这里是1秒),然后再继续检查队列中是否有新消息。

如果获取的GetResponse对象不为null,则表示有新消息。我们将从GetResponse对象中提取消息内容,然后输出它。如果我们设置basicGet()方法的第二个参数为true,则表示自动ack,即在获取消息后直接将消息从队列中删除。

这是一个基本的拉模式的RabbitMQ消费者实现。当然,生产上如果采用拉模式,我们更推荐使用多个线程异步的方式处理,一个线程负责循环拉取消息后,存入一个长度有限的阻塞队列,另一个/一些线程从阻塞队列中取出消息,处理完一条则手动Ack一条,这样更有效率。

2. 优劣

  1. 优势:消费端可以按照自己的处理速度来消费,避免产生在消费端产生消息堆积。
  2. 劣势:消息传递方面可能会有一些延迟,当处理速度长时间小于消息发布速度时,容易造成大量消息堆积在rabbitMQ服务器,一些时间敏感的队列,可能会使内部的消息失效

四、消费端Ack模式与Qos

1. Ack模式

我们在上面的代码中,不难发现,不管是推模式还是拉模式,方法的入参中都有一个布尔变量

// 推模式,直接订阅
channel.basicConsume(QUEUE_NAME, true, consumer);
// 拉模式,一次拉取一条
channel.basicGet(QUEUE_NAME, true);

这个布尔变量其实就是是否自动Ack,其实我们在《RabbitMQ 能保证消息可靠性吗》一文中就提到过,这是一种消息确认机制:

  • 自动ACK
    即意味着,我们收到消息时就会通知RabbitMQ服务器,服务器就会将该消息已经被消费,进而删除。
  • 手动ACK
    -即意味着在某个我觉得可以通知RabbitMQ服务器时,我才会通知。

那么两种模式我们该怎么取舍?

从实际生产的角度,我强烈建议将Ack模式设置为手动,这样可以保证消费者处理消息失败时,消息不会立即从队列中删除,而是需要重新分配给其他消费者,增强了系统的容错性和可靠性。同时,建议在消费者处理消息时,对消息进行异常处理,确保消息能够正确地被处理

2. Qos 服务质量

在rabbitMQ的API中,我们不难发现有一项叫做 Qos(quality of service—— 服务质量) 的参数设置
在这里插入图片描述

These settings impose limits on the amount of data the server will deliver to consumers before requiring acknowledgements.Thus they provide a means of consumer-initiated flow control.
这些设置对服务器在接收到Ack之前将传递给消费者的数据量进行了限制。因此,它们提供了一种由消费者发起的流量控制方式。

通俗的讲,这个Qos参数是配合Ack 用来控制消费端的流量的(即限流:自动ACK不需要设置Qos;手动ACK模式时,则可以设置Qo,控制消费者处理消息的速度,它拥有三个参数

  1. prefetchSize:服务器将传递的最大内容量(以八位字节为单位),如果不受限制,则为0
  2. prefetchCount:服务器将传递的最大消息条数,如果不受限制,则为0
  3. global:是否将该设置应用到整个信道,还是仅此一个消费者

在上面拉模式的代码中,我们就使用了一个 channel.basicQos(1); ,也是最常用的限制方法,即以消息条数为单位限制,其实设置的就是最大消息条数

    /**
     * Request a specific prefetchCount "quality of service" settings
     * for this channel.
     * <p>
     * Note the prefetch count must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
     *
     * @param prefetchCount maximum number of messages that the server
     *                      will deliver, 0 if unlimited
     * @throws java.io.IOException if an error is encountered
     * @see #basicQos(int, int, boolean)
     */
    void basicQos(int prefetchCount) throws IOException;

我们可以设置QoS,控制消费者处理消息的速度。通过合理设置预取计数或预取字节数,可以确保消费者处理消息的速度与RabbitMQ服务器发送消息的速度相匹配,避免队列中积压过多的未确认消息需要注意的是,这个prefetchCount的取值是一个经验值,太大容易导致消费端内存消耗过高,太小则效率低下,一般建议在100以下


五、总结

综合来看,推模式适合实时通信且生产者和消费者的速度相当的场景,而且对于利于压榨出消费者端的处理潜力;拉模式适合大量数据的场景,并且可以更好地控制消息的消费进度。但是,实际应用中更多地是将两种模式结合使用,以达到更好的效果。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/689277.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux——1初识linux

目录 1.1 硬件和软件 1.2 初识Linux 1.2.1 Linux的诞生 1.2.2 LInux内核 1.2.3 Linux发行版 1.3 虚拟机介绍 1.4 VMware WorkStation 安装 1.5 在VMware上安装Linux 1.6 远程连接Linux系统 1.6.1 图形化、命令行 1.6.2 FinalShell 1.1 硬件和软件 我们所熟知的计算…

6.S081——设备中断与驱动部分(串口驱动与Console)——xv6源码完全解析系列(8)

0.briefly speaking 点此返回上一篇博客 上一篇博客中我们简单介绍了UART和PLIC的初始化过程&#xff0c;并迭代式的分析了console的读写操作&#xff0c;这篇博客接着上一篇的话题&#xff0c;研究一下一个字符是怎么一步步被显示到我们的屏幕上的&#xff0c;经过了哪些设备…

PyCharm 2021.1.1 x64的安装 和环境搭建

环境变量 D:\Python38 path 免安装

【id:80】【20分】B. 复数的加减乘运算(运算符重载)

题目描述 定义一个复数类&#xff0c;通过重载运算符&#xff1a;、-、*&#xff0c;实现两个复数之间的各种运算。 class Complex { private:float real, image; public:Complex(float x 0, float y 0);friend Complex operator(Complex&, Complex&);friend Comple…

python中使用OAK-D PRO相机实现OCR功能

目录 OAK简介Tesseract简介Tesseract OCR安装包安装 Tesseract OCR 代码实现 OAK简介 OAK&#xff08;OpenCV AI Kit&#xff09;是一个开源的智能视觉平台&#xff0c;它集成了硬件和软件组件&#xff0c;旨在提供高性能的实时目标检测、识别和跟踪等视觉AI功能。OAK由Luxoni…

【夜深人静学JAVA | 第二十三篇】集合体系结构

目录 前言&#xff1a; 单列集合&#xff1a; set与list的区别&#xff1a; 双列集合&#xff1a; map的特点&#xff1a; 总结&#xff1a; 前言&#xff1a; JAVA中为我们提供了很多集合&#xff0c;这些集合都有自己很独特的特点&#xff0c;因此我们要学习所有的…

【Nginx】第五章 Nginx配置实例-负载均衡

5.1 实现效果 浏览器地址栏输入地址 http://192.168.6.100/edu/index.html&#xff0c;负载均衡效果&#xff0c;将请求平均分配到8080和8081两台服务器上。 5.2 准备工作 &#xff08;1&#xff09;准备两台tomcat服务器&#xff0c;一台8080&#xff0c;一台8081 &#x…

数据结构C语言版本(上)

第一章 绪论 第一节 什么是数据结构&#xff1f; 估猜以下软件的共性&#xff1a;学生信息管理、图书信息管理、人事档案管理。   数学模型&#xff1a;用符号、表达式组成的数学结构&#xff0c;其表达的内容与所研究对象的行为、特性基本一致。 信息模型&#xff1a;信息…

FANUC机器人SRVO-220 SDI保险丝熔断报警处理方法

FANUC机器人SRVO-220 SDI保险丝熔断报警处理方法 一般在R-30iB Mate Plus柜的机器人上会遇到这个报警&#xff0c;R-30iB Plus柜则不会遇到这个报警。 如下图所示&#xff0c; 故障原因&#xff1a; 机器人EE接口的接线有短路的情况&#xff0c;检查EE接口的接线&#xff0…

Mybatis-Plus:实现自定义SQL

目录 1.简介 2.自定义SQL具体实现 2.1.注解SQL 2.2.Wrapper传参注解SQL 2.3.Wrapper传参xml文件SQL 2.4.正常传参XML文件SQL 3.总结 1.简介 Mybatis-Plus&#xff08;以下简称MBP&#xff09;的初衷是为了简化开发&#xff0c;而不建议开发者自己写SQL语句的&#xff1b…

多元分类预测 | Matlab鲸鱼算法(WOA)优化极限学习机(ELM)的分类预测,多特征输入模型。WOA-ELM分类预测模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元分类预测 | Matlab鲸鱼算法(WOA)优化极限学习机(ELM)的分类预测,多特征输入模型。WOA-ELM分类预测模型 多特征输入单输出的二分类及多分类模型。程序内注释详细,直接替换数据就可以用。程序语言为matlab,程…

【MySQL的存储过程】

目录 一、存储过程的概述1、存储过程的定义2、存储过程的优点 二、存储过程的步骤&#xff08;面试题&#xff09;1、创建存储过程2、存储过程的参数 三、删除存储过程四、存储过程的控制语句1. 条件语句if-then-else end if2. 循环语句while end while 一、存储过程的概述 …

微信小程序配合Tdesign实现验证码倒计时

效果 点击发送验证码后 实现 wxml <view class"userName"><view class"name">Code.<text>*</text></view><t-input placeholder"" value"{{code}}" type"number" bindchange"onP…

elasticsearch删除脏数据(根据指定字段删除数据)

场景 es中出现几条脏数据&#xff0c;现在要把这几条数据直接删掉 思路 找到要删除的脏数据&#xff0c;一般是根据id之类的字段来删除&#xff0c;因为id具有唯一性&#xff0c;其实和mysql差不多 执行 1、先查到该条记录&#xff08;注意我们这边使用的是 ticketId字段&…

vue中日,周,月,年时间选择器(基于elementui)

通过选择上面的选项展示选择不同的日期,周,月份,年份 因为项目中点击切换时需要传递不同的日期, 例如:日,即选择日期的00:00分-23:59 周:即选择当月的第三周,截取第三周的周一和第三周的周日为开始时间和截止时间传值 月,即选择月的第一天---选择月得最后一天传值 <templ…

【静态连接和动态连接】C/C++编程中的两种有效链接策略

一、静态连接和动态连接 链接分为两种&#xff1a;静态链接、动态链接。 1&#xff09;静态链接 静态链接&#xff1a;由链接器在链接时将库的内容加入到可执行程序中。 优点&#xff1a; 对运行环境的依赖性较小&#xff0c;具有较好的兼容性 缺点&#xff1a; 生成的程…

企业级微服务架构实战项目--xx优选-商品分类和搜索

一 商品分类和搜索 点击分类&#xff0c; &#xff08;1&#xff09;左侧显示商品分类&#xff0c;右侧显示对应商品分类下的商品列表 &#xff08;2&#xff09;如果商品分类下没有数据&#xff0c;则显示空内容

【正则表达式】匹配选择题、判断题

试卷文本 使用https://github.com/Minuhy/python_docx_export导出的word文档文本&#xff1a; 2022-2023学年第二学期期末课程考核试卷&#xff08;A1&#xff09;卷 课程名称&#xff1a; 分布式数据库HBase 考核形式&#xff1a; 上机考试 年级、专业、层次&#xff1…

【wireshark】rtp流分析

分析wifi下的rtp传输 选中一个udp传输 udp.dstport == 41447解码为rtp 右键 decode as 过滤某一条rtp流 udp.dstport == 41447 && rtp

vscode 加上c++11编译选项

问题描述 vscode 运行C11代码出现此错误 error This file requires compiler and library support for the ISO C 2011 standard. This support must be enabled with the -stdc11 or -stdgnu11 compiler options. 提示我们需要在编译命令中加一行选项&#xff0c;加入c11编译…