RabbitMQ:订阅模型-消息订阅模式

news2025/1/18 19:10:30

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

RabbitMQ 单生产单消费模型主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

文章目录

    • 一、RabbitMQ 订阅模型-消息订阅(Fanout)模式
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、消息订阅(Fanout)模式组成
        • 3、消息订阅(Fanout)模式流程
    • 二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
        • 6、消费者-3 实现
    • 三、订阅模型 三种模式区别
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、RabbitMQ 路由(direct)模式
        • 3、RabbitMQ 主题(topic)模式


一、RabbitMQ 订阅模型-消息订阅(Fanout)模式

1、RabbitMQ 消息订阅(Fanout)模式

订阅模型-消息订阅模式,也可以称为广播模式,生产者将消息发送到 Exchange,Exchange 再转发到与之绑定的 Queue中,每个消费者再到自己的 Queue 中取消息。

image-20221201012238266

2、消息订阅(Fanout)模式组成

RabbitMQ 订阅模型-消息订阅(Fanout)模式主要有以下五个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

3、消息订阅(Fanout)模式流程

消息订阅(Fanout)模式流程:

  1. 消息订阅(Fanout)模式 可以有多个消费者
  2. 每个消费者有自己的 queue(队列)
  3. 每个队列都要绑定到 Exchange(交换机)
  4. 生产者发送的消息,只能发送到交换机,交换机来决定要发给那个队列,生产者无法决定
  5. 交换机把消息发送给绑定过的所有队列
  6. 队列的消费者都能拿到消息,实现一条消息被多个消费者消费

二、RabbitMQ 订阅模型-消息订阅(Fanout)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>

2、封装工具类 ConnectionUtil

package com.lizhengi.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-26 11:39 上午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        //设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        //设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        //设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        //设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            //返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、生产者实现

package com.lizhengi.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 生产者
 * @date 2022-12-26 11:44 上午
 **/
public class Producer {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        /*  为通道声明交换机
         *  参数1:交换机名称;参数2:交换机类型
         */
        channel.exchangeDeclare("logs", "fanout");
        //发送消息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("logs", "", null, ("fanout type message" + i).getBytes());
        }
        //释放资源
        ConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}

4、消费者-1 实现

package com.lizhengi.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/
public class Customer1 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName, "logs", "");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

5、消费者-2 实现

package com.lizhengi.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/
public class Customer2 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName, "logs", "");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者2:" + new String(body));
            }
        });
    }
}

6、消费者-3 实现

package com.lizhengi.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-消息订阅(Fanout)模式 消费者
 * @date 2022-12-26 11:45 上午
 **/
public class Customer3 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs", "fanout");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey么啥意义
        channel.queueBind(queueName, "logs", "");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者3:" + new String(body));
            }
        });
    }
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/116835.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习10大经典算法详解

“数据算法模型”。 面对具体的问题&#xff0c;选择切合问题的模型进行求解十分重要。有经验的数据科学家根据日常算法的积累&#xff0c;往往能在最短时间内选择更适合该问题的算法&#xff0c;因此构建的模型往往更准确高效。本文归纳了机器学习的10大算法&#xff0c;并分别…

Python基础语法(一)

Python基础语法 文章目录Python基础语法基础语法变量的语法(1) 定义变量(2) 使用变量变量的类型(1) 整数(2) 浮点数(小数)(3) 字符串(4) 布尔(5) 其他动态类型特性输入输出注释通过控制台输出通过控制台输入运算符算术运算符关于除法// 取整除法关系运算符逻辑运算符关于短路求…

美格智能Cat.1无线POS终端解决方案,引领消费支付新场景

近年来&#xff0c;随着我国移动互联网的蓬勃发展和智能手机的快速渗透&#xff0c;移动支付在我国全面普及。尤其是后疫情时代下&#xff0c;无接触观念的普及&#xff0c;使我国消费市场形成了以移动支付为主的消费习惯&#xff0c;并催生了万千移动支付场景终端的数字化、智…

磁盘被写保护怎么办?5个方案解除它

硬盘、移动硬盘、U盘、SD卡和TF卡&#xff08;也称为手机存储卡&#xff09;具有写保护功能。当它们出现写保护的状态&#xff0c;我们就没有办法在里面写入数据。具体而言&#xff0c;就是无法保存和删除文件。磁盘被写保护怎么办&#xff1f;你需要下面5个方案帮助你&#xf…

20221227英语学习

今日短文 How to Become an Expert 想成为行业的专家&#xff1f;不是只花时间就够了 The drive to become expert – to become as good as we can be, at whatever we’ve chosen to do – is something we all share.It is not about external markers of success.It’s a…

01【WEB开发、Servlet】

文章目录01【WEB开发、Servlet】一、WEB开发简介1.1 什么是WEB开发1.2 软件的架构1.2.1 BS和CS概述1.2.2 WEB资源的类别1&#xff09;静态网站的特点&#xff1a;2&#xff09;动态网站的特点&#xff1a;1.3 Web服务器1.3.1 什么是服务器&#xff08;硬件&#xff09;1.3.2 什…

再也不愁渲染素材了?AI 生成3D纹理 #Polycam3D 推出新功能

最近有不少群友运用 AIGC 工具来提升工作效率&#xff0c;我听说连 3D 数字资产的渲染贴图素材都能生成了。Mixlab小杜3D 内容制作工具也是我非常感兴趣的领域&#xff0c;Polycam3D 本是一款扫描建模工具&#xff0c;近期也推出了AI生成3D纹理的功能&#xff0c;推荐大家去尝试…

