运维视角:rabbitmq教程(四)工作模式

news2025/1/16 1:35:27

今天这篇文章,通过python代码来测试rabbitmq交换机以及队列的工作模式,以此更加透彻的理解它的工作方式

一、简单模式

1、测试代码

生产者代码:

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.queue_declare(queue='hello')  # 如果指定的queue不存在,则会创建一个queue,如果已经存在 则不会做其他动作

for i in range(3):
    new_dic = {"name"+str(i): ""+str(i)}
    # channel.basic_publish(exchange='', routing_key='hello', body=b'hello world_'+str(i).encode('utf-8'))
    channel.basic_publish(exchange='', routing_key='hello', body=str(new_dic).encode('utf-8'))
    # print("send success...")
connection.close()  # 关闭连接

消费者代码:

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.queue_declare(queue='hello')

# 回调函数
def callback(ch, method, properties, body):
    # print('消费者收到:{}'.format(body))
    print(body.decode('utf-8'))

channel.basic_consume(queue='hello',  # 接收指定queue的消息
                      auto_ack=True,  # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息
                      on_message_callback=callback  # 设置收到消息的回调函数
                      )

channel.start_consuming()   # 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数

如果再运行一个消费者(两个消费者),当有生产者生产消息的时候,会出现轮训的效果,即,每个消费者消费一个消息;

2、消息确认

当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除;这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。

证实该现象的测试:

往一个队列中存储了100个消息,开启一个消费者工作,中途模拟消费者中断或崩溃。终端查询队列,消息显示为0;队列将100个消息发个消费者,不管消费者是否正确处理,就直接删除了这些消息,不做存留;

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
在消费者回调函数中添加 basic_ack 方法,示例:

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 消息确认

basic_ack 不可与 auto_ack 一同使用,会报错

3、持久化

——队列持久化

channel.queue_declare(queue='task_queue', durable=True)

在生产者、消费者代码中,添加 durable=True 可以使队列持久化,如:重启rabbitmq后创建的队列不丢失;
[shell]# rabbitmqctl list_queues 查看队列以及消息数量

——消息持久化

channel.basic_publish(exchange='',
                          routing_key="task_queue",
                          body=str(new_dic).encode('utf-8'),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))

在生产者代码中,设置 properties 方法值 delivery_mode=2 参数,可以使消息持久化,没有被处理的消息会保存在队列中;

4、公平调度

以上的代码,一直是按照轮训的方式来派发消息的。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

channel.basic_qos(prefetch_count=1)

示例配置:

...
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # 消息确认

channel.basic_qos(prefetch_count=1)  # 一次性取几个消息
channel.basic_consume('task_queue', callback)
...

二、发布订阅模式

将同一个消息发送给多个消费者;

1.1-生产者代码

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.13', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='logs', exchange_type='fanout')   # 创建一个fanout交换机,命名为logs

for i in range(10):
    new_dic = {"name" + str(i): "" + str(i)}

    channel.basic_publish(exchange='logs', routing_key='', body=str(new_dic).encode('utf-8'))
    print(" [x] Sent %r" % new_dic)

connection.close()

1.2-消费者代码一

(临时队列和随机的队列名称)

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel

channel.exchange_declare(exchange='logs', exchange_type='fanout')   # 创建一个fanout交换机,命名为logs
result = channel.queue_declare(exclusive=True, queue="")      # 临时队列(exclusive=True表示断开连接时,这个队列被删除)
queue_name = result.method.queue        # 获取随机队列名
channel.queue_bind(exchange='logs', queue=queue_name)   # 将队列和交换机绑定


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)


channel.basic_consume(queue_name, callback)
channel.start_consuming()

1.3-消费者代码二

(指定的队列名称)

import pika
import time

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel

channel.exchange_declare(exchange='logs', exchange_type='fanout')   # 创建一个fanout交换机,命名为logs
# channel.queue_declare(exclusive=True, queue="bind_queue_test")      # 创建一个临时队列(exclusive=True表示断开连接时,这个队列被删除)
channel.queue_declare(queue="bind_queue_test", durable=True)      # 创建一个队列

channel.queue_bind(exchange='logs', queue="bind_queue_test")   # 将队列和交换机绑定

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume("bind_queue_test", callback)
channel.start_consuming()

2.1-测试结论

