RabbitMQ练习(Topics)

news2024/11/6 3:05:28

 1、RabbitMQ教程

《RabbitMQ Tutorials》icon-default.png?t=N7T8https://www.rabbitmq.com/tutorials

2、环境准备

参考:《RabbitMQ练习(Hello World)》和《RabbitMQ练习(Work Queues)》。

确保RabbitMQ、Sender、Receiver、Receiver2容器正常安装和启动。

root@k0test1:~# docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
root@k0test1:~# docker start sender
root@k0test1:~# docker start receiver
root@k0test1:~# docker start receiver2
root@k0test1:~# docker network inspect bridge

网络拓扑:

3、Topics练习

3.1 概述

在前面的Routing练习中,不再使用仅能进行简单广播的fanout exchange,而是使用了direct exchange,从而实现选择性地接收日志。但direct exchange仍然有局限性——它不能基于多个条件进行路由。

在常见的日志系统中,不仅可以根据严重性订阅日志,还可以根据发出日志的来源进行订阅。比如Unix工具syslog,它根据严重性(信息/警告/关键...)和设施(认证/定时任务/内核...)来发出日志(原文:You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).)。

这将提供很大的灵活性——比如只希望接收来自'cron'的关键错误日志,以及来自'kern'的所有日志。

要在日志系统中实现这一点,需要学习更复杂的主题交换机(topic exchange)

3.2 Topic exchange

1、路由键格式要求

发送到主题交换机(topic exchange)的消息所携带的路由键(routing_key)不能是任意形式——它必须是用分隔的单词列表。这些单词可以是任何内容,但通常它们和消息的某些特征相关。一些有效的路由键示例包括:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。路由键中的单词数量可以随意,只要不超过255字节的限制。

2、绑定键格式要求

绑定键(binding key)也必须是相同的形式。主题交换机(topic exchange)背后的逻辑类似于直接交换机(direct exchange)——路由键需要和绑定键进行匹配,使用特定路由键发送的消息将被传递到所有与匹配绑定键绑定的队列。然而,对于绑定键有两个重要的特殊情况:

  • * 可以替代恰好一个单词。
  • # 可以替代0个或多个单词。

3、举例说明:

在这个例子中,我们将发送描述动物的消息。这些消息将使用由三个词组成的路由键发送(由两个点分隔)。路由键的第一个词将描述速度(celerity),第二个词描述颜色(colour),第三个词描述物种(species):"<celerity>.<colour>.<species>"。

我们创建了三个绑定(bindings):

  • Q1 绑定的键是 "*.orange.*",意味着它对所有橙色的动物感兴趣。
  • Q2 绑定的键有两个:"*.*.rabbit" 和 "lazy.#",意味着它对所有关于兔子的消息以及所有懒惰的动物感兴趣。

这些绑定可以总结为:

  • Q1 对所有橙色的动物感兴趣。
  • Q2 想要听到所有关于兔子的消息,以及所有关于懒惰动物的消息。

具有路由键 "quick.orange.rabbit" 的消息将被发送到两个队列。消息 "lazy.orange.elephant" 也会被发送到两个队列。另一方面,"quick.orange.fox" 只会被发送到第一个队列,而 "lazy.brown.fox" 只会被发送到第二个队列。"lazy.pink.rabbit" 将只被发送到第二个队列一次,尽管它匹配了两个绑定。"quick.brown.fox" 不匹配任何绑定,因此将被丢弃。

如果我们违反协议,发送了一个或四个词的消息,比如 "orange" 或 "quick.orange.new.rabbit",那么这些消息将不会匹配任何绑定,并将丢失。

另一方面,尽管 "lazy.orange.new.rabbit" 有四个词,但它将匹配最后一个绑定,并将被发送到第二个队列。

主题交换机(Topic Exchange)

主题交换机(Topic Exchange)是一个非常强大的消息交换机制,它可以像其他类型的交换机一样工作。以下是主题交换机的两种主要行为:

  1. 当一个队列使用"#"(井号)作为绑定键与主题交换机绑定时,它将接收到所有的消息,而不管路由键是什么。这与扇出交换机(fanout exchange)的行为类似,即所有发送到交换机的消息都会被分发到所有绑定的队列。

  2. 如果在绑定中没有使用特殊字符"*"(星号)和"#"(井号),主题交换机的行为就会和直接交换机(direct exchange)一样。这意味着消息只会被分发到那些绑定键与消息的路由键完全匹配的队列。

