深入详解高性能消息队列中间件 RabbitMQ

news2025/1/19 3:14:52

 目录

1、引言

2、什么是 RabbitMQ ?

3、RabbitMQ 优势

4、RabbitMQ 整体架构剖析

4.1、发送消息流程

4.2、消费消息流程

5、RabbitMQ 应用

5.1、广播

5.2、RPC


VC++常用功能开发汇总(专栏文章列表,欢迎订阅,持续更新...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/article/details/124272585C++软件异常排查从入门到精通系列教程(专栏文章列表,欢迎订阅,持续更新...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/article/details/125529931C++软件分析工具从入门到精通案例集锦(专栏文章正在更新中...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/article/details/131405795C/C++基础与进阶(专栏文章,持续更新中...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/category_11931267.html开源组件及数据库技术(专栏文章,持续更新中...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/category_12458859.html网络编程与网络问题分享(专栏文章,持续更新中...)icon-default.png?t=N7T8https://blog.csdn.net/chenlycly/category_2276111.html

1、引言

       在进行系统设计的时候,各个模块、服务器之间为了实现数据的交互,通常是建立连接通过发送消息来进行。如果将他们一一建立连接,就会出现链路太多,每一条链路都必须感知对端等问题。此场景下消息将非常混乱,后期维护也将非常痛苦。为了解决这个问题,精简系统,引入RabbitMq。各相关模块不在相互发送消息,而将消息都发送给RabbitMQ,由RabbitMQ负责将消息传递出去。

       那么,什么是RabbitMQ?RabbitMQ又是如何实现这些功能的呢?   

2、什么是 RabbitMQ ?

       在讲RabbitMQ之前,需要先了解一下AMQP的概念。

       AMQP,即Advanced Message Queuing Protocol(高级消息队列协议),是一个提供统一消息服务的应用层标准高级消息队列协议。AMQP是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件传递消息,不受客户端/中间件不同产品、不同开发语言等条件的限制。该协议是一种二进制协议,提供客户端应用于消息中间件之间异步、安全、高效的交互。相对于我们常见的REST API,AMQP更容易实现,可以降低开销,同时灵活性高,可以轻松的添加负载平衡和高可用性的功能,并保证消息传递,在性能上AMQP协议也相对更好一些。

       RabbitMQ是AMQP的一个开源实现,服务器端用Erlang语言编写,用于在分布式系统中存储转发消息,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、 ActionScript、XMPP、STOMP等,支持AJAX。 MQ(Messages Queue)是一种应用程序与应用程的通信方法。RabbitMQ相当于生产者与消费者的模式,消息发送端(生产者)将消息写入消息队列,消息接收端(消费者)从消息队列中取出消息、消费消息;而消息的发送端无需知道消息接受端的存在,反之亦然。

3、RabbitMQ 优势

        RabbitMQ主要有以下几个优势:

  • 可靠性(Reliablity):使用了一些机制来保证可靠性,比如持久化、传输确认、发布确认。
  • 灵活的路由(Flexible Routing):在消息进入队列之前,通过Exchange来路由消息。对于典型的路由功能,Rabbit已经提供了一些内置的Exchange来实现。针对更复杂的路由功能,可以将多个Exchange绑定在一起,也通过插件机制实现自己的Exchange。
  • 消息集群(Clustering):多个RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 高可用(Highly Avaliable Queues):队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
  • 多种协议(Multi-protocol):支持多种消息队列协议,如STOMP、MQTT等。
  • 多种语言客户端(Many Clients):几乎支持所有常用语言,比如Java、.NET、Ruby等。
  • 管理界面(Management UI):提供了易用的用户界面,使得用户可以监控和管理消息Broker的许多方面。
  • 跟踪机制(Tracing):如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
  • 插件机制(Plugin System):提供了许多插件,来从多方面进行扩展,也可以编辑自己的插件。

4、RabbitMQ整体架构剖析

        在详细介绍RabbitMQ之前,先介绍几个重要的概念:

  • Queue:消息队列
  • Exchange:交换机,它会按照路由规则来投递消息
  • Routing key:路由关键字,exchange会根据它来进行消息投递
  • Bind:绑定了queue和exchange,根据路由规则将消息会投递到对应的消息队列中去。
  • Producer:消息生产者
  • Consumer:消息的消费者

       RabbitMQ的整体架构图如下所示:

P(Producer,消息生产者)负责发送,C(Consumer,消息消费者)负责消费消息。其中交换机exchange、队列Queue的定义、exchange与Queue的绑定既可以放在发送端,也可以放在消费端,但是不管放在何处定义,要在使用前定义,否则会出错。本文统一将exchange放在生产者端来定义,而将queue的定义,queue与exchange的绑定放在消费端来处理。另外,为了防止第一次使用exchange是在消费端,可以在消费端也同时定义exchange。本文不考虑这种情况,默认在消费端使用exchange的时候已定义过。

4.1、发送消息流程

      P端发送消息的基本过程是:

1)连接服务器;
2)声明exchange,并设置其相关属性;
3)将消息发送到exchange。

其中,exchange有3种类型:fanout、routing、topic:

1)fanout不处理路由键,为空即可,只要简单的将队列绑定到交换机上,那么发送到交换机上的消息都会被转发到与该交换机绑定的所有队列上。
2)Routing处理路由键,需要将一个队列绑定到交换机上,要求消息与一个特定的路由键完全匹配。
3)Topic将路由键与某模式进行匹配,此时队列需要绑定到一个模式上。匹配的规格是”#”匹配一个或多个词,”*”匹配一个词。

