RabbitMQ python第三方库pika应用入门实践

news2025/1/17 15:23:45

1. RabbitMQ简介

RabbitMQ是一个可靠、高效的开源消息代理服务器,基于AMQP协议。它具备以下特点:

  • 可以支持多种消息协议,如AMQP、STOMP和MQTT等。
  • 提供了持久化、可靠性和灵活的路由等功能。
  • 支持消息的发布和订阅模式。
  • 具备高可用性和可扩展性。

RabbiMQ的核心概念包括生产者、消费者、队列、交换机和绑定。生产者将消息发送到交换机,交换机根据其类型和绑定规则将消息路由到队列,然后消费者从队列中获取消息进行处理。

RabbitMQ相关概念

  • Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker。
  • Virtual host:出于多租户和安全因素的设计,把AMQP的基本组件划分到一个虚拟的分组中,类似于网络中的namespace概念,当多个不同的用户使用同一个RabbitMQ Server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange/queue等。
  • Connection:publisher/consumer和broker之间的TCP连接。
  • Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量大的时候建立TCP Connection的开销都将是巨大的,效率也是非常低的。Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread会创建单独的Channel进行通信,AMQP的method包含了channel id帮助客户端和message broker识别channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection,极大减少了操作系统建立TCP连接的开销。

相关术语

  • producer:生产者,向队列中发送消息的程序。(在图表中通常使用P表示)
  • queue:队列,用于存储消息,定义在RabbitMQ内部,queue本质上是一个消息缓存buffer,生产者可以往里发送消息,消费者也可以从里面获取消息。(在图表中通常使用Q表示)
  • consumer:消费者,等待并从消息队列中获取消息的程序。(在图表中通常使用C表示)
  • exchange:交换机,用于将producer发送来的消息发送到queue,事实上,producer是不能直接将message发送到queue,必须先发送到exchange,再由exchange发送到queue。

注:生产者和消费者可能在不同的程序或主机中,当然也有可能一个程序有可能既是生产者,也是消费者。

2. pika简介

在Python中,pika是一个用于处理RabbitMQ消息队列的第三方库,它允许开发者在Python应用程序中发送和接收消息,实现应用程序之间的异步通信。

主要功能

  • 连接管理:pika提供了与RabbitMQ服务器建立连接的功能。
  • 通道管理:通过连接,可以创建多个通道(channel),每个通道代表一个独立的通信流。
  • 消息发送与接收:开发者可以使用pika发送消息到指定的队列(queue),并从队列中接收消息。
  • 交换机与队列声明:支持声明交换机(exchange)、队列,以及它们之间的绑定(binding)关系。
  • 消息确认:支持消息的自动确认(auto-ack)或手动确认(manual ack),以确保消息的可靠传递。

使用流程

  • 创建连接:使用pika.BlockingConnection或pika.SelectConnection等类创建与RabbitMQ服务器的连接。
  • 创建通道:通过连接对象的channel()方法创建通道。
  • 声明交换机与队列:使用通道对象的exchange_declare()和queue_declare()方法声明交换机和队列。
  • 绑定交换机与队列:使用通道对象的queue_bind()方法将队列绑定到交换机。
  • 发送消息:使用通道对象的basic_publish()方法发送消息到指定的交换机和路由键(routing key)。
  • 接收消息:使用通道对象的basic_consume()方法开始消费队列中的消息,并通过回调函数处理接收到的消息。
  • 关闭连接:在不再需要时,使用连接对象的close()方法关闭连接。

安装pika

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika

3. pika应用入门

3.1. 生产者

import pika
 
# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()

# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
channel.queue_declare(queue='hello_world', durable=True)
 
# 3.向指定队列插入数据
poiid = 'xxxxxx'
channel.basic_publish(exchange='',  # 简单模式
                      routing_key='hello_world',  # 指定队列
                      body=poiid,  # 向队列中添加的数据
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      )
                      )
print(" [x] Sent 'Hello World!'") 

在这里插入图片描述
查看虚拟主机virtual-host: /typc-fpd-dev下队列hello_world。
在这里插入图片描述

3.2. 侦听消费者

import pika
 
# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
 
# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello_world队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)

# 3.确定回调函数
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    # 手动应答
    poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 
    Core.setPIO(poiid)            # 输入数据
    Core.task_process()           # 处理数据
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 4.确定监听队列参数
channel.basic_consume(queue='hello_world',  # 指定队列
                      auto_ack=False,  # 手动应答方式
                      on_message_callback=callback)
 