主题交换机提供了一种灵活的方式来根据消息的路由键进行消息的路由和分发。通过使用星号(*)和井号(#)作为通配符,可以创建复杂的路由规则,从而实现对消息的精确控制。

3.3 代码说明

日志系统中使用主题交换机,日志的路由键将有两个单词组成:"<facility>.<severity>",每个日志消息的路由键将由两个部分组成,第一部分是设施或系统组件的名称,第二部分是日志消息的严重性级别。例如,一个路由键可能是"auth.error",表示与认证相关的错误消息。使用主题交换机,可以基于这些路由键的不同组合来灵活地路由和分发日志消息。 

3.3.1 Sending

进入sender容器,vi编写emit_log_topic.py:

root@sender:/# vi emit_log_topic.py
root@sender:/# cat emit_log_topic.py 
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(
    exchange='topic_logs', routing_key=routing_key, body=message)
print(f" [x] Sent {routing_key}:{message}")
connection.close()
root@sender:/#

这段代码是一个使用 Python 编写的简单 RabbitMQ 发送程序,它使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,这是 Python 中用于与 RabbitMQ 交互的库。
    • import sys: 导入 sys 模块,用于访问由命令行参数提供的值。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的连接。这里指定了服务器的 IP 地址为 172.17.0.2
  3. 创建通道:

    • channel = connection.channel(): 在建立的连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。exchange_type='topic' 指定了交换机的类型为 topic,这意味着它可以根据路由键的模式来路由消息。
  5. 设置路由键和消息:

    • routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info': 从命令行参数中获取路由键。如果有提供第一个参数(sys.argv[1]),则使用它作为路由键;如果没有提供,则默认使用 'anonymous.info'
    • message = ' '.join(sys.argv[2:]) or 'Hello World!': 从命令行参数中获取消息内容。如果有提供第二个及后续参数,将它们连接成一个字符串作为消息;如果没有提供,则默认消息为 'Hello World!'
  6. 发布消息:

    • channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message): 使用指定的交换机和路由键发布消息。消息的内容是之前设置的 message
  7. 打印消息确认:

    • print(f" [x] Sent {routing_key}:{message}"): 打印一条消息到控制台,确认消息已经发送,并显示路由键和消息内容。
  8. 关闭连接:

    • connection.close(): 关闭与 RabbitMQ 服务器的连接。

这个脚本可以作为命令行工具使用,通过指定不同的路由键和消息内容来发送消息到 RabbitMQ。使用示例:

python emit_log_topic.py auth.error "User authentication failed"

这将向 RabbitMQ 发送一个路由键为 auth.error 的消息,内容为 "User authentication failed"。如果没有提供路由键,消息将使用默认的 'anonymous.info' 路由键发送。

3.3.2 Receiving

进入receiver/receiver2容器,vi编写receive_logs_topic.py:

root@receiver:/# vi receive_logs_topic.py 
root@receiver:/# cat receive_logs_topic.py 
import pika
import sys

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='172.17.0.2'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs', exchange_type='topic')

