【消息中间件】Rabbitmq消息可靠性、持久化机制、各种消费

news2024/11/18 19:45:07

原文作者:我辈李想
版权声明:文章原创,转载时请务必加上原文超链接、作者信息和本声明。


文章目录

  • 前言
  • 常见用法
    • 1.消息可靠性
    • 2.持久化机制
    • 3.消息积压
      • 批量消费:增加 prefetch 的数量,提高单次连接的消息数
      • 并发消费:多部署几台消费者实例
    • 4.重复消费
  • 二、其他
    • 1.队列存在大量unacked数据


前言


常见用法

1.消息可靠性

RabbitMQ 提供了多种机制来确保消息的可靠性,以防止消息丢失或被意外删除。以下是几种提高消息可靠性的方法:

  1. 持久化消息(Durable Message):在发布消息时,将消息的 deliveryMode 设置为 2,即可将消息设置为持久化消息。持久化消息会将消息写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。

  2. 持久化队列(Durable Queue):创建队列时,将队列的 durable 参数设置为 true,即可创建一个持久化队列。持久化队列会将队列的元数据和消息都存储在磁盘上,即使消息队列服务器重启,队列的元数据和消息仍然可以恢复。

  3. 确认模式(Publisher Confirms):使用确认模式可以确保消息被成功发送到 RabbitMQ 服务器,并得到确认。通过在信道上使用 channel.confirmSelect() 启用确认模式,然后通过 channel.waitForConfirms() 方法来等待服务器的确认。

  4. 事务模式(Transactions):使用事务模式可以保证消息的原子性,要么全部发送成功,要么全部失败。通过在信道上使用 channel.txSelect() 开启事务模式,在发送消息后使用 channel.txCommit() 提交事务,或使用 channel.txRollback() 进行回滚。

  5. 消费者应答(Consumer Acknowledgement):在消费者接收和处理消息后,必须发送确认应答给 RabbitMQ 服务器。通过使用 channel.basicAck() 方法发送确认应答,以告知服务器消息已经成功处理。

通过使用上述机制,可以在 RabbitMQ 中实现消息的可靠性传输和处理,以防止消息的丢失和重复传递。

2.持久化机制

