RabbitMQ灵活运用,怎么理解五种消息模型

news2024/11/15 10:21:18

RabbitMQ灵活运用,怎么理解五种消息模型

  • 简介
  • 一、AMQP协议
  • 二、交换机类型与默认交换机
    • 1. 交换机的四种类型
    • 2. 默认交换机
  • 三、五种模式速览
    • 1. 一对一简单模式
    • 2. work模式(轮询)
    • 3. 发布/订阅模式
    • 4. 路由模式(自称direct模式)
    • 5. Topic模式
  • 四、实例
    • 1. 生产者代码
    • 2. 消费者代码
  • 五、总结


简介

上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型


一、AMQP协议

我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。

AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是一个面向消息的中间件传输协议,用于在应用程序之间进行异步消息通信。

AMQP协议定义了多种角色和服务,包括生产者、消费者、交换器、队列等。其中生产者负责生成消息,消费者负责接收和处理消息,交换器则负责将消息路由到队列中。如下图
在这里插入图片描述

AMQP协议的消息传输基于消息队列,支持消息的确认、持久化、事务等功能,同时支持不同的消息传输模式,如点对点(point-to-point)和发布-订阅(publish-subscribe)模式。

我们今天要介绍的RabbitMQ的五种模型,其实都是基于AMQP的基础模型或其演变而来

二、交换机类型与默认交换机

1. 交换机的四种类型

在进一步讲解之前,我们需要知道RabbitMQ,交换机有四种类型,分别是:Direct、Fanout、Topic和Headers。

  • direct:
    按照消息的路由键(routing key)完全匹配来投递消息。直接匹配模式,将消息发送到与路由键完全匹配的队列中。direct模式可以使用RabbitMQ自带的默认交换机,所以不需要将交换机进行任何绑定操作

  • topic:
    使用通配符进行模糊匹配,消息会带有一个路由键(routing key),而队列绑定到交换机上时,也可以指定主题,而且还能使用一定的正则形式,只要主题匹配上消息的路由键,该消息就会发送至该队列
    符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
    符号“ * ” 匹配不多不少一个词 eg:“ log.* ”只能匹配到“log.erro”

  • headers:
    按照消息头(header)匹配来投递消息。根据消息头的键值对匹配,当消息头与某个绑定的headers完全匹配时,才会将消息发送到该队列中。

  • fanout :
    将接收到的所有消息广播到它知道的所有队列中,如果routing_key 有指定也不会生效

不难发现,这四种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,四种类别对应着四种判断角度。fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;headers —— 这种模式的交换机不再以消息的routing_key作为判断依据,而是在队列绑定交换机时,每个队列需提供一个Map,当消息发送给交换机时,交换机会解析消息头,看看有没有能和各队列Map吻合的属性,有则发给该队列。

2. 默认交换机

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

三、五种模式速览

1. 一对一简单模式

在这里插入图片描述

  • 概念
    生产者将消息发送到“ hello”队列。使用者从该队列接收消息。

  • 图解
    “ P”是我们的生产者
    “ C”是我们的消费者
    中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。需要注意的是,虽然看起来生产者直接连接了队列,但是实际上,它连接的是rabbitMQ的默认交换机

在这里插入图片描述


2. work模式(轮询)

在这里插入图片描述

  • 概念
    work模式是一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列,C1\C2是竞争关系,一个消息被C1消费,C2将没有消息

  • 优点及作用
    一个生产者一个队列多个消费者模式能够并行化工作,解决了消息积压问题

  • 工作原理
    默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

  • 注意
    轮询分发采用平均,导致机器性能的浪费,可以将消费者信道设置如下,代表不公平分发

int preFetchCount = 1;
// 该参数的意思是消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者,该设置需要和手动ACK配合使用
channel.basicQos(preFetchCount )


3. 发布/订阅模式

前两种发布模式,在图中都没有画出 Exchange交换机,属于是一种模型的简化,实际上上两种模式,都将消息发布给了rabbitMQz自带的默认交换机,然后默认交换机再将消息转发给消息队列
(PS:每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同)
在这里插入图片描述

  • 概念
    和模式二不同的是,模式三是没有路由规则,多个队列,多个消费者生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,因此一个消息可以被多个消费者消费

4. 路由模式(自称direct模式)

在这里插入图片描述

  • 概念
    交换机连接接队列发送消息需要指定routing_key,所以模式四就是生产者发送消息到交换机并且要指定routing_key,消费者将队列绑定到交换机时需要指定路由key
  • 算法
    背后的路由算法很简单:消息自带一个routing_key(亦称路由键),交换机会把该消息路由给与其routing_key完全匹配的队列

在这里插入图片描述
在direct模式中,第一个队列以 orange为 key 绑定至交换机 x,第二个队列以black和green绑定。注意:如果有消息没有使用上述key,比如某个消息的key 是 red ,那么该消息将会被丢弃