发布订阅的方式可以实现将消息同时发给多个消费者;
它是通过扇形交换机(exchange_type=‘fanout’)实现的。扇形交换机会将消息复制发送到与其绑定的队列中,每个队列会同时接到相同的消息;所以【消费者代码一】中创建的队列名称都是临时(独占)、随机的,这样的话,其他消费者就不能消费这个队列,也不会冲突;
当然,也可以创建指定的队列名称并持久化。但是这样的消费者不能是多个,否则在这个队列中就会是轮训的分发消息了;

2.2-验证测试

步骤:
1、运行了一个【消费者代码一】;
2、运行了两个【消费者代码二】:消费者代码A和消费者代码B;
3、运行【生产者代码】;发送 0到9 个消息;
结果:
【消费者代码一】全部接收到了 0-9 个消息;
【消费者代码A】接收到了 0、2、4、6、8;
【消费者代码B】接收到了 1、3、5、7、9;

三、路由模式

生产者和消费者设置 routing_key 参数;路由器通过该参数来判断消息转发给哪个队列;

1.1-生产者

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.13', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')    # 创建直连交换机

severity = "error"  # 绑定的routing_key
message = "messages test ---- " + severity  # 要发送的消息
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=str(message).encode('utf-8'))
print(" [x] Sent %r:%r" % (severity, message))

connection.close()

1.2-消费者一

routing_key 为“info”

import pika
import time

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(exclusive=True, queue="")  # 创建一个临时随机名称的队列(exclusive=True表示断开连接时,这个队列被删除)
queue_name = result.method.queue  # 获取随机队列名
severity = "info"
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)  # 将队列和交换机绑定

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue_name, callback)
channel.start_consuming()

1.3-消费者二

routing_key 为“error”

import pika
import time

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')

result = channel.queue_declare(exclusive=True, queue="")  # 创建一个临时随机名称的队列(exclusive=True表示断开连接时,这个队列被删除)
queue_name = result.method.queue  # 获取随机队列名
severity = "error"
channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)  # 将队列和交换机绑定

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue_name, callback)
channel.start_consuming()

2.1-测试结论

通过设置 routing_key 参数,可以指定消息发送到哪个队列;
验证方法:多个消费者绑定同一个 routing_key;
但是,扇型交换机(fanout exchange)没有足够的灵活性 —— 它能做的仅仅是广播;
通过绑定 routing_key 的方式可以让直连交换机实现类似于扇形交换机发布订阅的效果;(扇形交换机会忽略routing_key这个参数)

2.2-验证测试

步骤:
1、运行消费者一和二;
2、运行生产者;
结果:
当 routing_key 为 info 时,消息发送到了消费者一;当 routing_key 为 error 时,消息发送到了消费者二;

四、主题交换机

主题交换机的路由键必须是一个由.分隔开的词语列表,如:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意,但是不要超过255字节。
● * (星号) 用来表示一个单词.
● # (井号) 用来表示任意数量(零个或多个)单词。
在这里插入图片描述
Q1的绑定键为 .orange.,Q2的绑定键为 ..rabbit 和 lazy.#
示例:
携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列;
携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去;
携带有 quick.orange.fox 的消息会投递给第一个队列;
携带有 lazy.brown.fox 的消息会投递给第二个队列;
携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定;
携带着 quick.brown.fox 的消息不会投递给任何一个队列。
如果我们违反约定,发送了一个携带有一个单词或者四个单词(“orange” or “quick.orange.male.rabbit”)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 “lazy.orange.male.rabbit” 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

1.1-生产者

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.13', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')  # 创建一个主题交换机

routing_key = "lazy.orange.male.rabbit"
message = "messages test ---- " + routing_key  # 要发送的消息

channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=str(message).encode('utf-8'))

print(" [x] Sent %r:%r" % (routing_key, message))

connection.close()

1.2-消费者一

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.14', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')  # 创建一个主题交换机

result = channel.queue_declare(exclusive=True, queue="")  # 创建一个临时随机名称的队列(exclusive=True表示断开连接时,这个队列被删除)
queue_name = result.method.queue  # 获取随机队列名

binding_key = "*.orange.*"	# 绑定路由
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)  # 将队列和交换机绑定

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))

channel.basic_consume(queue_name, callback)
channel.start_consuming()

1.2-消费者二

import pika

user_info = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.2.15', '5672', '/', user_info))
channel = connection.channel()  # 创建一个channel
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')  # 创建一个主题交换机

