1、RabbitMQ教程
《RabbitMQ Tutorials》https://www.rabbitmq.com/tutorials
RabbitMQ是一个消息代理,它接受并转发消息。你可以将其想象成一个邮局:当你将需要邮寄的信件放入邮筒时,你可以确信邮递员最终会将邮件投递给你的收件人。在这个类比中,RabbitMQ既是邮筒,也是邮局,还是邮递员。
RabbitMQ与邮局的主要区别在于它不处理纸张,而是接受、存储和转发数据的二进制块(binary blobs of data)——即消息。
RabbitMQ以及消息传递通常使用一些专业术语。以下是一些常见的术语和它们的含义:
-
消息(Message):消息是发送者发送和接收者接收的数据单元。
-
生产者(Producer):生产者是发送消息的一方。
-
消费者(Consumer):消费者是接收和处理消息的一方。
-
队列(Queue):队列是消息的缓冲区,用于存储等待被消费者处理的消息。
-
交换机(Exchange):交换机接收来自生产者的消息,并将它们路由到一个或多个队列。
-
绑定(Binding):绑定是将队列和交换机连接在一起的规则,定义了交换机应将哪些消息发送到哪个队列。
-
路由键(Routing Key):路由键是消息的一部分,用于决定消息应该发送到哪个队列。
-
虚拟主机(Virtual Host):虚拟主机是RabbitMQ中的一个命名空间,它拥有自己的队列、交换机和绑定。
-
持久性(Durability):持久性是指消息或队列在RabbitMQ重启后仍然保持存在的特性。
-
确认(Acknowledgment):确认是消费者向RabbitMQ发送的一个信号,表明消息已经被成功接收和处理。
-
死信队列(Dead Letter Exchange):死信队列是用于存储无法被正常处理的消息的队列。
-
消息模式(Message Pattern):消息模式是消息传递中使用的设计模式,如发布/订阅、请求/响应等。
2、环境准备
2.1 准备虚机
可参考《VMware Workstation安装Ubuntu 22.04笔记》。
2.2 安装Docker
可参考《Ubuntu 22.04 Docker安装笔记》。
root@k0test1:~# ip a
1: lo: <LOOPBACK,UP,LOWER_UP> mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host
valid_lft forever preferred_lft forever
2: ens32: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 00:50:56:39:7e:a9 brd ff:ff:ff:ff:ff:ff
altname enp2s0
inet 10.0.20.70/24 brd 10.0.20.255 scope global ens32
valid_lft forever preferred_lft forever
inet6 fe80::250:56ff:fe39:7ea9/64 scope link
valid_lft forever preferred_lft forever
3: docker0: <NO-CARRIER,BROADCAST,MULTICAST,UP> mtu 1500 qdisc noqueue state DOWN group default
link/ether 02:42:40:29:d1:63 brd ff:ff:ff:ff:ff:ff
inet 172.17.0.1/16 brd 172.17.255.255 scope global docker0
valid_lft forever preferred_lft forever
root@k0test1:~# docker --version
Docker version 27.1.2, build d01f264
root@k0test1:~#
2.3 使用Dockerfile创建Docker images
可参考《Dockerfile创建Docker image练习》,创建Docker images ubtest:22.04。
root@k0test1:~# pwd
/root
root@k0test1:~# cat Dockerfile
FROM ubuntu:22.04
RUN apt-get -qq update \
&& apt-get -qq install vim -y \
&& apt-get -qq install iproute2 -y \
&& apt-get -qq install iputils-ping -y \
&& apt-get -qq install openssh-server -y \
&& rm -rf /var/lib/apt/lists/*
RUN mkdir /var/run/sshd
RUN echo "root:openstack" | chpasswd
RUN echo 'PermitRootLogin yes' >> /etc/ssh/sshd_config
CMD ["/usr/sbin/sshd", "-D"]
root@k0test1:~# docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ubtest 22.04 4a9576459c4c 9 hours ago 234MB
ubuntu 22.04 8a3cdc4d1ad3 7 weeks ago 77.9MB
hello-world latest d2c94e258dcb 15 months ago 13.3kB
root@k0test1:~#
2.4 Docker安装RabbitMQ Server
尝试使用 RabbitMQ,可以使用社区提供的 Docker 镜像来快速部署:
root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
--rm
: 这个选项意味着当容器退出时,自动清理容器文件系统。
--name rabbitmq
: 这为容器指定了一个名称rabbitmq
,方便后续的管理和引用。
-p 5672:5672
: 这将容器内部的 5672 端口映射到宿主机的 5672 端口,5672 端口是 RabbitMQ 服务默认的 AMQP 协议端口。
-p 15672:15672
: 这将容器内部的 15672 端口映射到宿主机的 15672 端口,15672 端口是 RabbitMQ 管理界面的 HTTP 端口。
rabbitmq:3.13-management
: 这是指定要运行的 Docker 镜像的名称和标签。3.13-management
标签表明这是一个 RabbitMQ 3.13 版本的镜像,且包含了管理插件。
安装过程中的部分信息:
部分安装信息:
...
2024-08-17 22:25:33.847883+00:00 [info] <0.742.0> Management plugin: HTTP (non-TLS) listener started on port 15672
2024-08-17 22:25:33.848083+00:00 [info] <0.772.0> Statistics database started.
2024-08-17 22:25:33.848604+00:00 [info] <0.771.0> Starting worker pool 'management_worker_pool' with 3 processes in it
2024-08-17 22:25:33.859781+00:00 [info] <0.790.0> Prometheus metrics: HTTP (non-TLS) listener started on port 15692
2024-08-17 22:25:33.860550+00:00 [info] <0.676.0> Ready to start client connection listeners
2024-08-17 22:25:33.862729+00:00 [info] <0.834.0> started TCP listener on [::]:5672
completed with 5 plugins.
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> Server startup complete; 5 plugins started.
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> * rabbitmq_prometheus
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> * rabbitmq_federation
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> * rabbitmq_management
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> * rabbitmq_management_agent
2024-08-17 22:25:33.938357+00:00 [info] <0.676.0> * rabbitmq_web_dispatch
2024-08-17 22:25:34.093106+00:00 [info] <0.9.0> Time to start RabbitMQ: 7726 ms
Management Plugin HTTP Listener: 管理插件的 HTTP (非TLS) 监听器已经在端口 15672 上启动,可以通过访问这个端口来使用 RabbitMQ 的 Web 管理界面。
Prometheus Metrics: 服务器已成功启动 Prometheus 指标的 HTTP (非TLS) 监听器,监听端口为 15692。这意味着 RabbitMQ 可以为 Prometheus 提供监控数据,便于系统监控和性能分析。
Client Connection Listeners: 服务器已准备好启动客户端连接监听器。
TCP Listener: 服务器已在端口 5672 上启动了 TCP 监听器,这是 AMQP 客户端连接到 RabbitMQ 服务器的标准端口。
Plugins Started: 服务器启动了 5 个插件,包括:
rabbitmq_prometheus
: 用于 Prometheus 监控的插件。rabbitmq_federation
: 用于跨服务器消息传递的插件。rabbitmq_management
: 提供管理界面和HTTP API的插件。rabbitmq_management_agent
: 管理代理插件,支持管理界面。rabbitmq_web_dispatch
: 处理HTTP请求的插件。Server Startup Complete: RabbitMQ 服务器启动完成,总共耗时 7726 毫秒。
这些日志条目表明 RabbitMQ 服务器已成功启动,并且所有必要的插件都已加载。如果需要访问或管理 RabbitMQ 服务器,可以通过管理界面(默认端口为 15672)进行操作。
Web登录RabbitMQ(缺省username/password: guest/guest):
2.5 使用Docker容器准备两台测试机器
使用Docker容器准备两台测试机器,名称分别为:sender和receiver。
root@k0test1:~# docker run --name sender --hostname sender -d ubtest:22.04
4a86598c28928cffebbe2e61a86305b54176319006e26fc8bf69201cda6b8748
root@k0test1:~# docker run --name receiver --hostname receiver -d ubtest:22.04
17d06c4aca4a7c4133fd786a0f849354b3c48bc824443c7bde9a979a05dac37f
root@k0test1:~#
root@k0test1:~# docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
17d06c4aca4a ubtest:22.04 "/usr/sbin/sshd -D" 2 minutes ago Up 2 minutes receiver
4a86598c2892 ubtest:22.04 "/usr/sbin/sshd -D" 2 minutes ago Up 2 minutes sender
cb4bd3e88529 rabbitmq:3.13-management "docker-entrypoint.s…" 2 hours ago Up 2 hours 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, :::5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp, :::15672->15672/tcp rabbitmq
root@k0test1:~#
2.6 在测试机器安装RabbitMQ客户端软件
RabbitMQ 支持多种消息协议,其中包括 AMQP 0-9-1,这是一个开放的、通用的消息传递协议。对于不同编程语言的开发者来说,RabbitMQ 提供了多种客户端库,以便能够与 RabbitMQ 服务器进行交互。
在 Python 中,pika 是一个广泛使用的 RabbitMQ 客户端库,它允许 Python 应用程序作为消息的发送者(生产者)和接收者(消费者)。
在两台测试机器分别安装python3、pip3和pika软件:
sender:
root@k0test1:~# docker exec -it sender /bin/bash
root@sender:/# apt update
root@sender:/# apt install python3
root@sender:/# apt install python3-pip
root@sender:/# python3 -m pip install pika --upgrade
Collecting pika
Downloading pika-1.3.2-py3-none-any.whl (155 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 155.4/155.4 KB 1.2 MB/s eta 0:00:00
Installing collected packages: pika
Successfully installed pika-1.3.2
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@sender:/#
receiver:
root@k0test1:~# docker exec -it receiver /bin/bash
root@receiver:/# apt update
root@receiver:/# apt install python3
root@receiver:/# apt install python3-pip
root@receiver:/# python3 -m pip install pika --upgrade
Collecting pika
Downloading pika-1.3.2-py3-none-any.whl (155 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 155.4/155.4 KB 824.8 kB/s eta 0:00:00
Installing collected packages: pika
Successfully installed pika-1.3.2
WARNING: Running pip as the 'root' user can result in broken permissions and conflicting behaviour with the system package manager. It is recommended to use a virtual environment instead: https://pip.pypa.io/warnings/venv
root@receiver:/#
2.7 网络拓扑
3、Hello world练习
3.1 概述
一个典型的消息传递系统中的组件和它们之间的关系如下图所示,P是生产者(Producer),C是消费者(Consumer),中间是一个队列(Queue)——一个消息缓冲区(原文:In the diagram below, "P" is our producer and "C" is our consumer. The box in the middle is a queue - a message buffer that RabbitMQ keeps on behalf of the consumer.):
-
生产者(Producer):这里的"P"代表生产者,也就是发送消息的一方。生产者生成数据或信息,然后将其发送到消息系统。
-
消费者(Consumer):"C"代表消费者,即接收和处理消息的一方。消费者从消息系统中获取数据,并对其进行处理。
-
队列(Queue):中间的"盒子"指的是队列,它是RabbitMQ用来存储消息的缓冲区。队列在消息传递中扮演着至关重要的角色,它确保即使在消费者暂时无法接收消息的情况下,消息也不会丢失。
-
消息缓冲(Message Buffer):队列作为消息缓冲区,可以存储生产者发送的消息,直到消费者准备好接收它们。
-
RabbitMQ:RabbitMQ是一个消息代理,它负责维护队列和消息的传递。它接受生产者发送的消息,并将其存储在队列中,然后根据消费者的请求将消息传递给消费者。
-
代表消费者(on behalf of the consumer):RabbitMQ维护队列是为了消费者的利益。这意味着RabbitMQ确保消息在消费者准备接收之前被安全地存储,并且在消费者请求时能够可靠地传递给消费者。
总结来说,一个消息传递的基本流程:生产者发送消息到RabbitMQ,RabbitMQ将消息存储在队列中,消费者从队列中接收消息并进行处理。队列作为消息的临时存储,确保了消息传递的可靠性和灵活性。
在本练习中,将用Python编写两个小程序:一个生产者(发送者)发送一条消息,和一个消费者(接收者)接收消息并打印出来。
在本网络环境中,生产者位于Sender容器,消费者位于Receiver容器。
3.2 Sending
进入sender容器,vi编写send.py:
root@k0test1:~# docker exec -it sender /bin/bash
root@sender:/# pwd
/
root@sender:/# vi send.py
root@sender:/# cat send.py
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
root@sender:/#
这段Python代码是一个生产者脚本,用于向RabbitMQ发送消息。下面是代码的具体解释:
import pika
:导入pika
库,这是一个用于与RabbitMQ交互的Python客户端库。创建到RabbitMQ服务器的连接:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
:这行代码创建了一个连接,连接到IP地址为172.17.0.2
的RabbitMQ服务器。这里的host
参数应该设置为RabbitMQ服务运行的主机地址。创建一个通道:
channel = connection.channel()
:在连接上创建一个新的通道,用于消息的发送和接收。声明一个队列:
channel.queue_declare(queue='hello')
:声明一个名为hello
的队列。如果该队列不存在,RabbitMQ将自动创建它。在发送消息之前确保接收队列存在是非常重要的,因为如果消息发送到不存在的位置,RabbitMQ将会丢弃该消息。发布消息:
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
:通过通道发布一条消息。消息内容为'Hello World!'
。这里没有指定交换机(exchange),所以消息将直接发送到指定的队列(routing_key也设置为hello
,与队列名相同)。在RabbitMQ中,消息不能直接发送到队列,而是必须通过一个交换机(Exchange)。这里使用了默认交换机,这个交换机可以通过一个空字符串来标识。默认交换机是RabbitMQ中一个特殊的实体,它允许我们直接指定消息应该发送到哪个队列。这是通过在发送消息时设置
routing_key
参数来实现的,routing_key
应该与目标队列的名称相匹配。打印消息确认:
print(" [x] Sent 'Hello World!'")
:在控制台打印一条消息,表示消息已经成功发送。关闭连接:
connection.close()
:关闭与RabbitMQ的连接,释放资源。这个脚本是一个简单的生产者示例,用于演示如何使用
pika
库向RabbitMQ发送消息。在实际应用中,你可能需要处理连接异常、消息确认等更复杂的逻辑。
3.3 Receiving
进入receiver容器,vi编写receive.py:
root@k0test1:~# docker exec -it receiver /bin/bash
root@receiver:/# pwd
/
root@receiver:/# vi receive.py
root@receiver:/# cat receive.py
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
root@receiver:/#
这段Python代码是一个使用
pika
库与RabbitMQ交互的消费者脚本。下面是代码的逐行解释:
import pika, sys, os
:导入所需的模块,pika
用于RabbitMQ通信,sys
用于访问与Python解释器相关的变量和函数,os
用于操作系统功能。定义
main()
函数:这是脚本的主要入口点。创建到RabbitMQ服务器的连接:
connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2'))
:创建一个连接到IP地址为172.17.0.2
的RabbitMQ服务器。创建一个通道:
channel = connection.channel()
:在连接上创建一个新的通道。声明一个队列:
channel.queue_declare(queue='hello')
:声明一个名为hello
的队列,如果该队列不存在,RabbitMQ将自动创建它。- 在send.py代码中已经声明了队列,但在某些情况下,或者为了代码的健壮性和可维护性,重复声明队列是一种好的实践。
定义回调函数
callback
:
def callback(ch, method, properties, body)
:定义一个函数,当消息到达时会被调用。参数包括通道(ch
)、方法(method
)、属性(properties
)和消息体(body
)。print(f" [x] Received {body}")
:在控制台打印接收到的消息内容。设置消息消费:
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
:设置消费者开始监听名为hello
的队列,当有新消息到达时调用callback
函数。auto_ack=True
表示自动发送确认给RabbitMQ,表明消息已被接收。打印等待消息的提示:
print(' [*] Waiting for messages. To exit press CTRL+C')
:通知用户脚本正在等待消息,并告知如何退出。开始接收消息:
channel.start_consuming()
:启动消息接收循环。检查是否作为主程序运行:
if __name__ == '__main__':
:如果这个脚本是作为主程序运行的,而不是被导入到其他脚本中。捕获
KeyboardInterrupt
异常:
try...except KeyboardInterrupt:
:捕获CTRL+C
中断信号,允许用户通过键盘中断退出脚本。退出程序:
sys.exit(0)
:尝试正常退出程序。except SystemExit: os._exit(0)
:如果sys.exit()
抛出SystemExit
异常,使用os._exit(0)
强制退出程序。这个脚本作为一个RabbitMQ消费者,监听名为
hello
的队列,并在控制台打印出接收到的消息。使用CTRL+C
可以中断消息接收循环并退出程序。
3.4 开始测试
1、在receive容器,运行消费者程序receive.py
消费者程序将开始运行并等待消息。消费者程序将持续运行并监听指定的队列。只要它在运行,它就会接收并处理发送到队列的消息。
root@receiver:/# python3 receive.py
[*] Waiting for messages. To exit press CTRL+C
2、此时,在rabbitmq容器,查看队列:
root@0840a29d9e2f:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name messages
hello 0
root@0840a29d9e2f:/#
这里是命令输出的解释:
root@0840a29d9e2f:/#
:这是你的命令行提示符,显示你当前以root
用户登录到一个容器或者某个系统的shell中。
rabbitmqctl list_queues
:这是你执行的命令,用来列出RabbitMQ中所有的队列。
Timeout: 60.0 seconds ...
:这个信息表明rabbitmqctl
命令设置了60秒的超时时间。
Listing queues for vhost / ...
:这表明命令正在列出RabbitMQ的默认虚拟主机(vhost)下的队列。
name messages
:这是列标题,表示输出将显示队列的名称和消息数量。
hello 0
:这是列出的一个队列,名为hello
,当前有0条消息。根据这个输出,可以得出以下结论:
- RabbitMQ服务器上至少有一个名为
hello
的队列。- 这个队列目前没有任何消息。
3、在sender容器,启动生产者程序send.py发送消息
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/#
4、观察消费者响应
一旦生产者发送了消息,消费者应该会接收到消息,并在控制台中打印出消息内容。
root@receiver:/# python3 receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!' <-----消费者收到的生产者发生的消息
从这个输出可以得出以下结论:
- 消费者程序正在运行,并且已经成功连接到RabbitMQ服务器。
- 消费者程序监听的队列接收到了消息。
- 接收到的消息内容是
'Hello World!'
。
5、重复第3步,生产者多次发送消息, 消费者多次接收消息:
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/# python3 send.py
[x] Sent 'Hello World!'
root@sender:/#
root@receiver:/# python3 receive.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
[x] Received b'Hello World!'
6、停止消费者程序
要停止消费者程序,你可以在运行它的终端中按下CTRL+C
。程序应该会捕捉到这个中断信号,并优雅地关闭通道和连接,然后退出。
4、Wireshark抓包
4.1 抓包方式
在虚机上运行wireshark,捕获docker0端口流量:
root@k0test1:~# wireshark &
[1] 2562
4.2 抓包信息
4.3 典型数据包
1、生产者向RabbitMQ Server发送消息
Frame 42: 88 bytes on wire (704 bits), 88 bytes captured (704 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:03 (02:42:ac:11:00:03), Dst: 02:42:ac:11:00:02 (02:42:ac:11:00:02)
Internet Protocol Version 4, Src: 172.17.0.3 (172.17.0.3), Dst: 172.17.0.2 (172.17.0.2)
Transmission Control Protocol, Src Port: 48864, Dst Port: 5672, Seq: 382, Ack: 597, Len: 22
Advanced Message Queuing Protocol
Type: Method (1)
Channel: 1
Length: 14
Class: Basic (60)
Method: Publish (40)
Arguments
Ticket: 0
Exchange:
Routing-Key: hello
.... ...0 = Mandatory: False
.... ..0. = Immediate: False
这段 Wireshark 捕获的信息描述了一个使用 AMQP 协议的 "Publish" 方法调用,它用于在 RabbitMQ 等消息代理中发送消息到一个交换机。以下是对这些信息的详细解释:
Advanced Message Queuing Protocol: 表示这是 AMQP 协议的数据包。
Type: Method (1): 表示这是一个方法类型的消息,用于执行操作。
Channel: 1: 指定了 AMQP 通道号,通道是 AMQP 连接中的一个虚拟通道,用于隔离消息。
Length: 14: 表示方法帧有效载荷的长度是 14 字节。
Class: Basic (60): 表示使用的是 AMQP 协议的基础类(Basic Class),这个类定义了基本操作,如消息的发布和接收。
Method: Publish (40): 表示执行的操作是发布消息(Publish)。这是客户端用来将消息发送到交换机的方法。
Arguments:
- 包含了执行 Publish 方法所需的参数。
Ticket: 0: 指定了用于访问限制的票据ID。
Exchange:: 指定了消息要发送到的交换机名称。在这个例子中,交换机名称没有给出,可能使用默认交换机。
Routing-Key: hello: 指定了路由键,用于决定消息如何从交换机路由到队列。在这个例子中,路由键是 "hello",这意味着消息将被发送到名为 "hello" 的队列(如果交换机配置正确)。
Mandatory: False: 表示如果消息无法被路由到任何队列,不强制客户端接收到一个返回(否定确认)。
Immediate: False: 表示消息应该在所有绑定的队列为空时被路由,而不是直接返回给发送者。
这个数据帧表示客户端通过 AMQP 协议的 Publish 方法向交换机发送了一个消息,消息将根据指定的路由键 "hello" 被路由到相应的队列。如果使用的是默认交换机,并且存在一个名为 "hello" 的队列,消息将被投递到那个队列中。
2、RabbitMQ 服务器向消费者发送消息
Frame 48: 175 bytes on wire (1400 bits), 175 bytes captured (1400 bits) on interface docker0, id 0
Ethernet II, Src: 02:42:ac:11:00:02 (02:42:ac:11:00:02), Dst: 02:42:ac:11:00:04 (02:42:ac:11:00:04)
Internet Protocol Version 4, Src: 172.17.0.2 (172.17.0.2), Dst: 172.17.0.4 (172.17.0.4)
Transmission Control Protocol, Src Port: 5672, Dst Port: 47890, Seq: 648, Ack: 446, Len: 109
Advanced Message Queuing Protocol
Type: Method (1)
Channel: 1
Length: 59
Class: Basic (60)
Method: Deliver (60)
Arguments
Consumer-Tag: ctag1.a0f530198eb74e1a85390418796baf1a
Delivery-Tag: 1
.... ...0 = Redelivered: False
Exchange:
Routing-Key: hello
Advanced Message Queuing Protocol
Type: Content header (2)
Channel: 1
Length: 14
Class ID: Basic (60)
Weight: 0
Body size: 12
Property flags: 0x0000
Properties
Advanced Message Queuing Protocol
Type: Content body (3)
Channel: 1
Length: 12
Payload: 48656c6c6f20576f726c6421
这段 Wireshark 捕获的信息描述了一个使用 AMQP 协议的 "Deliver" 方法调用,这通常表示 RabbitMQ 服务器正在向消费者发送消息。以下是对这些信息的详细解释:
Advanced Message Queuing Protocol: 表示这是 AMQP 协议的数据包。
Type: Method (1): 表示这是一个方法类型的消息,用于执行操作。
Channel: 1: 指定了 AMQP 通道号,通道是 AMQP 连接中的一个虚拟通道,用于隔离消息。
Length: 59: 表示方法帧有效载荷的长度是 59 字节。
Class: Basic (60): 表示使用的是 AMQP 协议的基础类(Basic Class),这个类定义了基本操作,如消息的发布和接收。
Method: Deliver (60): 表示执行的操作是消息分派(Deliver)。这是服务器用来向消费者发送消息的方法。
Arguments:
- 包含了执行 Deliver 方法所需的参数。
Consumer-Tag:
ctag1.a0f530198eb74e1a85390418796baf1a
: 指定了消费者的标签,用于区分不同的消费者。每个消费者在订阅队列时会被分配一个唯一的标签。Delivery-Tag: 1: 指定了分派给消费者的消息的序列号。这是消费者用来确认消息的标识符。
Redelivered: False: 表示这条消息不是重新分派的。如果消息由于某些原因未能被消费者处理,它可能会被重新分派,此时该标志会设置为
True
。Exchange: 指定了消息最初发送到的交换机名称。在这个例子中,交换机名称没有给出,可能使用默认交换机。
Routing-Key: hello: 指定了消息的路由键,用于决定消息如何从交换机路由到队列。在这个例子中,路由键是 "hello"。
这个数据帧表示 RabbitMQ 服务器正在向一个消费者分派一条消息,这条消息最初是发送到默认交换机,并且使用路由键 "hello" 被路由到一个队列。消费者通过其唯一的消费者标签
ctag1.a0f530198eb74e1a85390418796baf1a
来接收这条消息。如果消费者成功接收并准备好处理这条消息,它将发送一个针对这个Delivery-Tag
的确认回执给服务器。如果消费者设置为手动确认模式,那么它必须显式地发送确认,否则消息可能会被重新分派或在消费者断开连接时返回给队列。
4.4 流量图
5、小结
通过本次练习了解了如何在 RabbitMQ 中使用命名队列发送和接收消息。这里是这个过程的简要概述:
-
发送消息到命名队列:
- 使用AMQP协议和RabbitMQ客户端库(如pika for Python)连接到RabbitMQ服务器。
- 声明一个命名队列(例如
hello
),如果它尚不存在,RabbitMQ将自动创建它。 - 通过交换机将消息发送到队列,当不使用特定交换机时,可以使用默认交换机,并将队列名作为路由键。
-
接收来自命名队列的消息:
- 同样,首先建立与RabbitMQ服务器的连接,并创建通道。
- 声明相同的命名队列,确保队列存在。
- 使用
basic_consume
方法订阅队列,并提供一个回调函数来处理接收到的消息。 - 调用
start_consuming
方法开始接收消息,并等待回调函数被触发。
-
消息确认:
- 在手动确认模式下,消费者在处理完每条消息后需要发送一个确认回执给RabbitMQ,告知消息已被成功处理。
- 在自动确认模式下,RabbitMQ会在消息被接收后自动标记消息为已确认。