print(' [*] Waiting for messages. To exit press CTRL+C')

    
# 5.正式监听
channel.start_consuming()

3.3. 主动处理消费消息

在Pika中,basic_get方法确实可以用于从队列中直接获取消息,但通常不推荐在生产环境中使用,因为它不是高效的消息处理方式。不过,如果你确实需要这种方法,以下是如何使用basic_get的示例:

# 1.连接rabbit
credentials = pika.PlainCredentials('rabbit', '*****')  # rabbit用户名和密码
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.17.61',port = 5671,virtual_host = '/typc-fpd-dev',credentials = credentials))
channel = connection.channel()

time.sleep(1)
     
# 2.创建持久化队列
# 注意:非持久化队列不能变持久化队列,反之也是这样的,所有创建队列中不能创建和非持久化队列重名的队列
# 注意:这一步不是必须的,但是如果消费者先启动而不是生成者先启动时,这时队列中还没有hello2队列,这时就会报错
channel.queue_declare(queue='hello_world', durable=True)
count = 5
for i in range(count):  
    print('取消息开始时间')  
    method_frame, header_frame, body = channel.basic_get(queue='hello_world', auto_ack=False)  
    if method_frame:  
        # 处理消息体 
        print('body:',body)  
        poiid = body.decode('utf-8')  # 将 bytes 转换为字符串 
        Core.setPIO(poiid)
        Core.task_process()
        time.sleep(2)        
    
        # 如果你设置了auto_ack=False,则需要手动确认消息  
        channel.basic_ack(delivery_tag=method_frame.delivery_tag)  
    else:  
        print("没有消息可以获取,", str(i))
    time.sleep(1)
print('取消息完成时间')
connection.close()#来关闭连接 

参考:

三只松鼠. python 操作RabbitMq详解. 博客园. 2019.03

卫玠_juncheng. Python三方库:Pika(RabbitMQ基础使用). CSDN博客. 2024.03

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1804419.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

From self-attention 2 flash-attention 数学原理与 cuda 实现优化

self attension 是transformer 编码器和解码器中共同的一个计算环节,在整个transformer 网络体系中耗费的算力比例占主导。所以节省self attention 的正向和反向的计算时间,就可以加速 transormer 的训练和推理过程。 1,self attention 的数…

学习笔记——路由网络基础——环回接口(loopback)

6、环回接口(loopback) (1)定义 环回接口(loopback) :是一种虚拟的接口,是一种纯软件性质的虚拟接口,模拟一个单独的网段。 Loopback等于在设备中模拟另外不同的网络,实现不需要物理接口连接设备,依然可以模拟的功能…

MobileNetV4实战:使用 MobileNetV4实现图像分类任务(二)

文章目录 训练部分导入项目使用的库设置随机因子设置全局参数图像预处理与增强读取数据设置Loss设置模型设置优化器和学习率调整策略设置混合精度,DP多卡,EMA定义训练和验证函数训练函数验证函数调用训练和验证方法 运行以及结果查看测试完整的代码 在上…

了解Synchronized对象头?

1、对象头的结构 Java对象存储在内存中结构为: 对象头(Header):实例数据(Instance Data):定义类中的成员属性对齐填充字节(Padding):由于HotSpot虚拟机的自…

高通SDX12:Voice Over USB 功能调试

一、功能概述及使用环境 Linux PC 作为上位机,内置 SLIC基于高通 SDX12 平台的设备作为从设备,通过USB连接到 Linux PC 上,在 PC 上枚举 UAC 设备从设备进行 MO/MT Call 时,上位机使用 arecord 进行录音,音频数据通过 USB 传至上位机,上位机停止录音后再使用 aplay 进行播…

经典文献阅读之--Online Monocular Lane Mapping(使用Catmull-Rom样条曲线完成在线单目车道建图)

0. 简介 对于单目摄像头完成SLAM建图这类操作,对于自动驾驶行业非常重要,《Online Monocular Lane Mapping Using Catmull-Rom Spline》介绍了一种仅依靠单个摄像头和里程计生成基于样条的在线单目车道建图方法。我们提出的技术将车道关联过程建模为一个…

【STM32】ucOS-III多任务程序

【STM32】uc/OS-III多任务程序 文章目录 【STM32】uc/OS-III多任务程序STM32F103C8T6移植uC/OS-III基于HAL库超完整详细过程与相关实验实验任务实验过程一、 uC/OS-III源码下载二、 建立STM32CubeMX工程三、 复制uC/OS-III文件到工程文件夹四、 添加工程组件和头文件路径五、修…