result = channel.queue_declare('', exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(
        exchange='topic_logs', queue=queue_name, routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
    print(f" [x] {method.routing_key}:{body}")


channel.basic_consume(
    queue=queue_name, on_message_callback=callback, auto_ack=True)

channel.start_consuming()
root@receiver:/# 

这段代码是一个使用 Python 编写的 RabbitMQ 接收程序,它同样使用了 pika 库来与 RabbitMQ 服务器进行交互。下面是代码的详细说明:

  1. 导入所需库:

    • import pika: 导入 pika 库,用于与 RabbitMQ 交互。
    • import sys: 导入 sys 模块,用于访问命令行参数。
  2. 建立连接:

    • connection = pika.BlockingConnection(pika.ConnectionParameters(host='172.17.0.2')): 创建一个到 RabbitMQ 服务器的阻塞模式连接,指定服务器的 IP 地址。
  3. 创建通道:

    • channel = connection.channel(): 在连接上创建一个新的通道。
  4. 声明交换机:

    • channel.exchange_declare(exchange='topic_logs', exchange_type='topic'): 声明一个名为 topic_logs 的主题交换机。
  5. 声明队列:

    • result = channel.queue_declare('', exclusive=True): 声明一个非持久的、独占的队列。exclusive=True 意味着这个队列只对连接它的客户端可见,并且当连接关闭时,队列将被删除。
    • queue_name = result.method.queue: 从结果中获取队列名称。
  6. 检查绑定键:

    • binding_keys = sys.argv[1:]: 从命令行参数中获取所有的绑定键。
    • if not binding_keys: ...: 如果没有提供绑定键,打印用法信息并退出程序。
  7. 绑定队列到交换机:

    • 循环遍历所有的 binding_keys,使用 channel.queue_bind() 方法将队列绑定到 topic_logs 交换机,并为每个绑定键设置相应的路由键。
  8. 等待日志消息:

    • 打印提示信息,告知用户程序正在等待日志消息,并说明如何退出程序。
  9. 定义消息回调函数:

    • def callback(ch, method, properties, body): ...: 定义一个回调函数,当接收到消息时会被调用。它打印出消息的路由键和消息体。
  10. 设置消息消费:

    • channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True): 设置消息消费,指定队列名称、回调函数和自动确认消息。
  11. 开始接收消息:

    • channel.start_consuming(): 启动消息接收循环,直到用户中断(例如使用 CTRL+C)。

这个脚本可以作为命令行工具使用,通过指定一个或多个绑定键来接收匹配这些键的消息。例如,如果你想要接收所有与 auth 相关的日志消息,你可以使用以下命令行:

python receive_logs_topic.py auth.*

这将使得脚本接收所有路由键以 auth 开头的日志消息。使用星号(*)作为通配符,可以匹配任意数量的字符。

 3.4、开始测试

3.4.1 接收所有日志

1、在 receiver 容器中打开命令行界面,运行以下命令以接收所有日志:

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings:

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic <--创建的topic exchange
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct

root@30acfada6737:/#  rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0 <--创建的临时队列
root@30acfada6737:/#  rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
root@30acfada6737:/# 

根据 rabbitmqctl list_bindings 命令的输出,第一个绑定的路由键实际上与目的地的名称相同。这里是具体的解释:

  1. 源(source): 没有显示名称,这通常指的是默认的交换机(default exchange)。每个队列在创建时都会自动绑定到这个默认交换机上,使用队列的名称作为路由键。

  2. 源类型(source_kind): exchange 表示源是一个交换机。

  3. 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw 是一个自动生成的队列名称。

  4. 路由键(routing_key): 与目的地名称相同,即 amq.gen-eiFT38g9RD-iuc-RCn0FMw。这意味着只有当消息的路由键与队列名称完全匹配时,消息才会被路由到这个队列。

  5. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

  6. 第二个绑定:

    • 源(source): topic_logs,这是一个命名的主题交换机。
    • 目的地(destination): amq.gen-eiFT38g9RD-iuc-RCn0FMw,与第一个绑定中的队列相同。
    • 路由键(routing_key): #,这是一个特殊字符,表示这个队列将接收所有通过 topic_logs 交换机的消息。

这意味着:

  • 第一个绑定是队列与其默认交换机之间的绑定,当消息不指定交换机名字(此时使用默认交换机),同时路由键和队列名称相同时,才会被这个队列接收。
  • 第二个绑定是队列与 topic_logs 交换机之间的绑定,由于使用了 # 作为路由键,这个队列将接收所有发送到 topic_logs 交换机的消息,无论它们的实际路由键是什么。

如果你想要测试这些绑定关系,你可以:

  • 使用 emit_log_topic.py脚本发送消息到 topic_logs 交换机,不指定路由键或使用 任意信息 作为路由键,然后观察这些消息是否被自动生成的队列接收。
  • 如果你有 receive_logs_topic.py 脚本正在监听这个队列,你应该能够看到所有发送的消息被打印出来。

 3、在 sender 容器中打开命令行界面,运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "A critical kernel error"
 [x] Sent kern.critical:A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "This is a test"                  
 [x] Sent test:This is a test
root@sender:/# 

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'

5、receiver容器继续准备接受所有日志。 

3.4.2 接收特定设施的日志

1、在 receiver2 容器中打开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic 
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0  <---receiver2创建的临时队列
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []

我们可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机(exchange)的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-XmrA_fE3vyX-j4pFSbpqww,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): kern.*,这表明这个绑定将匹配所有以 kern. 开头的路由键。这是一个主题路由键,使用星号(*)作为通配符,代表任意数量的字符。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 kern. 开头的消息都将被路由到名为 amq.gen-XmrA_fE3vyX-j4pFSbpqww 的队列。例如,消息使用路由键 kern.warningkern.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 kern. 开头的路由键,例如 kern.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-XmrA_fE3vyX-j4pFSbpqww 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 kern.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 kern.* 模式的消息才会被接收。

 3、在 sender 容器中打开命令行界面,继续运行以下命令以发送日志:

root@sender:/# python3 emit_log_topic.py "kern.critical" "third: A critical kernel error" 
 [x] Sent kern.critical:third: A critical kernel error
root@sender:/# python3 emit_log_topic.py "test" "fourth: This is a test"     
 [x] Sent test:fourth: This is a test
root@sender:/#   

4、观察 receiver 容器的命令行输出,检查是否收到了发送的日志。

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'

5、观察 receiver2容器的命令行输出,检查是否收到了发送的日志。

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'

3.4.3 接收特定严重性的日志

1、在 receiver2 容器中新开命令行界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-dZZcREH6GManw2OusGgU2g  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   amq.gen-dZZcREH6GManw2OusGgU2g  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

根据rabbitmqctl list_bindings 命令的输出:

topic_logs      exchange        amq.gen-dZZcREH6GManw2OusGgU2g  queue   *.critical      []

可以了解到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-dZZcREH6GManw2OusGgU2g,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): *.critical,这表明这个绑定将匹配所有以 *.critical 结尾的路由键。在这个上下文中,星号(*)作为通配符,代表任意数量的字符,但在这个特定的路由键中,它实际上匹配任何以点(.)后跟 critical 结尾的字符串。

  7. 参数(arguments): 空列表 [] 表示这个绑定没有使用任何额外的参数。

这个绑定表明,任何发送到 topic_logs 交换机并且路由键以 *.critical 结尾的消息都将被路由到名为 amq.gen-dZZcREH6GManw2OusGgU2g 的队列。例如,消息使用路由键 auth.criticalcron.critical 都会被这个队列接收。

如果你想测试这个特定的绑定,你可以按照以下步骤操作:

  1. 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用一个以 .critical 结尾的路由键,例如 auth.critical

  2. 确保 receive_logs_topic.py 脚本正在监听 amq.gen-dZZcREH6GManw2OusGgU2g 队列。

  3. 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用 auth.critical 路由键发送的消息。

  4. 如果没有收到消息,请检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。

  5. 你还可以测试发送使用不同路由键的消息,以验证只有匹配 *.critical 模式的消息才会被接收。例如,发送一个路由键为 info.critical 的消息,它应该被接收;而发送一个路由键为 info.normal 的消息,则不应该被这个特定的队列接收。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第5个和第6个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "fifth log: This is fifth log"          
 [x] Sent auth.critical:fifth log: This is fifth log
root@sender:/# python3 emit_log_topic.py "kern.critical" "sixth log: This is sixth log"              
 [x] Sent kern.critical:sixth log: This is sixth log
root@sender:/# 

4、观察日志接收情况:

receiver(收到5、6)

root@receiver:/# python receive_logs_topic.py "#"
bash: python: command not found
root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'

 receiver2 第一个终端(只收到6):

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'
 [x] kern.critical:b'sixth log: This is sixth log'

receiver2第二个终端(收到5、6):

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'

3.4.4 创建多个绑定的测试

1、在 receiver2 容器中新开第三个终端界面,运行以下命令以接收日志:

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"
 [*] Waiting for logs. To exit press CTRL+C

2、rabbitmq容器上查看exchange, queue,bindings: 

root@30acfada6737:/# rabbitmqctl list_exchanges
Listing exchanges for vhost / ...
name    type
amq.topic       topic
topic_logs      topic
amq.fanout      fanout
amq.direct      direct
amq.headers     headers
amq.match       headers
amq.rabbitmq.trace      topic
        direct