在RabbitMQ中,消息持久化是一种机制,可以确保消息在服务器宕机或重启之后不丢失。默认情况下,RabbitMQ的消息是存储在内存中的,如果服务器宕机,则会导致消息的丢失。要实现消息的持久化,可以采取以下步骤:

  1. 创建一个持久化的交换机(Exchange):
    在定义交换机时,将其durable参数设置为true,例如:

    channel.exchangeDeclare("exchange_name", "direct", true);
    
  2. 创建一个持久化的队列(Queue):
    在定义队列时,将其durable参数设置为true,例如:

    channel.queueDeclare("queue_name", true, false, false, null);
    
  3. 将持久化的队列与交换机进行绑定:
    使用队列和交换机的bind方法进行绑定,例如:

    channel.queueBind("queue_name", "exchange_name", "routing_key");
    
  4. 发布持久化的消息:
    在发布消息时,将消息的deliveryMode属性设置为2,表示消息是持久化的,例如:

    String message = "Hello RabbitMQ!";
    channel.basicPublish("exchange_name", "routing_key", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    

通过以上步骤,就可以实现消息的持久化。当RabbitMQ服务器宕机或重启后,消息会被保存在磁盘中,并在服务器恢复后重新投递给消费者。需要注意的是,虽然消息被持久化了,但是在发送到队列之前,仍然有可能发生丢失,所以在实际的应用中,还需要考虑一些因素,比如网络故障、消费者的可靠性等。

3.消息积压

批量消费:增加 prefetch 的数量,提高单次连接的消息数

为了提高消费性能,可以将多个消息批量进行消费,减少消费者和消息队列的交互次数。通过设置合适的批量消费大小,可以在一次网络往返中消费多个消息,从而提高消费性能。
要实现RabbitMQ的批量消费,可以使用RabbitMQ的channel.basicQos方法来设置每次消费的消息数量。以下是一个示例代码,演示如何实现批量消费:

import pika

def callback(ch, method, properties, body):
    print("Received message: %s" % body)
    # 处理消息的逻辑

    # 发送确认给RabbitMQ
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume_messages():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 设置每个消费者一次性获取的消息数量
    channel.basic_qos(prefetch_count=10)

    # 注册消费者并开始消费消息
    channel.basic_consume(queue='my_queue', on_message_callback=callback)

    # 进入一个循环,一直等待消息的到来
    channel.start_consuming()

consume_messages()

在这里插入图片描述

在上面的代码中,我们通过channel.basic_qos(prefetch_count=10)设置每次处理的消息数量为10。这样,在消费者处理完10条消息之前,RabbitMQ将不会再向其发送更多消息。

这样,就实现了RabbitMQ的批量消费。你可以根据需求,在basic_qos方法中设置适合你的消息数量。

并发消费:多部署几台消费者实例

可以采用多线程或多进程的方式进行消息的并发消费,将多个消费者并行处理消息。通过增加并发消费者的数量,可以提高消息的处理速度,提高消费的性能。
使用进程池来消费RabbitMQ的消息可以更好地管理并发性能。通过使用进程池,可以在一个固定的池子中创建多个进程,并且复用它们来消费消息,从而减少进程创建和销毁的开销。

以下是一个使用进程池消费RabbitMQ消息的示例:

import multiprocessing
import os
import time
import pika

def consumer(queue_name):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    channel.queue_declare(queue=queue_name)

    def callback(ch, method, properties, body):
        print(f'Process {os.getpid()} received message: {body}')
        time.sleep(1)

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()

def main():
    # 创建进程池
    pool = multiprocessing.Pool(processes=5)

    # 在进程池中提交任务
    for _ in range(5):
        pool.apply_async(consumer, ('my_queue',))

    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

在上述示例中,我们使用multiprocessing.Pool来创建一个包含5个进程的进程池。然后,我们使用apply_async方法向进程池中提交任务,每个任务都是调用consumer函数来消费"my_queue"队列中的消息。进程池会自动分配任务给闲置的进程来处理。通过closejoin方法,我们可以确保所有任务都被完成。

4.重复消费

  1. 消息确认:在消费者处理完一条消息后,通过调用basic_ack方法手动确认消息已经成功消费。这样,RabbitMQ就会将该消息标记为已经处理,不会再次发送给其他消费者。同时,还可以设置auto_ack参数为False,禁用自动消息确认机制,以确保消息被正确确认。

  2. 消息持久化:可以通过设置消息的delivery_mode属性为2来将消息标记为持久化消息。这样,即使消费者在处理消息时发生故障,消息也会被保存在磁盘上,待消费者恢复正常后会重新投递。

  3. 唯一消费者:可以通过设置队列的exclusive参数为True,创建一个排他队列。这样,只有一个消费者可以连接到该队列,并独占地消费其中的消息,避免重复消费。

  4. 消息去重:在消费者端可以维护一个已消费消息的记录,例如在数据库或缓存中记录已消费的消息的ID或唯一标识。每次消费消息时,先检查记录中是否已经存在该消息,如果存在则跳过,避免重复处理。

  5. 幂等操作:在消费者的处理逻辑中,要确保操作是幂等的,即多次执行同一个操作的效果和执行一次的效果是一样的。这样,即使消息被重复消费,也不会产生副作用。

二、其他

1.队列存在大量unacked数据

通过rabbitmq的后台管理,进入相应的队列,滑到最下边,找到purge。purge将清空这个队列的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1338345.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最广泛应用的金融风控算法-评分卡

欢迎关注主页个人介绍及相关链接,获取更多算法源码材料 2023数据资源入表白皮书,推荐系统源码下载-CSDN博客 用友BIP数据资产入表解决方案白皮书,推荐系统源码下载-CSDN博客 背景 信用是一切社会金融体系的根本,有了每个人的信…

计算机毕业设计---ssm+mysql+jsp实现的校园二手市场交易平台源码

项目介绍 本系统主要实现的功能有: 前台:(1)二手物品信息查看、搜索。 (2)学生注册登录、个人信息修改。 (3)二手物品信息发布、编辑。 (4)二手物品评论、回…

axios进行图片上传组件封装

文章目录 前言图片上传接口(axios通信)图片上传使用upload上传头像效果展示总结 前言 node项目使用 axios 库进行简单文件上传的模块封装。 图片上传接口(axios通信) 新建upload.js文件,定义一个函数,该函数接受一个上传路径和一…

Gateway集成方法以及拦截器和过滤器的使用

前提&#xff1a;请先创建好一个SpringBoot项目 1. 引入依赖 SpringCloud 和 alibabaCloud 、 SpringBoot间对版本有强制要求&#xff0c;我使用的springboot是3.0.2的版本。版本对应关系请看&#xff1a;版本说明 alibaba/spring-cloud-alibaba Wiki GitHub <dependency…

python脚本抢各大平台大额优惠卷

文章目录 python脚本抢各大平台大额优惠卷写在前面准备阶段一、所需工具二、ChromeDriver下载教程 三、Seleuinm安装1、打开cmd&#xff0c;输入如下命令 开始抢券淘宝脚本京东抢购脚本 python脚本抢各大平台大额优惠卷 写在前面 当电商平台上演盛大的购物狂欢时&#xff0c;如…

MongoDB ReplicaSet 部署

文章目录 前言1. 环境准备2. 生成密钥3. 配置参数4. 创建 ReplicaSet5. 副本集维护5.1 新增成员5.2 移除节点5.4 主节点降级5.5 阻止选举5.6 允许副本节点读5.7 延迟观测 6. 连接副本集 后记 前言 本篇文章介绍 MongoDB ReplicaSet 如何搭建&#xff0c;及常用的维护方法。 1…

VScode跑通Remix.js官方的contact程序开发过程

目录 1 引言 2 安装并跑起来 3 设置根路由 4 用links来添加风格资源 ​5 联系人路由的UI 6 添加联系人的UI组件 7 嵌套路由和出口 8 类型推理 9 Loader里的URL参数 10 验证参数并抛出响应 书接上回&#xff0c;我们已经跑通了remix的quick start项目&#xff0c;接下…

【JVM】虚拟机的组成+字节码文件组成+类的生命周期

什么是JVM&#xff1f; JVM 本质上是一个运行在计算机上的程序&#xff0c;他的职责是运行Java字节码文件。 JVM的功能 1.解释和运行&#xff1a;对字节码文件中的指令实时的解释成机器码让计算机执行。 2.内存管理&#xff1a;自动为对象、方法等分配内存&#xff0c;自动…

JAVA8项目升级JDK17指南

JAVA8项目升级JDK17指南 JAVA8项目升级JDK17指南一、模块化对反射的影响二、删除的内置类 JAVA8项目升级JDK17指南 随着SpringBoot2.7的发布&#xff0c;支持jdk8~jdk21。Springboot3.X发布&#xff0c;最低需要jdk17。升级jdk17是大势所趋。 参考1&#xff1a;重磅&#xff…

K8S 外部访问配置、 Ingress、NodePort

将K8S部署应用提供给外部访问一般有三种方式&#xff1a; NodePort 暴露端口到节点&#xff0c;提供了集群外部访问的入口LoadBalancer 需要负载均衡器&#xff08;通常都需要云服务商提供&#xff0c;裸机可以安装 METALLB 测试&#xff09;Ingress 统一管理 svc的外部访问&am…

Kruskal(克鲁斯卡尔)算法总结

知识概览 克鲁斯卡尔算法适用于稀疏图求最小生成树&#xff0c;时间复杂度为O(mlogm)。 例题展示 题目链接 Kruskal算法求最小生成树 859. Kruskal算法求最小生成树 - AcWing题库https://www.acwing.com/problem/content/861/ 代码 #include <iostream> #include &l…

【Git-IDEA】在 IDEA 中使用 Git(clone、pull、push、merge、建立本地分支与远程分支的连接)

【Git-IDEA】在 IDEA 中使用 Git&#xff08;clone、pull、push、merge、建立本地分支与远程分支的连接&#xff09; 1&#xff09;Gitee2&#xff09;配置 Git3&#xff09;初始化本地仓库4&#xff09;连接远程仓库5&#xff09;clone5.1.方式一5.2.方式二 6&#xff09;分支…

el-date-picker周选择器获取选择的日期范围

<el-date-pickerv-model"formData.date"type"week"format"yyyy 第 WW 周"placeholder"选择周"change"weekChange"> </el-date-picker>// 方法一&#xff1a;weekChange(val) {let startTime new Date(val.getT…

向华为学习:IPD运作-PDP产品开发流程-验证和发布阶段的关键活动

前几天华研荟为您分享了IPD体系中产品开发流程&#xff08;PDP流程&#xff0c;很多时候也直接称为IPD流程&#xff09;前三个阶段&#xff1a;概念、计划和开发阶段的主要内容和关键活动。 今天我们继续来介绍PDP流程的后面两个&#xff1a;验证、发布阶段的主要内容和关键活动…

处理及调度与死锁

处理及调度与死锁 一、前言 前面介绍了进程与线程的相关概念&#xff0c;现在继续学习处理机调度&#xff0c;处理机是系统最重要的资源&#xff0c;提高处理机的利用率和改善系统性能&#xff0c;在很大程度上取决于处理机调度性能的好坏&#xff0c;下面来介绍处理的调度以…

大一C语言查缺补漏 12.24

遗留问题&#xff1a; 6-1 1 在C语言中&#xff0c;如果要保留小数的话&#xff0c;一定要除以2.0&#xff0c;而不是2。 设整型变量m,n,a,b的值均为1&#xff0c;执行表达式&#xff08;m a>b&#xff09;||(n a<b)后&#xff0c;表达式的值以及变量m和n的值是&#…

iPhone恢复出厂设置照片还在吗?分享3个恢复方法!

随着使用时间的增长&#xff0c;手机可能会出现卡顿、运行缓慢等情况。此时&#xff0c;将手机恢复出厂设置可以清除缓存和一些不必要的文件&#xff0c;从而提高设备的运行速度。 但是&#xff0c;每当涉及恢复出厂设置的问题时&#xff0c;许多小伙伴都会关心一个问题&#…

Centos7:Jenkins+gitlab+node项目启动(1)

安装Jenkins 虚拟机配置 需要的软件 https://download.csdn.net/download/myy2012/88668255 解压到目录 用xftp 上传 开始安装jdk rmp -ivh jdk-8u181-linux-x64.rpm 开始安装jenkins rmp -ivh jenkins-2.99-1.1.noarch.rpm 修改用户与端口(端口按需修改) vim /etc/sy…

mysql中按字段1去重,按字段2降序排序

数据举例 sql语句 按字段field4降序排序&#xff0c;按字段field1去重 SELECT tt1.name2,tt1.field1,tt1.field2,tt1.field4 from ( select tt2.name2,tt2.field1,tt2.field2,tt2.field4 from t2 tt2 ORDER BY tt2.field4 DESC ) tt1 GROUP BY tt1.field1执行结果

stm32学习笔记:TIM-定时中断和外部时钟

定时器四部分讲解内容&#xff0c;本文是第一部分 ​​​​​TIM简介 基本定时器 时基单元&#xff1a;预分频器、计数器、自动重装载寄存器 预分频器之前&#xff0c;连接的就是基准计数时钟的输入&#xff0c;由于基本定时器只能选择内部时钟&#xff0c;所以可以认为这根…