文章目录
- 1.消息的基本概念
- 1.1.生产者和消费者
- 1.2.消息队列(Queue)
- 1.3.交换机(Exchange)
- 1.4.消息确认
- 2.七种队列模式
- 2.1.简单模式(Hello World)
- 2.2.工作队列模式(Work queues)
- 2.3.发布订阅模式(Publish/Subscribe)
- 2.4.路由模式(Routing)
- 2.5.主题模式(Topics)
- 2.6.远程过程调用(RPC)
- 2.7.发布者确认(Publisher Confirms)
- 3.rabbitmq控制台管理
- 3.1.三种基本队列
- 3.1.2.Classic经典队列-单机环境队列
- 3.1.3.Quorum仲裁队列-高可用队列
- 3.1.3.Stream队列-大规模队列
- 3.2.生产者和消费者
- 3.3.交换机(exchange)
- 4.Python访问rabbitmq
- 4.1.生产者提交消息
- 4.2.消费者执行消息
- 5.总结
1.消息的基本概念
1.1.生产者和消费者
生产者(Producer)
消息的创建者。
负责创建和推送数据到消息服务器。
消费者(Consumer)
消息的接收方。
负责接收消息和处理数据。
1.2.消息队列(Queue)
消息队列是RabbitMQ的内部对象,用于存储生产者的消息直到发送给消费者,它是消费者接收消息的地方。
消息队列的重要属性:
持久性
broker重启前都有效。
自动删除
在所有消费者停止使用之后自动删除。
惰性
没有主动声明队列,调用会导致异常。
排他性
一旦启用,声明它的消费者才能使用。
1.3.交换机(Exchange)
交换机用于接收,分配消息。
- 生产者要先指定一个routing key,然后将消息发送到交换机。
- routing key需要与exchange type和binding key联合使用才能最终生效。
- 交换机将消息路由到一个或多个队列中,或丢弃。
交换机包含4中类型: direct, topic, fanout, headers。
direct(直连交换机)
具有路由功能的交换机,绑定到此交换机的时候需要指定一个routing_key,交换机发送消息的时候需要routing_key,会将消息发送道对应的队列。先匹配,再投送。Direct Exchange是RabbitMQ的默认交换机模式。这是最简单的模式。它根据routing key全文匹配去寻找队列。
1.4.消息确认
消息确认是指当一个消息从队列中投递给消费者(consumer)后,消费者会通知一下消息代理(broker)。消息确认可以自动,也可以由处理消息的开发者手动执行。当启用消息确认后,消息代理需要收到来自消费者的确认回执后,才完全将消息从队列中删除。
2.七种队列模式
2.1.简单模式(Hello World)
做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B。
单生产者,单消费者,单队列。
应用场景:
将发送的电子邮件放到消息队列,然后邮件服务在队列中获取邮件并发送给收件人。
2.2.工作队列模式(Work queues)
在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者。适用于资源密集型任务, 单个消费者处理不过来,需要多个消费者进行处理的场景。单生产者,多消费者,单队列。
应用场景:
一个订单的处理需要10s,有多个订单可以同时放到消息队列, 然后让多个消费者同时并行处理,而不是单个消费者的串行消费。
2.3.发布订阅模式(Publish/Subscribe)
一次向许多消费者发送消息,将消息将广播到所有的消费者。单生产者,多消费者,多队列。
应用场景:
更新商品库存后需要通知多个缓存和多个数据库。
结构如下:
一个fanout类型交换机扇出两个消息队列,分别为缓存消息队列、数据库消息队列
一个缓存消息队列对应着多个缓存消费者
一个数据库消息队列对应着多个数据库消费者
2.4.路由模式(Routing)
根据Routing Key有选择地接收消息。多消费者,选择性多队列,每个队列通过routing key全文匹配。发送消息到交换机并且要指定路由键(Routing key) 。消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息。
应用场景:
在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12 promote,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息。
2.5.主题模式(Topics)
主题交换机方式接收消息,将routing key和模式进行匹配。多消费者,选择性多队列,每个队列通过模式匹配。队列需要绑定在一个模式上。#匹配一个词或多个词,*只匹配一个词。
应用场景:
iphone促销活动可以接收主题为多种iPhone的消息,如iphone12、iphone13等。
2.6.远程过程调用(RPC)
在远程计算机上运行功能并等待结果。
应用场景:
需要等待接口返回数据,如订单支付。
2.7.发布者确认(Publisher Confirms)
与发布者进行可靠的发布确认,发布者确认是RabbitMQ扩展,可以实现可靠的发布。在通道上启用发布者确认后,RabbitMQ将异步确认发送者发布的消息,这意味着它们已在服务器端处理。
应用场景:
对于消息可靠性要求较高,比如钱包扣款。
3.rabbitmq控制台管理
打开浏览器。访问 http://127.0.0.1:15672
出现管理页面:
账号:guest
密码:guest
登录后,您将看到 RabbitMQ 的控制台界面。该界面将显示以下几个主要部分:
Overview:概览页面提供了关于 RabbitMQ 节点、队列和交换机等的统计信息。
Connections:连接页面提供了有关当前客户端连接的详细信息。
Channels:通道页面提供了有关当前打开通道的详细信息。
Exchanges:交换机页面提供了有关 RabbitMQ 服务器上已声明的交换机的详细信息。
Queues:队列页面提供了有关 RabbitMQ 服务器上已声明的队列的详细信息。
Admin:管理页面提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。
通过 RabbitMQ 控制台,您可以执行许多操作,例如创建和删除队列、交换机和绑定、查看队列和交换机的详细信息、监视连接和通道等。控制台还提供了一些高级管理功能,例如添加用户、设置权限和定义策略等。
3.1.三种基本队列
队列是具有两个主要操作的顺序数据结构:入队和出队。RabbitMQ中的队列是FIFO(先进先出)。一些队列因为一些特性,即消费者的优先级和重新排队,会影响消费者消费的顺序。
3.1.2.Classic经典队列-单机环境队列
这个是RabbitMQ最经典的队列类型。在单机环境中,拥有比较高的消息可靠性。
我们在创建队列的时候,根据上图可以看到,经典队列可以选择是否持久化(Durability)以及是否自动删除(Auto delete)两个属性。
Durability:是否持久化,可选项为持久化(Durable)和 临时(Transient)。Durable相对消息安全性更高。但是同时需要有更多的IO操作,所以生产和消费消息的性能,相比Transient会比较低。
Auto delete:是否自动删除,如果选择是,则消息会被其中一个消费者消费之后,队列会自动销毁,其他消费者也会断开连接。一般不会自动删除。
3.1.3.Quorum仲裁队列-高可用队列
仲裁队列,是RabbitMQ从3.8.0版本引入的新的队列类型,整个3.8.X版本,也是针对仲裁队列进行完善和优化。Quorum相比Classic在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum代替Classic。
Quorum是基于Raft一致性协议实现的一种新型的分布式消息队列,他实现了持久化,多备份的FIFO队列,主要就是针对RabbitMQ的镜像模式设计的。简单理解就是Quorum队列中的消息需要有集群中多半节点同意确认后,才会写入到队列中。这种队列类似于RocketMQ当中的DLedger集群。这种方式可以保证消息在集群内部不会丢失。同时,Quorum是以牺牲很多高级队列特性为代价,来进一步保证消息在分布式环境下的高可靠。从整体功能上来说,Quorum队列是在Classic经典队列的基础上做减法,因此对于RabbitMQ的长期使用者而言,其实是会影响使用体验的。他与普通队列的区别:
特性 | Classic | Quorum |
---|---|---|
非持久化队列(Non-durable queues ) | 支持 | 不支持 |
独占队列(Exclusivity ) | 支持 | 不支持 |
每条消息的持久化(Per message persistence ) | 每条消息 | 总是 |
会员变更(Membership changes ) | 自动 | 手动 |
消息TTL (Message TTL ) | 支持 | 支持(3.10版本开始) |
队列TTL (Queue TTL ) | 支持 | 支持 |
队列长度限制(Queue length limits ) | 支持 | 支持 |
懒加载(Lazy behaviour ) | 支持 | 始终 |
消息优先级(Message priority ) | 支持 | 不支持 |
消费者优先级(Consumer priority ) | 支持 | 支持 |
死信交换(Dead letter exchanges ) | 支持 | 支持 |
毒消息处理(Poison message handling ) | 不支持 | 支持 |
全局Qos (Global QoS Prefetch ) | 支持 | 不支持 |
Exclusivity表示独占队列,即表示队列只能由声明该队列的Connection连接来进行使用,包括队列创建、删除、收发消息等,并且独占队列会在声明该队列的Connection断开后自动删除。其中有个特例就是这个Poison Message。所谓毒消息是指消息一直不能被消费者正常消费(可能是由于消费者失败或者消费逻辑有问题等),就会导致消息不断的重新入队,这样这些消息就成为了毒消息。这些读消息应该有保障机制进行标记并及时删除。
Quorum队列会持续跟踪消息的失败投递尝试次数,并记录在x-delivery-count这样一个头部参数中。然后,就可以通过设置 Delivery limit参数来定制一个毒消息的删除策略。当消息的重复投递次数超过了Delivery limit参数阈值时,RabbitMQ就会删除这些毒消息。当然,如果配置了死信队列的话,就会进入对应的死信队列。
Quorum队列更适合于 队列长期存在,并且对容错、数据安全方面的要求比低延迟、不持久等高级队列更能要求更严格的场景。例如 电商系统的订单,引入MQ后,处理速度可以慢一点,但是订单不能丢失。
也对应以下一些不适合使用的场景:
一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列。
对消息低延迟要求高: 一致性算法会影响消息的延迟。
对数据安全性要求不高:Quorum队列需要消费者手动通知或者生产者手动确认。
队列消息积压严重 : 如果队列中的消息很大,或者积压的消息很多,就不要使用Quorum队列。Quorum队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。
3.1.3.Stream队列-大规模队列
Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型,也是目前官方最为推荐的队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
Stream队列的核心是以append-only只添加的日志来记录消息,整体来说,就是消息将以append-only的方式持久化到日志文件中,然后通过调整每个消费者的消费进度offset,来实现消息的多次分发。这种队列提供了RabbitMQ已有的其他队列类型不太好实现的四个特点:
大规模分发(large fan-outs)
当想要向多个订阅者发送相同的消息时,以往的队列类型必须为每个消费者绑定一个专用的队列。如果消费者的数量很大,这就会导致性能低下。而Stream队列允许任意数量的消费者使用同一个队列的消息,从而消除绑定多个队列的需求。
消息回溯(Replay/Time-travelling)
RabbitMQ已有的这些队列类型,在消费者处理完消息后,消息都会从队列中删除,因此,无法重新读取已经消费过的消息。而Stream队列允许用户在日志的任何一个连接点开始重新读取数据。
高吞吐性能(Throughput Performance)
Stream队列的设计以性能为主要目标,对消息传递吞吐量的提升非常明显。
大日志(Large logs)
RabbitMQ一直以来有一个让人诟病的地方,就是当队列中积累的消息过多时,性能下降会非常明显。但是Stream队列的设计目标就是以最小的内存开销高效地存储大量的数据。
3.2.生产者和消费者
生产者是消息的提供方,消费者是消息的执行方,代表的是行为的不同,rabbitmq通过登录来验证不同的用户,不同的行为来代表不同人的特征。
3.3.交换机(exchange)
在RabbitMQ中,所有的producer都不会直接把message发送到queue中,甚至producer都不知道message在发出后有没有发送到queue中,事实上,producer只能将message发送给exchange,由exchange来决定发送到哪个queue中。
exchange的一端用来从producer中接收message,另一端用来发送message到queue,exchange的类型规定了怎么处理接收到的message,发布订阅模式使用到的exchange类型为 fanout ,这种exchange类型非常简单,就是将接收到的message广播给已知的(即绑定到此exchange的)所有consumer。
当然,如果不想使用特定的exchange,可以使用 exchange=‘’ 表示使用默认的exchange,默认的exchange会将消息发送到 routing_key 指定的queue,可以参考工作(任务)队列模式和Hello world模式。
4.Python访问rabbitmq
在python语言环境下,使用pika库来访问访问操作rabbitmq中间件。
pip install pika
4.1.生产者提交消息
# coding=utf-8
# producer
import pika
# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")
# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))
# 2. 在 connect 上创建一个 channel
channel = connect.channel()
# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)
# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')
# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')
# 6. 创建纯文本消息
msg_props = pika.BasicProperties()
msg_props.content_type = 'text/plain'
# 7. 将消息发送到 RabbitMQ
message = 'quit'
channel.basic_publish(exchange='hello', routing_key='world', properties=msg_props, body=message)
# 8. 关闭通道
channel.close()
# 9. 当生产者发送完消息后,可选择关闭连接
connect.close()
4.2.消费者执行消息
#!/usr/bin/env python
# coding=utf-8
# consumer
import pika
# 指定远程 rabbitmq 的用户名密码并创建凭证
credentials = pika.PlainCredentials(username="guest", password="guest")
# 1. 创建 connect 连接
connect = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=5672, virtual_host='/', credentials=credentials))
# 2. 在 connect 上创建一个 channel
channel = connect.channel()
# 3. 在 channel 上声明交换器 exchange
channel.exchange_declare(exchange='hello', exchange_type='direct', passive=False, durable=True, auto_delete=False)
# 4. 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
channel.queue_declare(queue='hello')
# 5. 通过键 'world' 将队列和交换器绑定
channel.queue_bind(queue='hello', exchange='hello', routing_key='world')
# 6. 定义一个回调函数,用来接收生产者发送的消息
'''
在 Python3 中,bytes 和 str 的互相转换方式是
str.encode('utf-8')
bytes.decode('utf-8')
'''
def callback(channel, method, properties, body):
# 消息确认
channel.basic_ack(delivery_tag=method.delivery_tag)
if body.decode('utf-8') == "quit":
# 停止消费,并退出
channel.basic_cancel(consumer_tag='hello-consumer')
channel.close()
connect.close()
else:
print("msg is {}".format(body))
# print("msg type {}".format(type(body)))
# print("msg eval after type {}".format(type(eval(body))))
# 7. 消费者消费
channel.basic_consume('hello', callback, auto_ack=False)
# 8. 开始循环取消息
channel.start_consuming()
5.总结
通过使用rabbitmq技术,可以实现生产者和消费者模式,并实现两者的解耦,生产者负责通过交换机将数据存入队列,而消费者从队列中取数据,并执行相应的消息。可以用在服务器复杂耗时任务的并行计算中使用,与常用的web服务器(如apache等)解耦,提高服务器计算资源的利用效率。