root@30acfada6737:/# rabbitmqctl list_queues   
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
amq.gen-zR4Pv3875mquqMTLIkMCUQ  0
amq.gen-eiFT38g9RD-iuc-RCn0FMw  0
amq.gen-HS_61KsFpuerwoA17IMUmw  0
amq.gen-XmrA_fE3vyX-j4pFSbpqww  0
root@30acfada6737:/# rabbitmqctl list_bindings 
Listing bindings for vhost /...
source_name     source_kind     destination_name        destination_kind        routing_key     arguments
        exchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   amq.gen-zR4Pv3875mquqMTLIkMCUQ  []
        exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   amq.gen-eiFT38g9RD-iuc-RCn0FMw  []
        exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   amq.gen-HS_61KsFpuerwoA17IMUmw  []
        exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   amq.gen-XmrA_fE3vyX-j4pFSbpqww  []
topic_logs      exchange        amq.gen-eiFT38g9RD-iuc-RCn0FMw  queue   #       []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-zR4Pv3875mquqMTLIkMCUQ  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []
topic_logs      exchange        amq.gen-XmrA_fE3vyX-j4pFSbpqww  queue   kern.*  []
root@30acfada6737:/# 

rabbitmqctl list_bindings 命令输出中:

topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   *.critical      []
topic_logs      exchange        amq.gen-HS_61KsFpuerwoA17IMUmw  queue   kern.*  []

可以看到以下信息:

  1. 虚拟主机(vhost): 绑定关系是在默认的虚拟主机 / 下列出的。

  2. 源(source_name): topic_logs,这是一个主题交换机的名称。

  3. 源类型(source_kind): exchange 表示源是一个交换机。

  4. 目的地(destination_name): amq.gen-HS_61KsFpuerwoA17IMUmw,这是一个自动生成的队列名称。

  5. 目的地类型(destination_kind): queue 表示目的地是一个队列。

  6. 路由键(routing_key): 存在两条绑定,每条绑定有不同的路由键:

    • *.critical:这个绑定将匹配所有以 .critical 结尾的路由键,例如 auth.critical 或 cron.critical
    • kern.*:这个绑定将匹配所有以 kern. 开头的路由键,例如 kern.warning 或 kern.emerg
  7. 参数(arguments): 两个绑定的参数列表都是空的 [],表示这些绑定没有使用任何额外的参数。

这个输出显示了 topic_logs 交换机绑定到同一个队列 amq.gen-HS_61KsFpuerwoA17IMUmw 上的两种不同的路由模式。这意味着这个队列将接收符合这两种模式的任何消息。

测试这些绑定的步骤:

  1. 发送消息:

    • 使用 emit_log_topic.py 脚本发送消息到 topic_logs 交换机,使用不同的路由键来测试绑定。例如:
      • 发送一个路由键为 auth.critical 的消息,应该被队列接收。
      • 发送一个路由键为 kern.warning 的消息,也应该被队列接收。
  2. 监听队列:

    • 确保 receive_logs_topic.py 脚本正在监听 amq.gen-HS_61KsFpuerwoA17IMUmw 队列。
  3. 观察输出:

    • 观察 receive_logs_topic.py 脚本的输出,检查是否收到了使用上述路由键发送的消息。
  4. 验证绑定:

    • 如果消息没有被接收,检查脚本的 RabbitMQ 连接设置和队列监听设置是否正确。
    • 验证发送的消息路由键是否符合任一绑定的模式。
  5. 探索其他路由键:

    • 发送不符合上述两种模式的消息,例如使用路由键 info.normal,以验证消息不会被接收。

通过这些步骤,你可以验证 topic_logs 交换机的绑定是否按预期工作,并且消息是否能够正确地根据路由键模式被路由到指定的队列。

3、在 sender 容器中打开命令行界面,继续运行以下命令以发送第7个和第8个日志:

root@sender:/# python3 emit_log_topic.py "auth.critical" "seventh log: This is 7th log"       
 [x] Sent auth.critical:seventh log: This is 7th log
root@sender:/# python3 emit_log_topic.py "kern.warning" "eighth log: This is 8th log"                      
 [x] Sent kern.warning:eighth log: This is 8th log