启封化工行业ERP方案 ---危险化学品的备案管理

目录 危险化学品的备案管理制度 易制毒制爆危险化学品采购流程 Sage X3 ERP 危化品备案管理方案 危险化学品的备案管理制度 不少化工企业在日常的生产经营过程中&#xff0c;都有可能会涉及到易制毒、易制爆相关的危险化学品的购买和使用&#xff0c;由于易制爆、易制毒危险…

Vue组件、组件通信、路由、axios、$event、$refs、跨域代理、element-ui

文章目录{ { } }插值表达式$eventv-for删除、新增axios方法优化启动 Vue项目Vue项目的运行流程组件的三个结构组件的使用组件之间的通信父子 组件通信兄弟组件通信操作DOM插槽 slot移除node_modules路由安装、入门嵌套路由获取路由参数跨域代理element-ui表单验证Message 消息提…

基于Java+SQL Server开发(PC)学生管理系统【100010054】

题目学生管理系统 一、摘要 在当今互联网行业&#xff0c;Java 的使用及热度在各大排行榜中始终位于前列&#xff0c;通过本次课程设计&#xff0c;巩固所学 Java 知识&#xff0c;了解 Java 项目的开发流程。本程序是使用 Java 开发的一款学生管理系统&#xff0c;设计中使用…

微信开放小程序SDK,几款SDK产品对比分析

前言 这几天看到微信团队推出了一个名为 Donut 的小程序原生语法开发移动应用框架&#xff0c;通俗的讲就是将微信小程序的能力开放给其他的企业&#xff0c;第三方的 App 也能像微信一样运行小程序了。 其实不止微信&#xff0c;面对广阔的B端市场&#xff0c;阿里也早已开放…

kafka学习笔记

1. 官网 ​​​​​​​​​​​​​​​​​​​​​​​​​Apache Kafka 2. akf X轴拆分: 水平复制&#xff0c;就是讲单体系统多运行几个实例&#xff0c;做集群加负载均衡的模式,主主、主备、主从。解决单点&#xff0c;高可用问题 Y轴拆分: 基于不同的业务拆分 Z轴拆…

年底了,千万不要跳槽。

最近不少人在私信问我&#xff1a;做了几年 Java 工程师&#xff0c;现在很迷茫&#xff0c;想跳槽但是感觉底气不足&#xff0c;不知道如何是好。 作为一个资历不浅的 Java 开发&#xff0c;这几年我面试过不少人。发现大多数面试者&#xff0c;虽然看起来工作努力&#xff0…

FPGA 点亮LED灯

设计流程 首先对项目要有一个全局的考虑&#xff0c;分析项目需要几个模块构成&#xff0c;确定各个子模块的关系和信号之间 的相互关系&#xff0c;然后确定模块的端口信号有哪些&#xff1b;根据每个模块的功能并结合芯片、接口的时序手册画 出该模块能正常工作的时序波形图…

CSS3【垂直对齐方式、光标类型、 边框圆角 、overflow溢出部分显示效果 、元素本身隐藏】

文章目录二、装饰2.1 认识基线&#xff08;了解&#xff09;2.2 文字对齐问题2.3 垂直对齐方式2.4 小结2.5&#xff08;拓展&#xff09;项目中 vertical-align 可以解决的问题2.6 光标类型2.7 边框圆角2.8 边框圆角的常见应用2.9 小结2.10 溢出部分显示效果2.11 小结2.12 元素…

智慧医院信息化建设解决方案

【版权声明】本资料来源网络&#xff0c;仅用于行业知识分享&#xff0c;供个人学习参考&#xff0c;请勿商用。 【侵删致歉】如有侵权请联系小编&#xff0c;将在收到信息后第一时间进行删除&#xff01; 完整资料领取见文末&#xff0c;部分资料内容&#xff1a; 远程会诊 云…

Java的设计模式

本文档是学习后的个人总结&#xff0c;用于备查&#xff0c;如果有冒犯&#xff0c;请联系我删除。 参考文档&#xff1a; 1、简单工厂模式&#xff0c;工厂方法模式&#xff0c;抽象工厂模式 2、Java中常用的设计模式 上面这个链接写的特别清楚&#xff0c;也有代码示例&#…

记一次云服务器EIP出现异常对外攻击的问题

大家好&#xff0c;我是早九晚十二&#xff0c;目前是做运维相关的工作。写博客是为了积累&#xff0c;希望大家一起进步&#xff01; 我的主页&#xff1a;早九晚十二 首先祝大家圣诞快乐&#xff0c;Merry Christma&#xff01; 中毒了 今天云主机运维人员告诉我说&#x…

【Flask框架】——26 ORM关联关系

1、表的外键关联 使用SQLAlchemy创建外键非常简单。在从表中增加一个字段&#xff0c;指定这个字段外键的是哪个表的哪个字段就可以了。从表中外键的字段&#xff0c;必须和主表的主键字段类型保持一致。 这种关联只关注数据表之间的外键关联&#xff0c;不考虑Python对象之间…

ORB-SLAM3代码和算法学习—双目和单目初始化

0总述 ORB-SLAM3算法中视觉的初始化依旧放在tracking线程中&#xff0c;因此在tracking中没有为imu模式设置单独的初始化函数&#xff0c;而IMU的初始化是在localMapping中实现的。 很有用的参考链接&#xff1a;https://cloud.tencent.com/developer/article/1761043 1双目…