另外,用相同的路由键可以绑定多个队列
在这里插入图片描述


5. Topic模式

在这里插入图片描述

  • 概念
    topic模式的routing_key使用通配符组成进行模糊匹配

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
符号“ * ” 只匹配一个词 eg:“ log.* ”只能匹配到“log.erro”

举例说明,当使用下列 routing_key 发送消息时:

  1. “quick.orange.rabbit”的消息将传递到两个队列;
  2. “lazy.orange.elephant ”也将发送给他们两个;
  3. “quick.orange.fox ”只会进入Q1,而“ lazy.brown.fox ”只会进入Q2;
  4. “lazy.pink.rabbit ”将被传递到Q2只有一次,即使两个绑定都匹配;
  5. “quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃;
  6. “orange“ 和“”quick.orange.male.rabbit“,因为单词个数对不上,则不会被匹配
  7. “lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到Q2队列。

四、实例

我们以最复杂的Topic模式为例,实际写一段java代码

1. 生产者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicProducer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        // 待发送的消息
        String routingKey = "topic.key1";
        String message = "hello rabbitmq, this is topic message";

        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println("Sent message: " + message + ", routingKey: " + routingKey);

        channel.close();
        connection.close();
    }
}

2. 消费者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TopicConsumer {

    private static final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 创建Topic类型的交换机,由于发送者已创建,此步其实可省略,一般由发送方建立交换机
        // channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个非持久、独占、自动删除的队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定路由键为 "topic.#" 的消息
        channel.queueBind(queueName, EXCHANGE_NAME, "topic.#");

        System.out.println("Waiting for messages. To exit press CTRL+C");

        // 消费消息
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String routingKey = envelope.getRoutingKey();
                String message = new String(body, "UTF-8");
                System.out.println("Received message: " + message + ", routingKey: " + routingKey);
            }
        };

        channel.basicConsume(queueName, true, consumer);
    }
}


五、总结

rabbitMQ的五种模式就介绍完了,其实后三种模式非常相像,我们可以这么理解Topic模式

当队列用‘#’绑定时它将接收所有消息,与routing_key无关,此时就像fanout模式一样
当队列绑定中不使用‘*’和‘#’时,主题交换就像direct模式一样。

而至于前两种模式,由于模型中不带有交换机,所以生产者和消费者都是直接连接的Queue,则就是直肠子的模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/697763.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Android 应用自动开启辅助(无障碍)功能并使用辅助(无障碍)功能

一.背景 由于最近的项目需要开启无障碍功能然后实现对应的功能需求,但是由于需求是需要安装后就开启辅助功能,不要在繁琐的在设置中开启辅助功能,所以需要如何在应用中开启辅助功能。 二.前提条件 将普通应用转换成系统应用,然后将系统的framework.jar包放到应用中并且可以…

vscode配置task.json和launch.json启动调试

首先说一下参考博文: 文章标题“VScode 调试教程 tasks.json和launch.json的设置(超详细)” 地址:https://blog.csdn.net/qq_59084325/article/details/125662393 官方文档太官方,其他人的文档也看过,单独…

微信小程序浏览docx,pdf等文件在线预览使用wx.openDocument