root@sender:/# python3 emit_log_topic.py "info.normal" "ninth log: This is 9th log"                   
 [x] Sent info.normal:ninth log: This is 9th log
root@sender:/# 

4、观察日志接收情况:

receiver: (收到7、8、9)

root@receiver:/# python3 receive_logs_topic.py "#"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'A critical kernel error'
 [x] test:b'This is a test'
 [x] kern.critical:b'third: A critical kernel error'
 [x] test:b'fourth: This is a test'
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'
 [x] auth.critical:b'seventh log: This is 7th log'
 [x] kern.warning:b'eighth log: This is 8th log'
 [x] info.normal:b'ninth log: This is 9th log'

receiver2第一个终端:(收到8,匹配kern.*)

root@receiver2:/# python3 receive_logs_topic.py "kern.*"
 [*] Waiting for logs. To exit press CTRL+C
 [x] kern.critical:b'third: A critical kernel error'
 [x] kern.critical:b'sixth log: This is sixth log'
 [x] kern.warning:b'eighth log: This is 8th log'

receiver2第二个终端:(收到7,匹配*.critical)

(receiver2误操作中断了之前的连接,重建了和rabbitmq的连接)

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'fifth log: This is fifth log'
 [x] kern.critical:b'sixth log: This is sixth log'
^CTraceback (most recent call last):
  File "//receive_logs_topic.py", line 32, in <module>
    channel.start_consuming()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 1883, in start_consuming
    self._process_data_events(time_limit=None)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 2044, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 842, in process_data_events
    self._flush_output(common_terminator)
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/blocking_connection.py", line 514, in _flush_output
    self._impl.ioloop.poll()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 579, in poll
    self._poller.poll()
  File "/usr/local/lib/python3.10/dist-packages/pika/adapters/select_connection.py", line 1184, in poll
    events = self._poll.poll(self._get_max_wait())
KeyboardInterrupt

root@receiver2:/# python3 receive_logs_topic.py "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'seventh log: This is 7th log'

receiver2第三个终端:(收到7、8,匹配kern.*或者*.critical)

root@receiver2:/# python3 receive_logs_topic.py "kern.*" "*.critical"
 [*] Waiting for logs. To exit press CTRL+C
 [x] auth.critical:b'seventh log: This is 7th log'
 [x] kern.warning:b'eighth log: This is 8th log'

4、小结

这篇文章是 RabbitMQ 官方教程的第五部分,使用 Python 客户端 Pika 来演示如何使用主题交换机(Topic Exchange)。以下是对文章内容的总结:

1、预备条件

  • RabbitMQ 需要安装并运行在标准端口(5672)上。
  • 如果使用不同的主机、端口或凭据,则需要调整连接设置。
  • 需要使用 Pika RabbitMQ 客户端版本 1.0.0。

2、教程重点

  • 教程介绍了如何使用主题交换机来改善日志系统,允许基于多个标准进行消息路由。
  • 通过使用主题交换机,可以实现类似于 Unix 系统中 syslog 工具的功能,即根据消息的严重性和来源进行路由。

3、主题交换

  • 主题交换机的消息路由键(routing_key)必须是由点分隔的单词列表。
  • 绑定键(binding_key)也采用相同的格式,可以使用特殊字符:
    • *(星号):匹配正好一个单词。
    • #(井号):匹配零个或多个单词。

4、示例

  • 假设发送的消息描述动物,路由键由三个单词组成,分别表示速度、颜色和物种。
  • 创建了三个绑定:
    • Q1 绑定到 *.orange.*:对所有橙色动物感兴趣。
    • Q2 绑定到 *.*.rabbit 和 lazy.#:对所有兔子和所有懒惰动物感兴趣。

5、消息路由示例

  • 消息 quick.orange.rabbit 将被 Q1 和 Q2 接收。
  • 消息 lazy.orange.elephant 也将被两者接收。
  • 消息 quick.orange.fox 只匹配 Q1,而 lazy.brown.fox 只匹配 Q2。
  • 如果消息不符合绑定键的模式,如 orange 或 quick.orange.new.rabbit,它们将不会被任何队列接收。

6、代码示例

  • 提供了两个 Python 脚本示例:
    • emit_log_topic.py:用于发送消息到主题交换。
    • receive_logs_topic.py:用于接收主题交换的消息。

