Python入门自学进阶-Web框架——39、redis、rabbitmq、git——2

news2025/1/11 19:39:23

RabbitMQ的exchange,即交换机有不同的类型:

1.direct Exchange(直接交换机)
匹配路由键,只有完全匹配消息才会被转发
2.Fanout Excange(扇出交换机)
将消息发送至所有的队列
3.Topic Exchange(主题交换机)
将路由按模式匹配,此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*” 只会匹配到“abc.def”。
4.Header Exchange
在绑定Exchange和Queue的时候指定一组键值对,header为键,根据请求消息中携带的header进行路由

RabbitMQ六种工作模式
六种模式分别为Hello world、Work queues(工作队列)、Publish/Subscribe(发布订阅)、Routing(路由)、Topics(主题)、RPC(远程调用),除了RPC模式外,其余的模式都是从简单的使用到更为灵活的使用,基本的代码框架都是差不多的,只是在不同的模式下达到的效果不同,它们各有各的特点,在实际使用中应该根据需求来选择具体的模式,而不是简单粗暴的选择最“高端”的模式。

1. Hello world模式(也叫simple (简单模式))
Hello world模式是最简单的一种模式,一个producer发送message,另一个consumer接收message。

 

2. Work queues模式(工作模式)
Work queues模式即工作队列模式,也称为Task queues模式(任务队列模式),这个模式的特点在于,同一个queue可以允许多个consumer从中获取massage,RabbitMQ默认会从queue中依次循环的给不同的consumer发送message。一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次

3. Publish/Subscribe模式(发布订阅模式)相当于广播
相对于工作/任务模式中的一个message只能发送给一个consumer使用,发布订阅模式会将一个message同时发送给多个consumer使用,其实就是producer将message广播给所有的consumer。

生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息。 

使用fanout交换机类型,传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。

不需要指定 routing_key ,即使指定了也是无效。
需要提前将 exchange 和 queue 绑定,一个 exchange 可以绑定多个 queue,一个queue可以绑定多个exchange。
需要先启动 订阅者,此模式下的队列是 consumer 随机生成的,发布者 仅仅发布消息到 exchange ,由 exchange 转发消息至 queue。如果不先启动订阅者,则发布者发布的消息订阅者是无法事后接收到的。
发布者:

import pika  # 链接mq需要pika模块
import json

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储

channel.exchange_declare(exchange='logs',
                         exchange_type='fanout',
                         )
for i in range(0,10):
    message = json.dumps({'消息ID':'1000%s'%i,},ensure_ascii=False)
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=bytes(message,encoding='utf8'),
                          )
    print(message)

connection.close()

接收者:

import pika
import json

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

# 创建临时队列,队列名传空字符或不设置,将创建唯一的临时queue,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue
print("temp queue name:",queue_name)
channel.queue_bind(exchange='logs',
                   queue=queue_name,
                   )

def callback(ch, method, properties, body):
    print("[x] Received %r" % str(body,encoding='utf8'))
    # 如果basic_consume中auto_ack为False,则这里要手动进行应答
    channel.basic_ack(delivery_tag=method.delivery_tag) # 手动应答
    print('手动应答队列中消息')

channel.basic_consume(queue=queue_name, # 接收指定queue的消息
                      on_message_callback=callback, # 设置收到消息的回调函数
                      auto_ack=False) # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息,False表示不自动确认,需要在callback中手工确认

print('[*] Waiting for message. To exit press CTRL+C')

# 一直处于等待接收消息的状态,如果没收到消息就一直处于阻塞状态,收到消息就调用上面的回调函数
channel.start_consuming()

4. Routing模式(路由模式),相当于组播
路由模式中,exchange类型为direct,与发布订阅模式相似,但是不同之处在于,发布订阅模式将message不加区分广播给所有的绑定queue,但是路由模式中,允许queue在绑定exchange时,同时指定 routing_key ,exchange就只会发送message到与 routing_key 匹配的queue中,其他的所有message都将被丢弃。当然,也允许多个queue指定相同的 routing_key ,此时效果就相当于fanout类型的发布订阅模式了。

生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列 

生产者:

import pika  # 链接mq需要pika模块
import sys

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

channel.exchange_declare(exchange='direct-logs',
                         exchange_type='direct', # 类型为direct
                         durable = True,
                         )

severity = sys.argv[1] if len(sys.argv)>1 else 'info'  # 定义消息严重级别
message = ''.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='direct-logs',
                      routing_key=severity, # 把消息发送到一组队列,这一组队列按routing_key分组
                      body=message)
print(" [x] Sent %r:%r" % (severity,message))
connection.close()

消费者:

import pika
import sys

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