result = channel.queue_declare(exclusive=True, queue="")  # 创建一个临时随机名称的队列(exclusive=True表示断开连接时,这个队列被删除)
queue_name = result.method.queue  # 获取随机队列名

binding_key = "*.*.rabbit"	# 绑定路由
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)  # 将队列和交换机绑定
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key="lazy.#")  # 将队列和交换机绑定,并绑定路由


def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body,))


channel.basic_consume(queue_name, callback)
channel.start_consuming()

五、其他

这期的文章主要集中在了python上,编辑python代码来模拟生产者和消费者,以此发送、接收消息,能更加清楚的理解运作原理;
本篇文章中的代码案例部分基于rabbitmq的官方文档,同时也采集、测试了网上其他的代码案例,综合考量,更容易理解;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/402314.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ICG-Hydrazide,吲哚菁绿-酰肼,ICG-HZ结构式,溶于二氯甲烷等部分有机溶剂,

ICG-Hydrazide,吲哚菁绿-酰肼 中文名称:吲哚菁绿-酰肼 英文名称:ICG-Hydrazide 英文别名:ICG-HZ 性状:粉末或固体 溶剂:溶于二氯甲烷等部分有机溶剂 稳定性:-20℃密封保存、置阴凉干燥处、防潮 分子…

vue上实现左右关联滚动

先看效果&#xff1a; 代码&#xff1a; <template><div class"container"><!-- 左侧fixed导航区域 --><div class"left"><divv-for"item in leftList":key"item.id"class"left_item":class&…

Angular学习之ControlValueAccessor接口详解

ControlValueAccessor 是什么&#xff1f;为什么需要使用 &#xff1f;下面本篇文章就来带大家了解Angular中的ControlValueAccessor组件接口&#xff0c;希望对大家有所帮助&#xff01; ControlValueAccessor 是什么&#xff1f; 简单来说ControlValueAccessor是一个接口&am…

【Linux 网络编程2】应用层协议--http;序列化和反序列化,get和post请求传参的区别,cookie和sesion,编写一个简单的http

目录 1.序列化和反序列化 2.HTTP协议 3.编写一个简单的http 3.2.简单的http的使用 3.3.get和post请求传参的区别 4.http的状态码分类 5.cookie和sesion 1.序列化和反序列化 1.1.序列化和反序列化的优势 序列化将结构体转化为长字符串&#xff0c;便于传输&#xff1b;反序…

MyBatis源码用了哪些设计模式?

MyBatis源码用了哪些设计模式&#xff1f;前言一、创建型模式工厂模式单例模式建造者模式二、结构型模式适配器模式代理模式组合模式装饰器模式三、行为型模式模板模式策略模式迭代器模式总结前言 在 MyBatis 的两万多行的框架源码中&#xff0c;使用了大量的设计模式对工程架…

Oracle OCP 19c 考试(1Z0-083)中关于Oracle不完全恢复的考点(文末附录像)

欢迎试看博主的专著《MySQL 8.0运维与优化》 下面是Oracle 19c OCP考试&#xff08;1Z0-083&#xff09;中关于Oracle不完全恢复的题目: A database is configured in ARCHIVELOG mode A full RMAN backup exists but no control file backup to trace has been taken A media…

Spatial-Temporal Graph ODE Networks for Traffic Flow Forecasting

Spatial-Temporal Graph ODE Networks for Traffic Flow Forecasting 摘要 交通流量的复杂性和长范围时空相关性是难点 经典现存的工作&#xff1a; 1.利用浅图神经网络&#xff08;shallow graph convolution networks&#xff09;和 时间提取模块去分别建模空间和时间依赖…

【Python3安装部署的保姆级教程】

如何在Windows 10上安装Python Python是一种越来越受欢迎的编程语言,无论是对于初学者还是有经验的开发者。Python灵活多用,擅长脚本、自动化、数据分析、机器学习和后端开发。在本教程中,你将学习如何使用Windows的Python安装程序在Windows 10上安装Python。 第一步 — 下…

Python3-错误和异常

Python3 错误和异常 作为 Python 初学者&#xff0c;在刚学习 Python 编程时&#xff0c;经常会看到一些报错信息&#xff0c;在前面我们没有提及&#xff0c;这章节我们会专门介绍。 Python 有两种错误很容易辨认&#xff1a;语法错误和异常。 Python assert&#xff08;断…

