rabbitmq-常见七种消息队列-控制台界面管理-python-实现简单访问

news2025/1/11 9:49:47

文章目录

    • 1.消息的基本概念
      • 1.1.生产者和消费者
      • 1.2.消息队列(Queue)
      • 1.3.交换机(Exchange)
      • 1.4.消息确认
    • 2.七种队列模式
      • 2.1.简单模式(Hello World)
      • 2.2.工作队列模式(Work queues)
      • 2.3.发布订阅模式(Publish/Subscribe)
      • 2.4.路由模式(Routing)
      • 2.5.主题模式(Topics)
      • 2.6.远程过程调用(RPC)
      • 2.7.发布者确认(Publisher Confirms)
    • 3.rabbitmq控制台管理
      • 3.1.三种基本队列
        • 3.1.2.Classic经典队列-单机环境队列
        • 3.1.3.Quorum仲裁队列-高可用队列
        • 3.1.3.Stream队列-大规模队列
      • 3.2.生产者和消费者
      • 3.3.交换机(exchange)
    • 4.Python访问rabbitmq
      • 4.1.生产者提交消息
      • 4.2.消费者执行消息
    • 5.总结

1.消息的基本概念

1.1.生产者和消费者

生产者(Producer)
消息的创建者。
负责创建和推送数据到消息服务器。

消费者(Consumer)
消息的接收方。
负责接收消息和处理数据。

1.2.消息队列(Queue)

消息队列是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,它是消费者接收消息的地方。

消息队列的重要属性:

持久性
broker重启前都有效。
自动删除
在所有消费者停止使用之后自动删除。
惰性
没有主动声明队列,调用会导致异常。
排他性
一旦启用,声明它的消费者才能使用。

1.3.交换机(Exchange)

交换机用于接收,分配消息。

  1. 生产者要先指定一个routing key,然后将消息发送到交换机。
  2. routing key需要与exchange type和binding key联合使用才能最终生效。
  3. 交换机将消息路由到一个或多个队列中,或丢弃。

交换机包含4中类型: direct, topic, fanout, headers。

direct(直连交换机)
具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。先匹配,再投送。Direct Exchange是RabbitMQ的默认交换机模式。这是最简单的模式。它根据routing key全文匹配去寻找队列。

1.4.消息确认

消息确认是指当一个消息从队列中投递给消费者(consumer)后,消费者会通知一下消息代理(broker)。消息确认可以自动,也可以由处理消息的开发者手动执行。当启用消息确认后,消息代理需要收到来自消费者的确认回执后,才完全将消息从队列中删除。

2.七种队列模式

2.1.简单模式(Hello World)

在这里插入图片描述

做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。
单生产者,单消费者,单队列。

应用场景:

将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。

2.2.工作队列模式(Work queues)

在这里插入图片描述

在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者。适用于资源密集型任务, 单个消费者处理不过来,需要多个消费者进行处理的场景。单生产者,多消费者,单队列。

应用场景:

一个订单的处理需要10s,有多个订单可以同时放到消息队列, 然后让多个消费者同时并行处理,而不是单个消费者的串行消费。

2.3.发布订阅模式(Publish/Subscribe)

在这里插入图片描述
一次向许多消费者发送消息,将消息将广播到所有的消费者。单生产者,多消费者,多队列。

应用场景:

更新商品库存后需要通知多个缓存和多个数据库。

结构如下:

一个fanout类型交换机扇出两个消息队列,分别为缓存消息队列、数据库消息队列
一个缓存消息队列对应着多个缓存消费者
一个数据库消息队列对应着多个数据库消费者

2.4.路由模式(Routing)

在这里插入图片描述
根据Routing Key有选择地接收消息。多消费者,选择性多队列,每个队列通过routing key全文匹配。发送消息到交换机并且要指定路由键(Routing key) 。消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。

应用场景:

在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。

2.5.主题模式(Topics)

在这里插入图片描述
主题交换机方式接收消息,将routing key和模式进行匹配。多消费者,选择性多队列,每个队列通过模式匹配。队列需要绑定在一个模式上。#匹配一个词或多个词,*只匹配一个词。

应用场景:

iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。

2.6.远程过程调用(RPC)

在这里插入图片描述

在远程计算机上运行功能并等待结果。

应用场景:

需要等待接口返回数据,如订单支付。

2.7.发布者确认(Publisher Confirms)

与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。

应用场景:

对于消息可靠性要求较高,比如钱包扣款。

