py_rabbitmq

news2024/10/7 7:32:25

安装

服务端

https://www.jianshu.com/p/2fb6d5ac17b9

客户端

pip install pika

文档

https://rabbitmq.com/tutorials/tutorial-one-python.html

简单示例

生产者
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 创建一个队列
channel.queue_declare(queue="rb_queue_01")
channel.basic_publish(exchange="",
                      routing_key='rb_queue_01',
                      body='hello world3')
connection.close()
消费者
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()

channel.queue_declare(queue="rb_queue_01")
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='rb_queue_01', auto_ack=True, on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

ack

消息确认机制:确认消费了,移除队列

消费者

import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()

channel.queue_declare(queue="rb_queue_01")
def callback(ch, method, properties, body):
    print("消费者接受任务:==>  %r" % body )
    # int('ffff')
    # ack回传,告诉服务端已经取走了
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue='rb_queue_01', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

durable

服务端挂掉了, 声明队列时做持久化

import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 创建一个队列 ,声明持久化
channel.queue_declare(queue="rb_queue_02", durable=True)
channel.basic_publish(exchange="",
                      routing_key='rb_queue_01',
                      body='ack',
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
connection.close()

闲置的就消费

消费者

import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()


channel.queue_declare(queue="rb_queue_02")
def callback(ch, method, properties, body):
    print("消费者接受任务:==>  %r" % body)
    # int('ffff')
    # ack回传,告诉服务端已经取走了
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Work Queues
# 闲置的就派发
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rb_queue_01', on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

发布订阅

https://rabbitmq.com/tutorials/tutorial-three-python.html

type = fanout

在这里插入图片描述

消费者(订阅者)
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m1", exchange_type="fanout")

# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue

# 绑定exchange和队列
channel.queue_bind(exchange='m1', queue=queue_name)

# 接收消息
def callback(ch, method, properties, body):
    print("消费者接受任务:==>  %r" % body)
    # int('ffff')
    # ack回传,告诉服务端已经取走了
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
发布者(消费者)
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 发布者先运行就创建
channel.exchange_declare(exchange='m1', exchange_type='fanout')
# routing_key为空,通过交换机而不是直接交付,一个发布到多个
channel.basic_publish(exchange="m1", routing_key="", body="hhsaf")

type = direct

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1mWOvL4Y-1682781289097)(C:\Users\asus\AppData\Roaming\Typora\typora-user-images\image-20230429224837442.png)]

根据关键字不同交给不同的人

direct_发布者
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 发布者先运行就创建
channel.exchange_declare(exchange='m2', exchange_type='direct')
# routing_key为空,通过交换机而不是直接交付,一个发布到多个
channel.basic_publish(exchange="m2", routing_key="r2", body="r2")
direct_订阅者1
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m2", exchange_type="direct")

# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue

# 绑定exchange和队列, 加上routingkey, 交换机收到来自发布者的消息+routing_key, 然后通过消费者绑定的routing_key进行分发
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r1")
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r2")


# 接收消息
def callback(ch, method, properties, body):
    print("消费者接受任务:==>  %r" % body)
    # int('ffff')
    # ack回传,告诉服务端已经取走了
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
direct_订阅者2
import pika
import rabbitmq_study.settings as settings

credentials = pika.PlainCredentials(settings.USER, settings.PASSWORD)
connection = pika.BlockingConnection(pika.ConnectionParameters(settings.HOST, credentials=credentials))

channel = connection.channel()
# 创建交换机,m1交换机名字
# fanout:将消息发送给所有的队列
channel.exchange_declare(exchange="m2", exchange_type="direct")

# 声明队列,随机生成一个队列
result = channel.queue_declare(queue="", exclusive=True)
queue_name = result.method.queue

# 绑定exchange和队列, 加上routingkey, 交换机收到来自发布者的消息+routing_key, 然后通过消费者绑定的routing_key进行分发
channel.queue_bind(exchange='m2', queue=queue_name, routing_key="r1")

# 接收消息
def callback(ch, method, properties, body):
    print("消费者接受任务:==>  %r" % body)
    # int('ffff')
    # ack回传,告诉服务端已经取走了
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 闲置的就派发
# channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

type = topic

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-14qQSLwu-1682781289112)(C:\Users\asus\AppData\Roaming\Typora\typora-user-images\image-20230429230557241.png)]

路由模糊匹配