进程间通信IPC

进程间通信IPC (InterProcess Communication) 一、进程间通信的概念 每个进程各自有不同的用户地址空间&#xff0c;任何一个进程的全局变量在另一个进程中都看不到&#xff0c;所以进程之间要交换数据必须通过内核&#xff0c;在内核中开辟一块缓冲区&#xff0c;进程1把数据…

MySQL事务详解与隔离级别的实现

文章目录一、四个特性二、存在问题三、隔离级别四、实现原理0、SQL语句执行流程1&#xff09;buffer pool2&#xff09;执行流程1、日志1&#xff09;binlog2&#xff09;redolog3&#xff09;对比4&#xff09;undolog2、MVCC原理1&#xff09;隐式字段2&#xff09;undo log版…

气泡式水位计的安装方法详解

气泡水位计的安装实际上就是气管的安装&#xff0c;气管的安装是否正确将直接影响到仪器测量数据的结果&#xff0c;气泡水位计它由活塞泵产生的压缩空气流经测量管和气泡室&#xff0c;进入被测的水体中&#xff0c;测量管中的静压力与气泡室上的水位高度成正比。那么接下来就…

蓝桥杯集训·每日一题Week1

前缀和&#xff08;Monday&#xff09; AcWing 3956. 截断数组&#xff08;每日一题&#xff09; 思路&#xff1a; 首先可以预处理出前缀和。判断数组长度如果小于 333 或者前 nnn 项不是 333 的倍数&#xff0c;则可以直接输出 000。 否则就枚举所有 s[i]s[n]3s[i] \cfrac…

kali双网卡

先单独开启一个网卡&#xff0c;配置/etc/network/interfaces 修改为如下配置 This file describes the network interfaces available on your system and how to activate them. For more information, see interfaces(5). source /etc/network/interfaces.d/* The loopb…

JVM系列——Java与线程,介绍线程原理和操作系统的关系

并发不一定要依赖多线程(如PHP中很常见的多进程并发)。 但是在Java里面谈论并发&#xff0c;基本上都与线程脱不开关系。因此我们讲一下从Java线程在虚拟机中的实现。 线程的实现 线程是比进程更轻量级的调度执行单位。 线程的引入&#xff0c;可以把一个进程的资源分配和执行调…

【深度强化学习】(3) Policy Gradients 模型解析,附Pytorch完整代码

大家好&#xff0c;今天和各位分享一下基于策略的深度强化学习方法&#xff0c;策略梯度法是对策略进行建模&#xff0c;然后通过梯度上升更新策略网络的参数。我们使用了 OpenAI 的 gym 库&#xff0c;基于策略梯度法完成了一个小游戏。完整代码可以从我的 GitHub 中获得&…

原型模式(设计模式详解)

原型模式 描述 原型模式&#xff08;Prototype Pattern&#xff09;是一种创建型设计模式&#xff0c;它允许通过复制现有对象来创建新对象&#xff0c;而无需从头开始编写代码。 在原型模式中&#xff0c;一个原型对象作为模板&#xff0c;新的对象通过克隆这个原型对象而创…

MySQL OCP888题解048-letter N in slow query log(慢查询日志里的字母N)

文章目录1、原题1.1、英文原题1.2、中文翻译1.3、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3.1、知识点1&#xff1a;mysqldumpslow - 总结缓慢的查询日志文件4、实验4.1、实验14.1.1、实验目的4.1.2、实验前准备4.1.3、实验步骤4.1.4、实验结论5、总结1、原题 1.1…

dva01-初识

背景 React 本身只是一个 DOM 的抽象层&#xff0c;使用组件构建虚拟 DOM。如果开发大应用&#xff0c;还需要解决一个问题。 通信&#xff1a;React 只提供了一种传参手段&#xff0c;后续数据变化非常麻烦&#xff0c;无法适用于大应用。数据流&#xff1a;每次要更新数据&…

ACE C++网络通信框架深入解析ACE_Message_Block消息类

前言 我所见到最好消息包的接口设计莫过于ACE_Message_Block了。 为什么这样说呢&#xff1f; 对于它的API说明&#xff0c;我最初仅想在它的基础上提供注释说明&#xff0c;而不想多言其它&#xff0c;因为无需多言其他。 不过&#xff0c;后来还是补充两个图&#xff0c;以…