4.2、消费消息流程

      C消费消息的基本过程是:

1)连接服务器;
2)声明队列queue及其属性(持久化、无消费者时是否自动删除队列等等);
3)设置routingkey,并且通过routingkey将queue与exchange绑定到一起;
4)等待消息,消费消息。

其中,queue可以设置的属性有:Exclusive、auto_delete、durable。

1)Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。
2)Auto_delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
3)Durable:服务器重启后,队列不会丢失。 

      对上述的exchange、queue、binding的一个例子:

Mq.queue_bind(“QueueTest”, “ExchangeTest”, “Test”)

这个绑定的意思是:任何发送到交换机ExchangeTest的具有路由键Test的消息都会被路由到名为QueueTest的队列中。

5、RabbitMQ 应用

       一般平台的消息大致分为两种类型:notif和req-ack-notif。对应于rabbitmq正好有两种模型:publish/subscribe和rpc。下面根据实际应用来讲解这两个模型。

5.1、广播

      假设应用服务器收到了一条消息A,需要广播给其他多个业务服务器。按照图一中rabbitmq的基本结构我们应该能想到两种方式:

Method1

Method2

上述两种方法哪一种能实现我们的目的?答案是Method1,如果采用Method2的话,queue会将消息依次分发给两个消费端,例如客户端C1收到消息1,3,5…,客户端C2收到消息2,4,6…。

       虽然此种方法不能实现我们的目的,但在此处插入一点,及每条消息的处理量可能而且几乎肯定是不同的,所以有时会出现客户端C1处理完了N条消息,但客户端C2一条还没处理完,为了解决这个问题,rabbitmq提供了公平调度的概念即Fair dispatch:Rabbitmq不会在同一时间给工作者分配多个任务,只有在工作者完成任务之后,才会再次接收到任务。

       回到刚才讨论的地方,我们已经确立了使用Method1来完成该功能,现在根据该方法进行一些简单的编码验证(注:验证语言为python)。publish/subscribe模型之P客户端代码如下:

import pika

#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#声明交换机
channel.exchange_declare(exchange='exchangeTest', type='fanout')

#发送消息
channel.basic_publish(exchange='exchangeTest', routing_key='', body='Hello World!')
connection.close()

publish/subscribe模型之C客户端代码:

import pika

#建立连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

#创建queue
channel.queue_declare(queue=’QueueTest’)

#绑定
channel.queue_bind(exchange=’exchangeTest’, queue=’QueueTest’)
def callback(ch, method, properties, body):    
print “ [x] Received %r” %(body, )    
channel.basic_consume(callback, queue =’QueueTest’, no_ack=True)
channel.start_consuming()

       AMQP支持在一个TCP连接上启用多个MQ通信channel,每个channel都可以被应用作为通信流, 被分配了一个整数标识,自动由Connection()类的.channel()方法维护。每个AMQP程序至少要有一个连接和一个channel。

5.2、RPC

       对于大部分消息我们不仅仅是通知,更多的是需要对方在接收到消息后给我们回复的。此时,
我们就需要rabbitmq提供的RPC模型,如下图所示:

       RPC模型与广播模型相比,最大的区别是消费者客户端在接收到消息的时候,需要给发送者P回复消息。而同样的,消息生产者P也不仅仅是做为发送端了,他还需要接收来自消费端C回复的消息。

       由P到C我们知道直接将Queue1绑定到exchange上就OK了,那么C回复消息的时候通过什么回给P呢?为此,rabbitmq在P发送消息的时候,提供设置回调队列及关联ID,C在给P回复消息的时候,通过回调队列即可。提供关联ID的目的是即使P端收到Queue2的消息,也要验证Correlation_Id是否匹配,不匹配的话,直接忽略。

       使用如下的代码进行验证(注:验证语言为python),RPC模型之P端的代码如下:

import pika
class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters( 
host='localhost'))
        self.channel = self.connection.channel()      
        #定义接收返回消息的队列,此处为一随机生成的队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue
        #等待接收消息
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)

    #定义接收到返回消息的处理方法
    def on_response(self, ch, method, props, body):
        self.response = body
    def request(self, n):
        self.response = None

#发送计算请求        
self.channel.basic_publish(exchange='',
 routing_key='compute_queue', properties=pika.BasicProperties
(reply_to = self.callback_queue,), body=str(n))
        #接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
center = Center()
response = center.request(30)
print " [.] Got %r" % (response,)

RPC模型之C端代码:

import pika

class Center(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters( 
host='localhost'))
        self.channel = self.connection.channel()      
        #定义接收返回消息的队列,此处为一随机生成的队列
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        #等待接收消息
        self.channel.basic_consume(self.on_response, no_ack=True,queue=self.callback_queue)
    #定义接收到返回消息的处理方法
    def on_response(self, ch, method, props, body):
        self.response = body
    def request(self, n):
        self.response = None

#发送计算请求        
self.channel.basic_publish(exchange='',
 routing_key='compute_queue', properties=pika.BasicProperties
(reply_to = self.callback_queue,),body=str(n))
        #接收返回的数据
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)
center = Center()
response = center.request(30)
print " [.] Got %r" % (response,)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1177769.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【工具使用-信号叠加演示】一种演示不同频率信号叠加的工具

一,简介 本文主要介绍一种网页演示不同频率的正弦信号叠加的工具,供参考。 二,说明 网址:https://teropa.info/harmonics-explorer/ 打开后可以设置不同的信号,然后最上面是不同信号的频率叠加之后的效果&#xff…

Blender vs 3ds Max:谁才是3D软件的未来

在不断发展的3D建模和动画领域,两大软件巨头Blender和3ds Max一直在争夺顶级地位。 随着技术的进步和用户需求的演变,一个重要问题逐渐浮出水面:Blender是否最终会取代3ds Max?本文将深入探讨二者各自的优势和劣势、当前状况&…

2024好用免费的mac苹果电脑杀毒软件CleanMyMac

杀毒软件在苹果家族中是一个小众软件,百度搜索苹果电脑杀毒软件,可能各种杀软良莠不齐,因为在这个市场非常小,绝大多数都是冲着“清理”去的,而不是杀毒。最近测试了一款Mac电脑杀毒软件,杀毒效果也是一般般…

WebDAV之π-Disk派盘 + MiXplorer

MiXplorer是一款非常强大实用的手机文档管理器,能给用户提供了一系列的文档处理功能,包括本地文件浏览、文件排序、文件筛选、切换视图、新建文件、添加收藏等等,同时还能将你手机里的所有文件都罗列出来,简洁明了,让用户一眼就能够找到相应的文件并对其进行编辑,或是删除…

YOLOv5:通过真实结果的txt文件与预测结果的txt文件进行结果评估

YOLOv5:通过真实结果的txt文件与预测结果的txt文件进行结果评估 前言前提条件相关介绍项目结构YOLOv5:通过真实结果的txt文件与预测结果的txt文件进行结果评估val_txt.py输出结果 参考 前言 由于本人水平有限,难免出现错漏,敬请批…

Istio快速入门