7、使用方法

  • 接收所有日志:python receive_logs_topic.py "#"
  • 接收特定设施的日志(如 kern):python receive_logs_topic.py "kern.*"
  • 只接收特定严重性的日志(如 critical):python receive_logs_topic.py "*.critical"
  • 创建多个绑定:python receive_logs_topic.py "kern.*" "*.critical"
  • 发送特定路由键的日志:python emit_log_topic.py "kern.critical" "A critical kernel error"

8、注意事项

  • 代码没有对路由或绑定键做任何假设,可以根据需要使用多个路由键参数。

9、结语

  • 教程鼓励用户尝试和玩耍这些程序,以更好地理解主题交换机(topic exchange)的工作方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2087366.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云原生向量数据库 PieCloudVector 助力多模态大模型 AI 应用

全球 AGI&#xff08;人工通用智能&#xff09;市场快速增长的背景下&#xff0c;企业应用成为推动这一领域发展的主要力量&#xff0c;企业如何选择合适的技术来支撑其智能化转型显得尤为重要。在墨天轮《数据库技术如何增强 AI 大模型&#xff1f;》数据库沙龙活动中&#xf…

C语言典型例题55

《C程序设计教程&#xff08;第四版&#xff09;——谭浩强》 题目&#xff1a; 例题4.7 兔子的繁殖。这是一个有趣的古典问题&#xff1a;有一对兔子&#xff0c;从出生后的第3个月开始起每个月都生一对兔子。小兔子长到第3个月又生一对兔子。假设所有兔子都不死&#xff0c;…

深度解读SGM41511电源管理芯片I2C通讯协议REG08寄存器解释

REG08 是 SGM41511 的第九个寄存器&#xff0c;地址为 0x08。这是一个只读&#xff08;R&#xff09;寄存器&#xff0c;用于报告各种状态信息。上电复位值&#xff08;PORV&#xff09;为 xxxxxxxx&#xff0c;表示上电时的初始状态是不确定的。这个寄存器提供了充电器当前状态…

HarmonyOS--合理使用页面间转场

一、概述 页面间转场是用户从一个页面切换到另一个页面时的过程&#xff0c;一个无缝流畅的转场动效可以提升用户的交互体验。从主页到详情页、从列表页到结果页都需要去设置一些转场动效使得用户体验更加流畅。基于用户行为和应用设计模式&#xff0c;我们总结出了一些常见的转…

C#/.net core “hello”.IndexOf(“\0”,2)中的坑

先想想看&#xff0c;你认为下面代码返回值是多少&#xff1f; "hello".IndexOf("", 2); "hello".IndexOf("\0", 2); "hello".IndexOf(\0, 2); 今天和大家分享关于.net core中与字符相关的一些奇怪问题。 首先我们先以.N…

Golang | Leetcode Golang题解之第383题赎金信

题目&#xff1a; 题解&#xff1a; func canConstruct(ransomNote, magazine string) bool {if len(ransomNote) > len(magazine) {return false}cnt : [26]int{}for _, ch : range magazine {cnt[ch-a]}for _, ch : range ransomNote {cnt[ch-a]--if cnt[ch-a] < 0 {r…

大模型知识检索RAG业务实践实践(初级篇)

大模型知识检索RAG业务实践实践(初级篇) 1.知识检索大图 大模型是现在一个非常热门的话题,大模型表现出的生成能力也是非常惊艳。但是强如 GPT4 这样的大模型,它在知识更新和幻觉上也会存在问题。比如说我们问互联网行业有什么大事,GPT4 的回答是三年前的内容。主要是说疫…

机械学习—零基础学习日志(如何理解概率论12)

假设检验 假设检验是有一些参数&#xff0c;已知条件&#xff0c;让你检验某种假设是否成立。 我们通过具体的题目来说明&#xff1a; 这里我们需要确认使用什么公式&#xff1a; 使用下面的公式如下图&#xff1a; 题目中&#xff0c;以21作为分界线&#xff0c;所以我们将是…

用manim证明函数的左右极限

http://t.csdnimg.cn/2pVdFhttp://t.csdnimg.cn/2pVdF在上一节的最后两个示例中&#xff0c;我们看到了两个不存在的限制。然而&#xff0c;对于每个例子来说&#xff0c;每个限制不存在的原因是不同的。 我们看一下下面的例子&#xff1a; 极限不存在&#xff0c;因为函数没有…