匹配规则
  • *:匹配一个单词(单词)

    old.alex:
    old.* 可以
    old.# 可以
    
  • #:匹配多个单词

    old.alex.ll:
    old.* 不可以
    old.# 可以
    

和direct一样,只不过改改模式和routingkey

RPC

远程过程调用

待补充

_consuming()


#### type = topic

[外链图片转存中...(img-14qQSLwu-1682781289112)]

路由模糊匹配

##### 匹配规则

- *:匹配一个单词(单词)

old.alex:
old.* 可以
old.# 可以




- #:匹配多个单词

old.alex.ll:
old.* 不可以
old.# 可以




和direct一样,只不过改改模式和routingkey

### RPC

**远程过程调用**

待补充

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/476797.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python每日一练(20230430)

目录 1. 移除元素 🌟 2. 删除排序链表中的重复元素 🌟 3. 搜索旋转排序数组 II 🌟🌟 🌟 每日一练刷题专栏 🌟 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 1.…

总结836

学习目标: 4月(复习完高数18讲内容,背诵21篇短文,熟词僻义300词基础词) 学习内容: 暴力英语:背诵《keep your direction》,默写,英语语法 高等数学:刷题&a…

node笔记_安装nvm管理node版本

文章目录 前言下载nvm安装nvmnvm路径node路径查看版本nvm -v查看nvm的node版本列表(nvm list available)配置nvm的镜像库mirror选择node版本安装 (node install version)使用指定的node版本(nvm use) node环境变量配置配置NODE_PA…

【打卡】图像检索与匹配4 孪生网络

任务4:孪生网络 孪生网络是一种由两个相同结构的神经网络组成的模型,其目的是将两个输入数据映射到一个共同的向量空间中,并计算它们之间的相似度或距离。它通常用于图像匹配、人脸识别、语义匹配等任务中。 步骤1:构建三元组数…

MIT 6.824 lab4A总结

Background 一个raft集群的性能很明显和raft的数量有关系,更重要的是如果我们多个key放在一个raft集群里,这样的并行性不太好。所以我们可以考虑分片,利用操作潜在的并行性来提升性能。每一个副本组只管理几个分片的put和get,并且…

网络基础设施 拥塞控制

我经常说,传统的 TCP 优化已经到顶,不会有大意义了,这有两方面意思。 一方面,内在的,TCP 的 ACK 时钟带回的信息就那么多,用足了又能怎样。一个学习最差的差生能控制的分数是是 0~100 分的区间…

【Linux】基础IO——文件系统|软硬链接|动静态库

文章目录 一、磁盘1. 物理结构2. 存储结构3. 逻辑抽象结构 二、文件系统1. 文件系统的结构2. 查看文件3. 删除文件 三、软硬链接1. 软链接2. 硬链接3. ACM 时间 四、动静态库1. 动静态库的介绍2. 静态库的制作3. 动态库的制作4. 动态库的加载 一、磁盘 基于上篇博客所写到的文…

从0搭建Vue3组件库(十一): 集成项目的编程规范工具链(ESlint+Prettier+Stylelint)

欲先善其事,必先利其器。一个好的项目是必须要有一个统一的规范,比如代码规范,样式规范以及代码提交规范等。统一的代码规范旨在增强团队开发协作、提高代码质量和打造开发基石,所以每个人必须严格遵守。 本篇文章将引入 ESLintPrettierStylelint 来对代码规范化。 ESlint ES…

【计算机网络】学习笔记:第三章 数据链路层(八千字详细配图)【王道考研】

基于本人观看学习b站王道计算机网络课程所做的笔记&#xff0c;不做任何获利 仅进行交流分享 特此鸣谢王道考研 若有侵权请联系&#xff0c;立删 如果本篇笔记帮助到了你&#xff0c;还请点赞 关注 支持一下 ♡>&#x16966;<)!! 主页专栏有更多&#xff0c;如有疑问欢迎…

redhat 8.7 安装oracle 11g-11.2.0.4

redhat 8.7 安装oracle 11g-11.2.0.4 1、写在前面&#xff1a;这篇文章最后安装失败了。这是一次失败的尝试&#xff0c;仅做记录。结论是RHEL 8不支持Oracle 11g-11.2.0.4 安装&#xff0c;后续再研究怎么跑起来。1、数据库下载和安装文档1.1、查看oracle 11g 适合安装的linux…