wx.downloadFile({ url: fileUrl,//pdf链接success(res) {wx.openDocument({ //打开文档filePath: res.tempFilePath,fileType: "pdf",//文档类型showMenu: true,success: function (res) {wx.showToast({title: 打开文档成功,})},fail: function (res) {wx.showToas…

Stable Diffusion使用“面部修复”时报TypeError: ‘NoneType‘ object is not subscriptable错

问题 Stable Diffusion使用“面部修复”时报TypeError: ‘NoneType’ object is not subscriptable错 解决方案 下载【detection_Resnet50_Final.pth】和【parsing_parsenet.pth】到【repositories\CodeFormer\weights\facelib】目录下,并重新运行项目即可。 ht…

Unity 基础之 URP 项目创建\项目转URP Pipline

Unity 基础之 URP 项目创建\项目转URP Pipline 目录 Unity 基础之 URP 项目创建\项目转URP Pipline 一、简单介绍 二、创建 URP 项目 三、工程项目转 URP 一、简单介绍 Unity中的一些基础知识点,方便日后查阅。 Unity游戏开发中,这里简单介绍如何创…

Methodot低代码开发教程——玩转表格增删改查分页

目录 1、背景介绍 2、连接数据源 2.1 新增数据源 2.2 填写数据源信息 3、表格数据的展示 3.1 新增查询,编写查询语句 3.2 使用表格组件 3.3 同步数据源与表格列名 4、表格的数据新增 4.1 新增查询,编写新增语句 4.2 表格配置新增一行&#xff0…

华为云专家出品《从零到一•Python图像处理入门》电子书

《华为云云享.书库》系列电子书来啦! 本系列电子书旨在帮助开发者成长,汇聚华为云内外部专家技术精华制作而成。 本书《从零到一•Python图像处理》是该系列电子书第3部。 我们在华为开发者即将到来之际,开放电子书免费下载。 点击下方链接…

JVM探究

JVM探究 请谈谈你对JVM的理解?java8虚拟机和之前的变化、更新?什么是OOM,栈溢出StackOverFlowError?怎么分析?JVM的常用调优参数有哪些?内存快照如何 抓取,怎么分析Dump文件?知道吗…

Unity VR开发教程 OpenXR+XR Interaction Toolkit(八)手指触控 Poke Interaction

文章目录 📕教程说明📕XR Poke Interactor📕与 UI 进行触控交互⭐添加 Tracked Device Graphic Raycaster 和 XR UI Input Module 让 UI 可被交互 📕与物体进行交互⭐XR Simple Interactable⭐XR Poke Filter 往期回顾&#xff1a…

【Linux进程】进程的基本概念 {PCB结构体,进程表,Linux中的task_struct,查看进程,获取进程PID,使用fork创建子进程}

一、进程的基本概念 1.1 什么是进程? 进程是计算机中正在运行的程序的实例。它是操作系统进行资源分配和调度的基本单位。每个进程都有自己的内存空间、代码、数据和执行状态。进程可以独立运行,相互之间不会干扰。操作系统可以同时运行多个进程&#…

vue表格实现一个简单的合并单元格功能

用的是vue2ant-design-vue 但是vue3或者element-ui也是同理 先上效果 需要后端的数据将相同id的放在一起 否则也会有问题 例如: this.list [{id: 1,name: 舟山接收站,...}{id: 2,name: 舟山接收站碳中和LNG,...},{id: 2,name: 舟山接收站碳中和LNG,...} ]// th…

Redis7【⑤ Redis 发布 订阅】

Redis发布和订阅 本章了解即可,命令可以不用敲。 Redis 发布和订阅(Publish/Subscribe,简称 Pub/Sub)是一种消息传递模式,用于在 Redis 中实现消息的发布和订阅。 在 Redis 中,发布者(Publi…

maven打包所有依赖,对外提供sdk.jar

maven打包所有依赖 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compile.source>1.8</maven.compile.source><maven.compile.target>1.8</maven.compile.target></properties><…

Swin Transformer训练报错问题

1. 训练遇到报错问题 &#xff08;1&#xff09;mportError: cannot import name _pil_interp from timm.data.transforms 原因&#xff1a; timm.data.transforms里面没有_pil_interp&#xff0c;只有str_to_pil_interp、_str_to_pil_interpolation、_pil_interpolation_to_s…

rancher 节点重启无感发布

这里设置 时间 为120s &#xff0c;保证 新节点起来后&#xff0c;和 老节点并行2分钟后再剔除&#xff0c;老节点

el-select修改样式

目录 准备 修改placeholder颜色 修改右侧箭头 修改圆角边框 准备 <el-select v-model"goodsId" clearable placeholder"请选择" :popper-append-to-body"false"><el-option v-for"item in kindList" :key"item.value…

浙江宇视科技 网络视频录像机 ISC LogReport.php 远程命令执行漏洞

免责声明 文章仅供参考&#xff0c;任何个人和组织使用网络应当遵守宪法法律&#xff0c;遵守公共秩序&#xff0c;尊重社会公德&#xff0c;不得危害网络安全&#xff0c;不得利用网络从事危害国家安全、荣誉和利益&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机…

Vue Router activated deactivated 路由守卫

6.12.activated deactivated activated和deactivated是路由组件所独有的两个钩子&#xff0c;用于捕获路由组件的激活状态具体使用 activated路由组件被激活时触发deactivated路由组件失活时触发 src/pages/News.vue <template><ul><li :style"{opacity}…

1.2-程序设计语言与流程图基础

一、学习目标 了解计算机程序与程序设计语言。认识算法和流程图。理解计算机程序、程序设计语言、算法与流程图之间的关系。 1、计算机程序 计算机程序是人们使用指定的程序设计语言&#xff0c;根据需要事先编写的一系列控制计算机工作的命令。2、程序设计语言 程序设计语…

如何避免死锁:方法一

需要先看前文&#xff1a;死锁的产生_御坂美琴1的博客-CSDN博客 对两个资源使用一把锁。即小朋友玩敲鼓的时候会同时拿走鼓和鼓槌。 如图&#xff1a; 可以看到“线程1执行了&#xff0c;但是线程2没有执行&#xff0c;还在被阻塞着。为什么线程1运行完毕&#xff0c;线程2还没…