Istio快速入门 目录 文章目录 Istio快速入门目录本节实战前言1、安装安装方式1.使用 istioctl install2.使用 istioctl manifest generate 安装3.使用 Helm 安装4.使用 Istio Operator 安装 安装 Istio🚩 实战:istioctl 方式安装istio-2023.11.3(测试成功…

SRC实战 | CORS跨资源共享漏洞

CORS跨资源共享 跨源资源共享 (CORS) 是一种浏览器机制,允许网页使用来自其他页面或域的资产和数据。 大多数站点需要使用资源和图像来运行它们的脚本。这些嵌入式资产存在安全风险,因为这些资产可能包含病毒或允许服务器访问黑客。 CORS响应头 CORS通…

物联网AI MicroPython学习之语法 sys系统相关

学物联网,来万物简单IoT物联网!! sys 介绍 sys 模块中提供了与micropython运行环境有关的函数和变量。 常量说明 常量定义常量说明sys.argv当前程序启动的可变参数列表sys.byteorder字节顺序 (‘little’ - 小端, ‘big’ - 大…

深入理解强化学习——多臂赌博机:10臂测试平台

分类目录:《深入理解强化学习》总目录 为了大致评估贪心方法和 ϵ − \epsilon- ϵ−贪心方法相对的有效性,我们将它们在一系列测试问题上进行了定量比较。这组问题是2000个随机生成的 k k k臂赌博机问题,且 k 10 k10 k10。在每一个赌博机问…

Python的切片操作详细用法解析

在利用Python解决各种实际问题的过程中,经常会遇到从某个对象中抽取部分值的情况,切片操作正是专门用于完成这一操作的有力武器。理论上而言,只要条件表达式得当,可以通过单次或多次切片操作实现任意切取目标值。切片操作的基本语…

【计算机架构】程序指令计数 | 功耗计算 | 电力功耗 | 安德尔定律(Amdahl‘s Law)

0x00 程序的指令计数 程序的指令计数(Instruction Count)由程序本身、ISA(指令集架构)和编译器决定。这表示一个程序中包含的指令数量受到程序编写方式、计算机体系结构和编译器的影响。 每条指令的平均周期数(Averag…

如何更改IP地址为美国IP?美国静态住宅代理如何搭建?

相信很多做跨境电商或外贸如TikTok shop、Facebook商店、Amazon、领英的玩家都需要搭建独享的美国IP环境来运营店铺,那么如何搭建稳定独享的IP环境呢?加下来为你详细介绍,助力您的跨境业务。 一、选择合适的代理IP 代理IP可以帮助隐藏用户真…

XSS漏洞利用工具BeEF

BeEF是Browser Exploitation Framework的缩写。随着人们越来越多地关注针对包括移动客户端在内的客户端的网络传播攻击,BeEF使专业的渗透测试人员可以使用客户端攻击向量来评估目标环境的实际安全状况。与其他安全框架不同,BeEF超越了硬化的网络边界和客…

breach1靶机攻略

breach1 准备 这个靶机ip固定为 192.168.110.140 使用vmware的话,将它加入一张仅主机的网卡就行,比如vmnet7,然后vmnet设置成192.168.110.0网段,kali也新建一张网卡加入该网卡 扫描 nmap --min-rate 10000 -p- 192.168.110.1…

登录Tomcat控制台,账号密码输入正确但点击登录没反应不跳转到控制台页面

在tomcat-users.xml里面可以查看登录tomcat控制台的账号密码,如果账号密码输入正确还是登录不进去,则很有可能是tomcat的账号被锁了(可在catalina.xxx.log里面查看)。tomcat账号被锁定后默认情况是不访问控制台后5分钟自动解锁&am…

第六章:Property-based Testing and Test Oracles

文章目录 Test OraclesActive and Passive Test OraclesTypes of Test OraclesFormal, executable specificationsSolved examplesMetamorphic oraclesAlternative implementations (备用实现)Heuristic oracles (启发式)The Golden Program!Oracle Deviation (Oracle偏差)T…

Rust编程基础之引用与借用

1.引用与借用 在上一章节最后的代码中, 我们必须将 String 返回给调用函数,以便在调用 calculate_length 后仍能使用 String,因为 String 被移动到了 calculate_length 内。相反我们可以提供一个 String 值的引用(reference)。引…

BGF-YOLO | 增强版YOLOV8 | 用于脑瘤检测的多尺度注意力特征融合

基于You Only Look Once(YOLO)的目标检测器在自动脑瘤检测中展现出卓越的准确性。在本文中,我们开发了一种新的BGF-YOLO架构,通过将双层路由注意力(BRA)、广义特征金字塔网络(GFPN)和第四检测头整合到YOLOv8中来实现。BGF-YOLO包含了一个注意力机制,用于更加关注重要的…

北马“破3收割机”,特步成赛场和市场双面“赢家”

北马已经落下帷幕,尽管当天全国共有20多场城市马拉松开跑,但作为国内第一个城市马拉松,有“国马”之称的北马,还是赛事的中心点。北马相关的所有标签,都会在跑圈内外引起足够多的讨论,比如一双跑鞋——顶级…

局域网和广域网的区别

局域网靠交换机通信; 广域网靠路由器将多个局域网连接起来通信; 在防火墙内部的叫内网,外部的叫外网; 用ipconfig查到的ip是本机的内网IP;在网页上看到的是连接互联网所用的IP即往往IP 记住:IP地址是唯一的&#x…