RabbitMQ相关概念介绍

news2025/2/28 6:20:43

        这篇文章主要介绍RabbitMQ中几个重要的概念,对于初学者来说,概念性的东西可能比较难以理解,但是对于理解和使用RabbitMQ却必不可少,初学阶段,现在脑海里留有印象,随着后续更加深入的学习,就会很容易理解。对于消息队列的高手,这篇文章如有阐述不到位的,可以一起交流。

        RabbitMQ整体上是一个生产者和消费者模型,如下图。

生产者和消费者

        RabbitMQ的Producer和其他消息队列的Producer没有什么不同,都是用来将消息发送到服务器,只是在实现上有所区别,关于RabbitMQ客户端的实现,包括API和网络模型等,后面会专门有文章介绍。

        RabbitMQ Consumer连接Broker,并订阅它关注的队列,只要队列上有消息,Consumer就会接收到消息并开始消费消息。RabbitMQ的消费端默认是推模型。

        有Producer、Consumer,那么就会有一个地方来存储、转发消息,RabbitMQ Broker完成这项工作,在这里,可以先简单的把一个Broker理解为一个RabbitMQ 节点或实例。

队列

        Queue是RabbitMQ的内部对象,是实际存储Producer发送的消息的地方,这点和Kafka存储消息的模型不一样。

        Producer发送消息并不是直接发送到Queue,而是在发送消息的请求中声明Exchange(交换器) 和 RoutingKey(绑定键),Broker会根据Exchange 和 RoutingKey找到相应的Queue,并保存消息内容到Queue。

        Consumer订阅的是Queue,所有直接从Queue上消费消息。RabbitMQ支持多个Consumer同时订阅一个Queue,这时Broker会轮询Consumer,把Queue中的消息均摊到所有订阅次Queue的Consumer。但是RabbitMQ不支持队列层面的广播消费。

交换器、路由键、绑定

        下面介绍的是RabbitMQ中非常重要的概念,生产和消费消息都是以此为基础,也是对AMQP协议的具体实现。

        交换器(Exchange)负责按照一定的规则分发消息,Producer发送的消息实际上是到Exchange,由Exchange将消息路由到一个或多个Queue,如果找不到Queue,则根据客户端配置,要么返回给Producer,要么直接丢弃。

        路由键(RoutingKey)指定了路由规则,在Producer发送消息的时候,一般会指定RoutingKey,这样就知道消息需要路由到哪里。

        绑定(BingKey)将Exchange和Queue关联起来,这样,RabbitMQ就知道如何正确的将消息路由到队列了。

        关于路由键和绑定键,对于初学者可能有点混淆,这里分享下我的理解:路由键是在客户端发送消息的时候,告诉服务器,我发送的消息根据我指定的路由键去找队列;而绑定键,是在创建的时候使用的,告诉服务器,交换器是如何与队列关联的。具体可以对照下面的代码示例来理解可能容易点。

交换器类型

        RabbitMQ提供了多个交换器类型来满足不同的需求:fanout、direct、topic、headers。

fanout

fanout exchange会把消息发送到所有与该交换器绑定的队列中,会忽略Procuder发送消息时申明的RoutingKey。如下图,Producer发送message给fanout_exchange,并制定了routing key 为 info,最终queue1和queue2都收到了这条消息。

 

    public void testFanoutExchange() throws IOException, TimeoutException {
        // fanout类型的 exchange, 在server端忽略 routing key,只要发送到 exchange,任何和这个 exchange 绑定的 queue都会收到这条消息

        String exchangeName = "fanout_exchange";
        String queue1 = "queue1";
        String queue2 = "queue2";
        String warning = "warning";
        String info = "info";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, null);

        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);

        channel.queueBind(queue1, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, info);

        String message = "info";
        channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        channel.close();
        connection.close();

    }

direct

direct exchange也是一种比较简单的exchange,在路由的时候,只有routing key 和binding key完全匹配的时候,才会路由到queue。如下图,Producer发送消息到direct_exchange,routing_key 是info,只有queue2才会收到消息。

    public void testDirectExchange() throws IOException, TimeoutException {
        String exchangeName = "direct_exchange";
        String queue1 = "direct_queue1";
        String queue2 = "direct_queue2";
        String warning = "warning";
        String info = "info";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, null);
        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);

        channel.queueBind(queue1, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, warning);
        channel.queueBind(queue2, exchangeName, info);

        String message = "direct exchange";
        channel.basicPublish(exchangeName, info, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        channel.close();
        connection.close();

    }

topic