【中颖】SH79F9202 串口通信

头文件 uart.h #ifndef UART_H #define UART_H#include "SH79F9202.h" #include "LCD.h" #include "timer2.h" #include "timer5.h" #include "cpu.h" #include "key.h" #include "io.h" #include &qu…

【C++】深入理解decltype和decltype(auto)

深入理解decltype和decltype(auto) 一、decltype语法介绍二、decltype的推导规则1. expr不加括号2. expr加上括号 三、关于decltype的CV属性推导四、 decltype(auto) 的使用 一、decltype语法介绍 decltype关键字是C11新标准引入的关键字,它…

向量数据库是什么?

向量数据库是什么? 随着人工智能和机器学习技术的迅猛发展,向量数据库作为一种新型数据库引起了广泛关注。向量数据库专门用于存储和查询高维向量数据,是在大规模数据检索和相似性搜索领域的重要工具。 向量数据库的定义 向量数据库是一种…

心链13---主页切换功能 + loading特效 + 导航栏完善 + 队伍页接口修改

心链 — 伙伴匹配系统 直接取出所有用户,依次和当前用户计算分数,取 TOP N(54 秒) 优化方法: 切忌不要在数据量大的时候循环输出日志(取消掉日志后 20 秒)Map 存了所有的分数信息,占…

上位机图像处理和嵌入式模块部署(f407 mcu和其他mcu品类的选择)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 很多朋友读书的时候学的是stm32,工作中用的也是stm32。这本来问题不大,但是过去两三年的经历告诉我们,mcu的使用…

Polar Web【中等】反序列化

Polar Web【中等】反序列化 Contents Polar Web【中等】反序列化思路&探索EXPPHP生成PayloadGET传递参数 运行&总结 思路&探索 一个经典的反序列化问题,本文采用PHP代码辅助生成序列字符串的方式生成 Payload 来进行手动渗透。 打开站点,分析…

Python编程基础4

模块:模块支持从逻辑上组织Python代码,当代码量变得非常大的时候,最好把代码分成一些有组织的代码段。代码片段相互间有一定的联系,可能是一个包含数据成员和方法的类、函数、变量。 搜索路径:模块的导入需要一个叫做‘…

构建智能汽车新质生产力丨美格智能亮相2024高通汽车技术与合作峰会

近日,以“我们一起,驭风前行”为主题的2024高通汽车技术与合作峰会在无锡国际会议中心隆重举行。作为高通公司的战略合作伙伴,美格智能受邀全程参与此次汽车技术与合作峰会。在峰会现场,美格智能产品团队隆重展示了多款基于高通平…

Wireshark自定义Lua插件

背景: 常见的抓包工具有tcpdump和wireshark,二者可基于网卡进行抓包:tcpdump用于Linux环境抓包,而wireshark用于windows环境。抓包后需借助包分析工具对数据进行解析,将不可读的二进制数转换为可读的数据结构。 wires…

VUE封装-自定义权限控制指令

在实际开发中,会遇到很多的权限控制、资源位的场景,其实就是用来控制某个组件的展示与否,可以是一个按钮、一个报表、一个TAB页面等 例如下图,我想通过当前登录的用户控制谷歌的这个logo显示与否 因为设计到的权限、资源位控制比…

摆脱Jenkins - 使用google cloudbuild 部署 java service 到 compute engine VM

在之前 介绍 cloud build 的文章中 初探 Google 云原生的CICD - CloudBuild 已经介绍过, 用cloud build 去部署1个 spring boot service 到 cloud run 是很简单的, 因为部署cloud run 无非就是用gcloud 去部署1个 GAR 上的docker image 到cloud run 容…

GUI编程-01

组件 窗口 弹窗 面板 文本框 列表框 按钮 图片 监听事件 鼠标 键盘事件 破解工具 Java提供了丰富的图形用户界面(Graphics User Interface,GUI)的类库,基于这些类库可以编写窗口程序。 Java关于图形界面的类库主要放在…

【Redis学习笔记05】Jedis客户端(string、list、set)

Jedis客户端 1. 命令 1.1 String类型 1.1.1 常见命令 SET命令 语法:SET key value [EX seconds | PX milliseconds] [NX|XX] 说明:将string类型的value值设置到指定key中,如果之前该key存在,则会覆盖原先的值,原先…