# 创建临时队列,队列名传空字符或不设置,将创建唯一的临时queue,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]  # 可以输入多个级别
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities: # 循环绑定routing_key
    channel.queue_bind(exchange='direct-logs',
                       queue=queue_name,
                       routing_key=severity,
                       )

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # 如果basic_consume中auto_ack为False,则这里要手动进行应答
    channel.basic_ack(delivery_tag=method.delivery_tag) # 手动应答
    print('手动应答队列中消息')

channel.basic_consume(queue=queue_name, # 接收指定queue的消息
                      on_message_callback=callback, # 设置收到消息的回调函数
                      auto_ack=False) # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息,False表示不自动确认,需要在callback中手工确认

channel.start_consuming()

运行结果:

5. Topics模式(主题模式)
主题模式的exchange类型为topic,相较于路由模式,主题模式更加灵活,区别就在于它的routing_key可以带通配符 * (匹配一个单词)和 # (匹配0个或多个单词),每个单词以点号分隔,但注意,routing_key的总大小不能超过255个字节。

如果一个message同时匹配了多个queue中的routing_key,那这几个queue都会收到这个message,如果一个message同时匹配了一个queue中的多个routing_key,那这个queue也只会接收一次这条message,如果一个message没有匹配上任何routing_key,那么这个message将被丢弃。

如果routing_key定义为 # (就只有这一个通配符),那么这个queue将接收所有message,就像exchange类型为fanout的发布订阅模式一样,如果routing_key两个通配符都没有使用,那么这个queue将会接收固定routing_key的message,就像exchange类型为direct的路由模式一样。

producer端:从代码上讲,producer的代码与路由模式没什么区别,只不过在routing_key的传值上需要注意与想要发送到的queue进行匹配。

生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者。 

生产者:

import pika  # 链接mq需要pika模块
import sys

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

channel.exchange_declare(exchange='topic-logs',
                         exchange_type='topic', # 类型为direct
                         durable = True,
                         )

routing_key = sys.argv[1] if len(sys.argv)>1 else 'anonymous.info'  # 定义消息严重级别

message = ''.join(sys.argv[2:]) or 'Hello World!'

channel.basic_publish(exchange='topic-logs',
                      routing_key=routing_key, # 把消息发送到一组队列,这一组队列按routing_key分组
                      body=message)
print(" [x] Sent %r:%r" % (routing_key,message))
connection.close()

消费者:

import pika
import sys

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
    '192.168.1.117',5672,'/',user_info))
channel = connection.channel()

channel.exchange_declare(exchange='topic-logs',
                         exchange_type='topic',
                         durable=True,)

# 创建临时队列,队列名传空字符或不设置,将创建唯一的临时queue,consumer关闭后,队列自动删除
result = channel.queue_declare('',exclusive=True)
queue_name = result.method.queue


binding_keys = sys.argv[1:]  # 可以输入多个级别
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys: # 循环绑定routing_key
    channel.queue_bind(exchange='topic-logs',
                       queue=queue_name,
                       routing_key=binding_key,
                       )

def callback(ch, method, properties, body):
    print("[x] Received %r" % body)
    # 如果basic_consume中auto_ack为False,则这里要手动进行应答
    channel.basic_ack(delivery_tag=method.delivery_tag) # 手动应答
    print('手动应答队列中消息')

channel.basic_consume(queue=queue_name, # 接收指定queue的消息
                      on_message_callback=callback, # 设置收到消息的回调函数
                      auto_ack=False) # 指定为True,表示消息接收到后自动给消息发送方回复确认,已收到消息,False表示不自动确认,需要在callback中手工确认

print('[*] Waiting for message. To exit press CTRL+C')
channel.start_consuming()

运行结果:可以使用*,#等进行过滤

 6. RPC模式
RPC远程调用(Remote Procedure Call)模式其实就是使用消息队列处理请求的一种方式,通常请求接收到后会立即执行且多个请求是并行执行的,如果一次性来了太多请求,达到了服务端处理请求的瓶颈就会影响性能,但是如果使用消息队列的方式,最大的一点好处是可以不用立即处理请求,而是将请求放入消息队列,服务端只需要根据自己的状态从消息队列中获取并处理请求即可。

producer端:RPC模式的客户端(producer)需要使用到两个queue,一个用于发送request消息(此queue通常在服务端声明和创建),一个用于接收response消息。另外需要特别注意的一点是,需要为每个request消息指定一个uuid(correlation_id属性,类似请求id),用于识别返回的response消息是否属于对应的request。

 客户端client:

import pika
import uuid