topic exchange 是比较灵活,实际项目中用的比较多的一种,它也是将消息路由到 routing key 和 binding key 匹配的队列中,匹配规则如下:

  • routing key 和 binding key 以点(.)为分隔符,被点号分割的字符串为一个独立的匹配单元,如com.rabbitmq.client,com.java.util 等等。
  • ‘#’ 和 ‘*’ 用于做模糊匹配,‘#’ 匹配0个或多个词,‘*’ 匹配0个词。

下图中topic_exchange绑定了两个queue,*.rabbitmq.* 绑定颅queue1, *.*.client 、 com.# 都绑定了queue2,当binding key 为 com.rabbitmq.client,匹配queue1和queue2,因此都会收到消息;当binding key 为 org.rabbitmq.server 时,只有queue1匹配,当 binding key 为 com.hidden.demo 时,只有queue2匹配,当bingding key 为 aaa 时,queue1 和 queue2 都不匹配。

 

 

    public void testTopicExchange() throws IOException, TimeoutException {
        // * 匹配一个单词,# 匹配0个或多个单词
        String exchangeName = "topic_exchange";
        String queue1 = "topic_queue1";
        String queue2 = "topic_queue2";
        String routing1 = "*.rabbitmq.*";
        String routing2 = "*.*.client";
        String routing3 = "com.#";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, null);
        channel.queueDeclare(queue1, true, false, false, null);
        channel.queueDeclare(queue2, true, false, false, null);

        channel.queueBind(queue1, exchangeName, routing1);
        channel.queueBind(queue2, exchangeName, routing2);
        channel.queueBind(queue2, exchangeName, routing3);

        String message = "topic exchange";
        // queue1, queue2
        channel.basicPublish(exchangeName, "com.rabbitmq.client", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // queue1
        channel.basicPublish(exchangeName, "org.rabbitmq.server", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // queue2
        channel.basicPublish(exchangeName, "com.hidden.demo", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        // NONE
        channel.basicPublish(exchangeName, "aaaa", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    }

headers

headers exchange 不依赖路由键的匹配规则来路由消息,而是根据消息的headers属性值,RabbitMQ收到消息后,对比消息的headers中的属性值是否与queue、exchange绑定时指定的属性值一致,如果完全匹配,则路由消息到队列。由于headers exchange 性能很差,所以这里不做代码演示,感兴趣的小伙伴可以实验一下。

        以上就是对RabbitMQ中的一些概念做了一下介绍,小伙伴们可以多做实验加深理解,我也是在写了几个UnitTest之后,才理解这些概念,尤其是Exchange、RoutingKey 和 BindingKey。

        感谢各位小伙伴的阅览,也很高兴能和各位小伙伴一起钻研探讨技术问题。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/347179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

电源自动测试系统-电源模块批量自动化测试方案ATECLOUD-Power

1、测试名称 基于ATECLOUD的电源模块测试方案 2、测试目的 提升电源模块测试效率,减少测试人员成本,降低测试专业知识要求,增加数据精准度,为企业提供专业决策的数据支持,从而降本增效。 3、测试设备 示波器、电子…

20230215小结

1 t-sne 原理:利用两个向量之间的欧式距离转化成条件概率分布,可以把高维度的数据转化为低维度(1000,64)-》(1000,2),原先每个样本有64维度,转化为2维 2 swi…

linux系统编程入门

一、搭建环境 1、安装 Linux 系统(虚拟机安装、云服务器) https://releases.ubuntu.com/bionic/ 2、安装 XSHELL、XFTP https://www.netsarang.com/zh/free-for-home-school/ 3、安装 visual studio code https://code.visualstudio.com/ 4、Linu…

Unreal Engine角色涌现行为开发教程

在本文中,我将讨论如何使用虚幻引擎、强化学习和免费的机器学习插件 MindMaker 在 AI 角色中生成涌现行为。 目的是感兴趣的读者可以使用它作为在他们自己的游戏项目或具体的 AI 角色中创建涌现行为的指南。 推荐:使用 NSDT场景设计器 快速搭建 3D场景。…

一种基于加密域的数字图像水印算法的设计与实现(附Matlab源码)

一种基于加密域的数字图像水印算法的设计与实现 项目介绍 毕设项目 题目:一种基于加密域的数字图像水印算法的设计与实现 随着数字媒体技术的发展,数字媒体版权的保护得到了越来越多人的重视,数字水印技术作为数字媒体版权保护的有效手段…

通达信交易接口以什么形式执行下单的?

通达信程交易接口 以API形式来执行下单接口,一般不再需要通过接口系统之间进行连接,通过直接调用通达信dll交易函数的方式直接进行交易,包括下单,撤单,查询资金股份、当日委托、当日成交等方面都能很快的执行出来。以a…

【JDK8】MyBatis源码导入Idea

1.背景 为了更好的将MyBatis的开发设计思想带到日常开发工作,将MyBatis源码导入到本地开发工具中(idea)。我自己在导入的时候碰到几个问题,耽误了自己一点时间,这里我把它们记下来,后边的小伙伴可不要踩我的坑。 Java版本&#x…

黑帽SEO是什么?做了真的能够一直保持排名?

随着Google演算法一次又一次的更新,现在愈来愈多人重视所谓的网站SEO。但是内行的人都知道,网站要提高排名并非一天两天的事,所以有些人就会使用不法手段想借此提高排名,这也就是常听到的「黑帽SEO」。但是做黑帽SEO真的能快速提高…

【爬虫+数据清洗+可视化分析】用Python分析哔哩哔哩“狂飙”的评论数据

一、背景介绍您好,我是马哥python说,一枚10年程序猿。2023开年这段时间,《狂飙》这部热播剧引发全民追剧,不仅全员演技在线,更是符合反黑主旋律,因此创下多个收视率记录!基于此热门事件&#xf…

嵌入式开发之Vscode实用插件大全

嵌入式开发之Vscode实用插件大全① Chinese (Simplified) (简体中文) (神器)② C/C &CMake & C/C Extension Pack(神器)③ Better C Syntax④ Doxygen Documentation Generator(神器)⑤ vscode-ico…

存储类别、链接与内存管理(一)

1、一些必要的基础概念 (1)对象 从硬件的角度,被存储的每个值都被占用了一定的物理内存,C语言把这样的一块内存称为对象对象可以存储一个或多个值一个对象可能并未存储实际的值,也可能存储一个或多个值,但…

初阶函数递归经典例题(1)

1、递归实现n的k次方 2、计算一个数的每位之和(递归实现) 3、strlen的模拟(递归实现) 讲解之间我们先回顾下递归的知识点: 1、什么是递归? 程序调用自身的编程技巧称为递归。(即一个函数在其…

蓝牙耳机什么牌子的好又实惠?实惠好用的蓝牙耳机品牌

随着科技的发展,耳机领域的新品是越来越多,很多品牌如雨后春笋般涌现,耳机的样式也是层出不穷,下面小编整理了几款实惠好用的蓝牙耳机品牌。 一、南卡小音舱蓝牙耳机 参考价格:239元 单耳重:3.1g 推荐系…

用ChatGPT写一个基于ChatGPT API的对话机器人

采用的是国区的网站 Q:写一个调用chatgpt的聊天机器人的python程序 A: python import requests# 聊天机器人的API地址 url https://api.chatgpt.com/v2/query# 请求参数 params {prompt: 你好,user_key: YOUR_USER_KEY }# 发送请求 response request…

数据仓库实战

目录1、最佳实战1.1 表的分类1.2 ETL策略1.3 任务调度2、项目实战2.1 项目概述2.2 数据描述2.3 架构设计2.4 环境搭建2.5 项目开发1、最佳实战 1.1 表的分类 维度建模中表的类型:事实表和维度表 事实表又可以分为:事务事实表、周期快照事实表、累积快照…

公司项目引入这种方式,开发应用真是又快又准!

试想一下,你开足马力提了一串需求,给开发精英团队也好,给外包也行,都要等个半年甚至更久才会给到你一个满意的产品,你是否还有动力? 这还不止,业务越来越复杂,最初的需求也在随着着…

jsp医院管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

一、源码特点 jsp 医院管理系统 是一套完善的web设计系统,对理解JSP java编程开发语言有帮助,系统具有完整的源代码和数据库,系统主要采用B/S模式开发。开发环境为TOMCAT7.0,Myeclipse8.5开 发,数据库为Mysql,使用ja…

大数据之-Nifi-Nifi的安装_启动_认识Nifi的操作台---大数据之Nifi工作笔记0002

然后我们看一下如何安装nifi 这个上一节已经说了 然后看一下环境准备,这个自己去安装就可以了,需要jdk,1.8就可以了,然后 maven安装上就可以了 然后去下载,这里下载Linux版本的 1.9.2的版本比较稳定 下载以后,避免端口冲突要修改端口默认是8080,修改为58080 然后启动很简单,看…

node.js校园快递智能仓储物流系统vue

开发语言 node.js 框架:Express 前端:Vue.js 数据库:mysql 数据库工具:Navicat 开发软件:VScode 系统能够提供简洁、全面且清爽的用户界面,使操作人员可以直观明了。 系统可以实现管理员信息管理、收件人信息管理、快…

【C++】智能指针(万字详解)

🌈欢迎来到C专栏~~智能指针 (꒪ꇴ꒪(꒪ꇴ꒪ )🐣,我是Scort目前状态:大三非科班啃C中🌍博客主页:张小姐的猫~江湖背景快上车🚘,握好方向盘跟我有一起打天下嘞!送给自己的一句鸡汤&…