3.rabbitmq控制台管理

打开浏览器。访问 http://127.0.0.1:15672
出现管理页面:
账号:guest
密码:guest

登录后,您将看到 RabbitMQ 的控制台界面。该界面将显示以下几个主要部分:

Overview:概览页面提供了关于 RabbitMQ 节点、队列和交换机等的统计信息。
Connections:连接页面提供了有关当前客户端连接的详细信息。
Channels:通道页面提供了有关当前打开通道的详细信息。
Exchanges:交换机页面提供了有关 RabbitMQ 服务器上已声明的交换机的详细信息。
Queues:队列页面提供了有关 RabbitMQ 服务器上已声明的队列的详细信息。
Admin:管理页面提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

通过 RabbitMQ 控制台,您可以执行许多操作,例如创建和删除队列、交换机和绑定、查看队列和交换机的详细信息、监视连接和通道等。控制台还提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。

3.1.三种基本队列

队列是具有两个主要操作的顺序数据结构:入队和出队。RabbitMQ中的队列是FIFO(先进先出)。一些队列因为一些特性,即消费者的优先级和重新排队,会影响消费者消费的顺序。
在这里插入图片描述

3.1.2.Classic经典队列-单机环境队列

这个是RabbitMQ最经典的队列类型。在单机环境中,拥有比较高的消息可靠性。
在这里插入图片描述我们在创建队列的时候,根据上图可以看到,经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。

Durability:是否持久化,可选项为持久化(Durable)和 临时(Transient)。Durable相对消息安全性更高。但是同时需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。
Auto delete:是否自动删除,如果选择是,则消息会被其中一个消费者消费之后,队列会自动销毁,其他消费者也会断开连接。一般不会自动删除。

3.1.3.Quorum仲裁队列-高可用队列

仲裁队列,是RabbitMQ从3.8.0版本引入的新的队列类型,整个3.8.X版本,也是针对仲裁队列进行完善和优化。Quorum相比Classic在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum代替Classic。

在这里插入图片描述
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是Quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:

特性ClassicQuorum
非持久化队列(Non-durable queues支持不支持
独占队列(Exclusivity支持不支持
每条消息的持久化(Per message persistence每条消息总是
会员变更(Membership changes自动手动
消息TTLMessage TTL支持支持(3.10版本开始)
队列TTLQueue TTL支持支持
队列长度限制(Queue length limits支持支持
懒加载(Lazy behaviour支持始终
消息优先级(Message priority支持不支持
消费者优先级(Consumer priority支持支持
死信交换(Dead letter exchanges支持支持
毒消息处理(Poison message handling不支持支持
全局QosGlobal QoS Prefetch支持不支持

Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。其中有个特例就是这个Poison Message。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。
Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在x-delivery-count这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。

也对应以下一些不适合使用的场景:

一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
对消息低延迟要求高: 一致性算法会影响消息的延迟。
对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。

3.1.3.Stream队列-大规模队列

Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
在这里插入图片描述

Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:

大规模分发(large fan-outs)

当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。

消息回溯(Replay/Time-travelling)

RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。

高吞吐性能(Throughput Performance)

Stream队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。

大日志(Large logs)

RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。

3.2.生产者和消费者

生产者是消息的提供方,消费者是消息的执行方,代表的是行为的不同,rabbitmq通过登录来验证不同的用户,不同的行为来代表不同人的特征。

3.3.交换机(exchange)

在RabbitMQ中,所有的producer都不会直接把message发送到queue中,甚至producer都不知道message在发出后有没有发送到queue中,事实上,producer只能将message发送给exchange,由exchange来决定发送到哪个queue中。

exchange的一端用来从producer中接收message,另一端用来发送message到queue,exchange的类型规定了怎么处理接收到的message,发布订阅模式使用到的exchange类型为 fanout ,这种exchange类型非常简单,就是将接收到的message广播给已知的(即绑定到此exchange的)所有consumer。

当然,如果不想使用特定的exchange,可以使用 exchange=‘’ 表示使用默认的exchange,默认的exchange会将消息发送到 routing_key 指定的queue,可以参考工作(任务)队列模式和Hello world模式。

4.Python访问rabbitmq

在python语言环境下,使用pika库来访问访问操作rabbitmq中间件。

pip install pika

在这里插入图片描述

4.1.生产者提交消息

# coding=utf-8
# producer
import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 创建纯文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'

# 7. 将消息发送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello',  routing_key='world',  properties=msg_props, body=message)

# 8. 关闭通道
channel.close()

# 9. 当生产者发送完消息后,可选择关闭连接
connect.close()

4.2.消费者执行消息

#!/usr/bin/env python
# coding=utf-8

# consumer

import pika

# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")

# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))

# 2. 在 connect 上创建一个 channel
channel = connect.channel()

# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)

# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')

# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')

# 6. 定义一个回调函数,用来接收生产者发送的消息
'''

在 Python3 中,bytes 和 str 的互相转换方式是
str.encode('utf-8')
bytes.decode('utf-8')

'''
def callback(channel, method, properties, body):
    # 消息确认
    channel.basic_ack(delivery_tag=method.delivery_tag)

    if body.decode('utf-8') == "quit":
        # 停止消费,并退出
        channel.basic_cancel(consumer_tag='hello-consumer')
        channel.close()
        connect.close()
    else:
        print("msg is {}".format(body))
        # print("msg type {}".format(type(body)))
        # print("msg eval after type {}".format(type(eval(body))))


# 7. 消费者消费
channel.basic_consume('hello', callback,  auto_ack=False)

# 8. 开始循环取消息
channel.start_consuming()

5.总结

通过使用rabbitmq技术,可以实现生产者和消费者模式,并实现两者的解耦,生产者负责通过交换机将数据存入队列,而消费者从队列中取数据,并执行相应的消息。可以用在服务器复杂耗时任务的并行计算中使用,与常用的web服务器(如apache等)解耦,提高服务器计算资源的利用效率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1321109.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

windows下wsl(ubuntu)ldconfig报错

错误 sudo ldconfig /sbin/ldconfig.real: Cant link /usr/lib/wsl/lib/libnvoptix_loader.so.1 to libnvoptix.so.1 /sbin/ldconfig.real: /usr/lib/wsl/lib/libcuda.so.1 is not a symbolic link解决: 处理 sudo ldconfig 报错 libcuda.so.1 is not a symbolic …

GZ015 机器人系统集成应用技术样题8-学生赛

2023年全国职业院校技能大赛 高职组“机器人系统集成应用技术”赛项 竞赛任务书(学生赛) 样题8 选手须知: 本任务书共 25页,如出现任务书缺页、字迹不清等问题,请及时向裁判示意,并进行任务书的更换。参赛队…

RPC(3):HttpClient实现RPC之GET请求

1HttpClient简介 在JDK中java.net包下提供了用户HTTP访问的基本功能,但是它缺少灵活性或许多应用所需要的功能。 HttpClient起初是Apache Jakarta Common 的子项目。用来提供高效的、最新的、功能丰富的支持 HTTP 协议的客户端编程工具包,并且它支持 H…

3.2 内容管理模块 - 课程分类、新增课程、修改课程

内容管理模块-课程分类、新增课程、修改课程 文章目录 内容管理模块-课程分类、新增课程、修改课程一、课程分类1.1 课程分类表1.2 查询树形结构1.2.1 表自连接1.2.2 SQL递归 1.3 Mapper1.4 Service1.5 Controller1.6 效果图 二、添加课程2.1 需求分析2.2 数据表2.2.1 课程基础…

【linux】SSH终端Putty配置:文件上传/下载、显示中文字体、自动登录

文章目录 写在前面putty上传/下载文件1. 下载2. 解压和配置3. 使用sz/rz3.1 下载文件:sz3.2 上传文件:rz 显示中文字体1. 下载合适的字体2. 解压和安装3. putty配置 putty自动登录1. putty配置2. putty快捷方式配置3. 使用putty 写在后面 写在前面 一篇博客介绍了12种SSH终端工…

福德植保无人机工厂:创新科技与绿色农业的完美结合

亲爱的读者们,欢迎来到福德植保无人机工厂的世界。这里,科技与农业的完美结合为我们描绘出一幅未来农业的新篇章。福德植保无人机工厂作为行业的领军者,以其领先的无人机技术,创新的理念,为我们展示了一种全新的农业服…

C++共享和保护——(4)保护共享数据

归纳编程学习的感悟, 记录奋斗路上的点滴, 希望能帮到一样刻苦的你! 如有不足欢迎指正! 共同学习交流! 🌎欢迎各位→点赞 👍 收藏⭐ 留言​📝 一滴汗珠万粒粮,万粒汗珠谷…

Scala多线程爬虫程序的数据可视化与分析实践

一、Scala简介 Scala是一种多种类型的编程语言,结合了针对对象编程和函数式编程的功能。它运行在Java虚拟机上,具有强大的运算能力和丰富的库支持。Scala常用于大数据处理、并发编程和Web应用程序开发。其灵活性和高效性编程成为编写多线程爬虫程序的理…

SQL进阶理论篇(十二):InnoDB中的MVCC是如何实现的?

文章目录 简介事务版本号行记录的隐藏列Undo LogRead View的工作流程总结参考文献 简介 在不同的DBMS里,MVCC的实现机制是不同的。本节我们会以InnoDB举例,讲解InnoDB里MVCC的实现机制。 我们需要掌握这么几个概念: 事务版本号行记录的隐藏…

02.微服务组件 Eureka注册中心

1.Eureka注册中心 服务提供者与消费者: 服务提供者:一次业务中,被其它微服务调用的服务。(提供接口给其它微服务)服务消费者:一次业务中,调用其它微服务的服务。(调用其它微服务提供的接口)一个服务是消费者还是提供者&#xff…

Apache Flink(十五):Flink任务提交模式

🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客 🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。 🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频 目录

响应式布局2:手写响应式导航栏(BootStrap实现以及原生实现)

1.响应式导航栏介绍 响应式导航栏是一种在不同设备和屏幕尺寸下自适应布局和显示的导航栏。它可以根据用户所使用的设备(如桌面电脑、平板电脑或手机)自动调整其外观和交互方式,以提供更好的用户体验。 pc端: 手机端&#xff1a…

网络编程『socket套接字 ‖ 简易UDP网络程序』

🔭个人主页: 北 海 🛜所属专栏: Linux学习之旅、神奇的网络世界 💻操作环境: CentOS 7.6 阿里云远程服务器 文章目录 🌤️前言🌦️正文1.预备知识1.1.IP地址1.2.端口号1.3.端口号与进…

【具身智能评估6】Habitat 3.0: A Co-Habitat for Humans, Avatars and Robots

论文标题:Habitat 3.0: A Co-Habitat for Humans, Avatars and Robots 论文作者:Xavier Puig, Eric Undersander, Andrew Szot, Mikael Dallaire Cote, Tsung-Yen Yang, Ruslan Partsey, Ruta Desai, Alexander William Clegg, Michal Hlavac, So Yeon M…

出国旅游需要注意些什么

出国旅游是一种令人兴奋、令人期待的经历。然而,在进行这种经历之前,有几件事情是需要注意的。本文将为您介绍出国旅游需要注意的一些重要事项。首先,为了确保您的出国旅行顺利进行,您应该提前办理好您的签证和护照。不同国家对于…

JAVA主流日志框架梳理学习及使用

前言:目前市面上有挺多JAVA的日志框架,比如JUL(JDK自带的日志框架),Log4j,Logback,Log4j2等,有人可能有疑问说还有slf4j,不过slf4j不是一种日志框架的具体实现,而是一种日志门面(日志门面可以理解为是一种统…

Convolutional Neural Network(CNN)——卷积神经网络

1.NN的局限性 拓展性差 NN的计算量大性能差,不利于在不同规模的数据集上有效运行若输入维度发生变化,需要修改并重新训练网络容易过拟合 全连接导致参数量特别多,容易过拟合如果增加更多层,参数量会翻倍无法有效利用局部特征 输入…

【华为数据之道学习笔记】5-9图模型设计

图模型作为当前流行的信息处理加工技术,自提出以来,迅速在学术界和工业界得到了普及,在智能推荐、决策分析等方面有着广泛的应用。 图模型由节点和边组成。节点表示实体或概念,边则由属性或关系构成。实体指的是具有可区别性且独立…

区域和检索算法(leetcode第303题)

题目描述&#xff1a; 给定一个整数数组 nums&#xff0c;处理以下类型的多个查询:计算索引 left 和 right &#xff08;包含 left 和 right&#xff09;之间的 nums 元素的 和 &#xff0c;其中 left < right 实现 NumArray 类&#xff1a;NumArray(int[] nums) 使用数组…

Spark编程实验二:RDD编程初级实践

目录 一、目的与要求 二、实验内容 三、实验步骤 1、pyspark交互式编程 2、编写独立应用程序实现数据去重 3、编写独立应用程序实现求平均值问题 4、三个综合实例 四、结果分析与实验体会 一、目的与要求 1、熟悉Spark的RDD基本操作及键值对操作&#xff1b; 2、熟悉使…