class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            '192.168.1.117', 5672, '/', pika.PlainCredentials('tester','test1234')
        ))
        self.channel = self.connection.channel()

        result = self.channel.queue_declare('',exclusive=True)  # 随机生成一个临时的唯一的queue
        self.callback_queue = result.method.queue # 这个临时唯一queue的名字
        # 注意,这个临时queue不是用于发送消息的,是用于接收消息的,这个queue名字,
        # 会传给server端,server端用这个Queue发送消息,也就是客户端指定了服务器端要使用的queue
        self.channel.basic_consume(on_message_callback=self.on_response,
                                   auto_ack=False,
                                   queue=self.callback_queue,)  
        # 这是客户端发送完请求后,接收服务器端返回消息的配置,注意queue就是上面生成的临时queue

    def on_response(self,ch,method,props,body):
        if self.corr_id == props.correlation_id:
            self.response = body
            ch.basic_ack(delivery_tag=method.delivery_tag)
            print("手动应答成功")


    def call(self,n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,
                                       correlation_id=self.corr_id,
                                    # 这个参数是用来标识本次请求,如果客户端发送多个请求,每个请求有不同的uuid,以此进行区分,类似cookie
                                   ),
                                   body=str(n))

        while self.response is None:
            self.connection.process_data_events() # 以非阻塞的方式去检查有没有新消息,
        return int(self.response)

fibonacci_rpc = FibonacciRpcClient()

print("[x] Requesting fib(7)")
response = fibonacci_rpc.call(7)
print("[.] Got %r" % response)

服务器端server:

import pika

user_info = pika.PlainCredentials('tester','test1234')
connection = pika.BlockingConnection(pika.ConnectionParameters(
            '192.168.1.117',5672,'/',user_info,))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')

def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)

def on_request(ch,method,props,body):
    n = int(body)

    print("[.] fib(%s)" % n)
    response = fib(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_message_callback=on_request, queue='rpc_queue')

print("[x] Awaiting RPC requests")
channel.start_consuming()

要注意的是,作为RPC模式,client端一开始是消息发送方,即发布者,server端是消费者,当server端收到消息后,经过处理,要将处理结果再返回给client端,此时,server端就是发布者,client端就是消费者,并且server端发布时使用的queue是client端指定的,即client端生成的临时queue。

correlation_id主要是为了在异步处理中,客户端发送多个请求,服务器端返回的响应因处理速度不同,可能响应的顺序也不同,为了区分不同的请求的响应,使用此标志。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/847975.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【机器学习 | 决策树】利用数据的潜力:用决策树解锁洞察力

🤵‍♂️ 个人主页: AI_magician 📡主页地址: 作者简介:CSDN内容合伙人,全栈领域优质创作者。 👨‍💻景愿:旨在于能和更多的热爱计算机的伙伴一起成长!!&…

山东气象 × 和鲸科技:第一届齐鲁风云气象智能算法挑战赛圆满收官

7 月 20 日,中国气象局印发《人工智能气象应用工作方案(2023 - 2030年)》,旨在加快布局国产人工智能气象应用技术体系建设,推动人工智能技术在气象观测、预报及服务中的深度融合应用,为监测精密、预报精准、…

网络安全 Day30-运维安全项目-堡垒机部署

运维安全项目-堡垒机部署 1. 运维安全项目-架构概述2. 运维安全项目之堡垒机2.1 堡垒机概述2.2 堡垒机选型2.3 环境准备2.4 部署Teleport堡垒机2.4.1 下载与部署2.4.2 启动2.4.3 浏览器访问teleport2.4.4 进行配置2.4.5 安装teleport客户端 2.5 teleport连接服务器 1. 运维安全…

【Windows】Windows开机密码重置