Redis基本全局命令

文章目录 get和setkeysexistsdelexpirettltype redis全局命令&#xff1a; redis支持很多种数据结构&#xff0c;整体上来说。redis是键值对结构&#xff0c;key固定就是字符串&#xff0c;value实际上就会有很多种&#xff0c;比如说&#xff1a; 字符串哈希表列表有序集合 …

linux系统编程-网络-tcp(29)

C/S B/S P2p模型 在Linux系统编程中&#xff0c;C/S&#xff08;Client/Server&#xff0c;客户端/服务器&#xff09;和B/S&#xff08;Browser/Server&#xff0c;浏览器/服务器&#xff09;模型是两种常见的架构模式&#xff0c;用于构建分布式应用程序。它们在设计和实现上…

C语言 | Leetcode C语言题解之第384题打乱数组

题目&#xff1a; 题解&#xff1a; typedef struct {int* num;int* src;int numsize; } Solution;Solution *obj NULL;Solution* solutionCreate(int* nums, int numsSize) {if (obj ! NULL) {return obj;}Solution *obj (Solution*)malloc(sizeof(Solution));obj->nums…

#C++ 笔记二

四、运算符重载 1.友元 1.1 概念 类实现了数据的隐藏和封装&#xff0c;类的数据成员一般定义为私有成员&#xff0c;仅能通过类的公有成员函数才能进行读写。 如果数据成员定义成公共的&#xff0c;则又破坏了封装性。但是在某些情况下&#xff0c;需要频繁的读写数据成员…

Java 7.3 - 分布式 id

分布式 ID 介绍 什么是 ID&#xff1f; ID 就是 数据的唯一标识。 什么是分布式 ID&#xff1f; 分布式 ID 是 分布式系统中的 ID&#xff0c;它不存在于现实生活&#xff0c;只存在于分布式系统中。 分库分表&#xff1a; 一个项目&#xff0c;在上线初期使用的是单机 My…

2-80 基于matlab-GUI,实现kalman滤波对目标物的位置进行检测跟踪

基于matlab-GUI,实现kalman滤波对目标物的位置进行检测跟踪。检测汽车中心和最大半径&#xff0c;与背景差分选择较大差异的区域进行形态学处理&#xff0c;用冒泡法对目标面积从大到小排序。程序已调通&#xff0c;可直接运行。 2-80 kalman视频跟踪滤波 - 小红书 (xiaohongsh…

光学涡旋Talbot阵列照明器的matlab模拟与仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 5.完整程序 1.程序功能描述 光学涡旋 Talbot 阵列照明器是一种利用光学涡旋&#xff08;Optical Vortex&#xff09;和 Talbot 效应&#xff08;Talbot Effect&#xff09;相结合的技术&…

【HTML源码】上传即可使用的在线叫号系统源码

这个叫号系统的过程是这样的 接了一个任务&#xff0c;某学校要对学生进行逐个面试&#xff0c;希望能有类似医院门诊那种叫号系统。 条件&#xff1a;首先说硬件&#xff0c;就是教室里边一台笔记本电脑&#xff0c;同屏到教室外面的电视机。 需求&#xff1a;软件需求是可…

汉诺塔递归解决思路图解分析,python代码实现

目录 4.假设四层汉诺塔&#xff0c;n4&#xff0c;利用整体思想分解为两层的情况 3.分解到n3 3.1 分解上面n4时第一个步骤&#xff1a; 3.2 分解上面n4时第三个步骤&#xff1a; 2.继续分解到n2 &#xff08;同理略&#xff09; 1.当分解到n1 python代码 问题&#xff1…

【Linux】升级OpenSSH版本规避远程代码执行漏洞

本文首发于 ❄️慕雪的寒舍 升级OpenSSH版本规避远程代码执行漏洞。 说明 今天早上逛别人的博客的时候看到了这个重磅消息。OpenSSH爆出能远程通过root身份执行任意代码的漏洞&#xff0c;影响版本是 8.5p1 < OpenSSH < 9.8p1&#xff0c;奇安信的报告可以点我查看。 上…

计算机三级网络第4套练习记背

计算机三级网络第4套练习记背