阿里云版GPT官宣,我们问了它10个问题

4月7日&#xff0c;阿里云宣布自研大模型“通义千问”&#xff0c;目前已开始邀请用户测试体验。 阿里达摩院在NLP自然语言处理等前沿科研领域早已布局多年&#xff0c;并于2019年启动大模型研发&#xff0c;通义千问便是其最新成果&#xff0c;相当于阿里云版的“ChatGPT”。 …

让GPT成为护理专家 - 护士的工作如此简单

引子    书接上文《GPT接入企微应用 - 让工作快乐起来》&#xff0c;我把GPT接入了企微应用&#xff0c;不少同事都开始尝试起来了。有的浅尝辄止&#xff0c;有的刨根问底&#xff0c;五花八门&#xff0c;无所不有。这里摘抄几份&#xff1a; “帮我写一份表白信&#xff…

【Prompt】7 个向 chatGPT 高效提问的方法

欢迎关注【youcans的 AIGC 学习笔记】原创作品 【Prompt】7 个向 chatGPT 高效提问的方法 0. 向 chatGPT 高效提问的方法1. 提问方法&#xff1a;明确问题2. 提问方法&#xff1a;简洁清晰3. 提问方法&#xff1a;避免歧义4. 提问方法&#xff1a;提供上下文5. 提问方法&#x…

很不错的一篇文章,值得点赞收藏,带你全面了解MySQL性能调优、错误代码总结和全局参数配置(持续更新中ing)

前言 本文主要介绍当前MySQL性能优化原理实战&#xff0c;包括以下方面&#xff1a; 已更新文章目录MySQL遇到的的错误及解决方法全局参数文件配置详解。 后续希望大家提出宝贵的建议。喜欢的话点赞收藏关注走一波。如有错误的地方&#xff0c;请指出&#xff01;&#xff01;&…

C51 - 自写操作系统

最简OS 1> 版本1&#xff1a;任务建立与切换2> 版本2&#xff1a;定时器切换2.1> main.c2.2> task.c2.3> sleep.c 3> 版本3&#xff1a;加时间片轮转 在51单片机上&#xff0c;实现操作系统最简模型&#xff0c; 学习理解操作系统的基本概念&#xff1b; &am…

〖Python网络爬虫实战㉑〗- 数据存储之JSON操作

订阅&#xff1a;新手可以订阅我的其他专栏。免费阶段订阅量1000 python项目实战 Python编程基础教程系列&#xff08;零基础小白搬砖逆袭) 说明&#xff1a;本专栏持续更新中&#xff0c;目前专栏免费订阅&#xff0c;在转为付费专栏前订阅本专栏的&#xff0c;可以免费订阅付…

912. 排序数组

1.题目&#xff1a; 2.我的代码&#xff1a; C语言&#xff1a; /*** Note: The returned array must be malloced, assume caller calls free().*/ int* sortArray(int* nums, int numsSize, int* returnSize) {//希尔排序int gap numsSize;//多次预排while (gap > 1) {/…

【Linux】初识Linux

目录 &#x1f34e;一.Linux历史&#x1f34e; 1.UNIX发展的历史 2.Linux发展历史 &#x1f34f;二.开源&#x1f34f; &#x1f351;三.官网&#x1f351; &#x1f34a;四.企业应用现状&#x1f34a; 1.Linux在服务器领域的发展 2.Linux在桌面领域的发展 3.Linux在移…

自实现朴素贝叶斯分类器with案例:基于SMS Spam Collection数据集的广告邮件分类

目录 贝叶斯分类器何为朴素案例&#xff1a;基于SMS Spam Collection数据集的广告邮件分类SMS数据集词向量表示Laplacian平滑训练过程分类过程 完整代码 贝叶斯分类器 首先要理解贝叶斯决策的理论依据&#xff0c;引用西瓜书上的原话&#xff1a;对于分类任务&#xff0c;在所…

【小呆的力学笔记】非线性有限元的初步认识【二】

文章目录 1.2 有限元分析的数学原理1.2.1 基于最小势能原理的变分法提法1.2.1.a 弹性力学方程简化记法1.2.1.b 应变能密度和应变余能密度1.2.1.c 最小势能原理变分基础 1.2 有限元分析的数学原理 书接上回&#xff0c;我们已经回顾了线性有限元分析的理论基础——线弹性力学的…