文章目录 前言一、问题描述二、操作步骤2.1 安装DaBaiCai_d14_v6.0_2207_Online.exe2.2 插入U盘2.3 打开大白菜,点击“一键制作USB启动盘”2.4 等待进度条走完2.5 重启电脑,开机按“F12”或者“F8”(具体百度一下,对应品牌电脑开机…

DEVICENET转ETHERCAT网关连接西门子支持ethercat吗

你有没有遇到过生产管理系统中,设备之间的通讯问题?两个不同协议的设备进行通讯,是不是很麻烦?今天,我们为大家介绍一款神奇的产品,能够将不同协议的设备进行连接,让现场的数据交换不再困扰&…

Appium 移动端自动化测试 -- 常用元素操作

点击元素(element) element.click() 输入内容 element.send_keys(‘input_string’) 清空输入框内容 element.clear() 获取元素属性 element.get_property("text_length")element.get_attribute("class") 获取元素的text文本…

MySQL 慢查询探究分析

目录 背景: mysql 整体结构: SQL查询语句执行过程是怎样的: 知道了mysql的整体架构,那么一条查询语句是怎么被执行的呢: 什么是索引: 建立索引越多越好吗:   如何发现慢查询&#xff1…

Appium Android 自动化测试 -- 元素定位

自动化测试元素定位是难点之一,编写脚本时会经常卡在元素定位这里,有时一个元素能捣鼓一天,到最后还是定位不到。 Appium 定位方式和 selenium 一脉相承,selenium 中的定位方式Appium 中都支持,而 Appium 还增加了自己…

每天一道leetcode:剑指 Offer 32 - III. 从上到下打印二叉树 III(中等广度优先遍历)

今日份题目: 请实现一个函数按照之字形顺序打印二叉树,即第一行按照从左到右的顺序打印,第二层按照从右到左的顺序打印,第三行再按照从左到右的顺序打印,其他行以此类推。 示例 给定二叉树: [3,9,20,null,null,15,7…

pytorch Stream 多流处理

CUD Stream https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#c-language-extensions 中指出在kenel的调用函数中最后一个可选参数表示该核函数处在哪个流之中。 - 参数Dg用于定义整个grid的维度和尺寸,即一个grid有多少个block。为dim3类型。…

电脑开不了机如何解锁BitLocker硬盘锁

事情从这里说起,不想看直接跳过 早上闲着无聊,闲着没事干,将win11的用户名称改成了含有中文字符的用户名,然后恐怖的事情发生了,蓝屏了… 然后就是蓝屏收集错误信息,重启,蓝屏收集错误信息&…

【软件工程】2.4 建立动态模型

目录 2.4 建立动态模型 2.4.1 活动图 验证密码活动图 转账活动图 存款活动图 取款活动图 2.4.2 顺序图 2.4.3 状态图 ATM类状态图 总行状态图 分行状态图 2.4.4 协作图 总博客: 2.4 建立动态模型 开发交互式系统,动态模型非常重要 步骤&#…

电商运营必备!6步教你做好电商复盘

一、业绩及关键指标 1、总体完成情况:GMV、UV、转化率、客单价、件单价、连带率、UV价值等,达成率如何,数据同比、环比增幅,与大盘对比情况如何。 2、分阶段及日销情况:预热、预售、开门红、日销、大cu高潮期等销售达…

QFileDialog 对话框类

QFileDialog 对话框类 QFileDialog 对话框类是 QDialog 类的子类, 通过这个类可以选择要打开/保存的文件或者目录。关于这个类我们只需要掌握一些静态方法的使用就可以了。 /* 通用参数:- parent: 当前对话框窗口的父对象也就是父窗口- caption: 当前对话框窗口的标题- dir: 当…

Unity游戏源码分享-仿帝国时代游戏Demo-uRTS Toolkit

Unity游戏源码分享-仿帝国时代游戏Demo-uRTS Toolkit 游戏的架构值得参考 项目地址:https://download.csdn.net/download/Highning0007/88189905

医院临床病例管理系统,促进医疗数据的管理

医疗行业的不断发展,临床病例管理系统也逐渐成为医院管理重要的一环,医院临床病例管理系统是一款集医学信息管理、科研分析以及教学为一体的平台,为医生提供更加高效、准确的病例管理方式,提高诊疗质量,为医学科研提供…

‘vue’不是内部或外部命令,也不是可运行的程序或批处理文件的原因及解决方法

今天我在用node.js的时候,结果出现如下错误: C:\Users\xiesj> vue -v vue不是内部或外部命令,也不是可运行的程序或批处理文件。 原因: 1、确定npm是否已正确安装? 2、确定vue以及vue-cli已正确安装?…

亚马逊将与FTC官员会面!为避免反垄断诉讼做最后努力!

据外媒报道,据一位熟知内情的消息人士透露,亚马逊将于下周与美国联邦贸易委员会(FTC)会面,为避免反垄断诉讼做最后努力。这一举动被视作双方在对簿公堂或者达成和解之前最后的仪式性会议。 FTC在前总统唐纳德特朗普执…

智能文件改名,轻松复制文件并删除目标文件夹中的冗余文件

亲爱的用户们,我们常遇到这样的问题——在文件复制到指定文件夹时,目标文件夹中的冗余文件不断堆积,导致文件混乱、占用存储空间的情况?现在,我们为您带来了一种智能解决方案,让文件管理更简单 第一步&…

Cat.1 bis市场竞争加剧,美格智能脱颖而出有高招

全球物联网产业正值爆发期,2G/3G退网已成趋势,4G/5G协同发展持续进行中。在未来5~10年里,蜂窝物联网将由NB-IoT、LTE-Cat.1 bis、LTE-Cat.4 和RedCap、5G等技术来满足低速、中速、高速的物联网应用场景的需求。 Cat.1 bis承接了中速率物联网…