RabbitMQ 订阅模型-路由模式

news2024/12/30 3:04:54

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。

文章目录

    • 一、RabbitMQ 订阅模型-路由(Direct)模式
        • 1、RabbitMQ 路由(direct)模式
        • 2、路由(direct)模式组成
    • 二、RabbitMQ 订阅模型-路由(Direct)模式实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
    • 三、订阅模型 三种模式区别
        • 1、RabbitMQ 消息订阅(Fanout)模式
        • 2、RabbitMQ 路由(direct)模式
        • 3、RabbitMQ 主题(topic)模式


一、RabbitMQ 订阅模型-路由(Direct)模式

1、RabbitMQ 路由(direct)模式

订阅模型-路由模式,此时生产者发送消息时需要指定 RoutingKey,即路由 Key,Exchange 接收到消息时转发到与 RoutingKey 相匹配的队列中。

image-20221201012250811

在 Direct 模型下:

  • 队列与交换机绑定,不能任意绑定,而要指定一个 RoutingKey (路由 key)
  • 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
  • Exchange 不在把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的 RoutingKey 与消息的 Routing Key完全一致,才会收到消息。

2、路由(direct)模式组成

RabbitMQ 订阅模型-路由模式(Fanout)模式主要有以下六个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。

  • 交换机(Exchange) :在 RabbitMQ 的消息传递模型中,对于 Exchange 的核心思想就是:生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机。交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。Exchanges 的类型:直接(direct)、主题(topic)、标题(headers)、扇出(fanout)

  • Routing Key 路由关键字,exchange 根据这个关键字进行消息投递,Exchange 接收到的消息会带有 RoutingKey 这个字段,Exchange 就是根据这个 RoutingKey 和当前 Exchange 所有绑定的 BindingKey 做匹配,如果满足要求,就往 BindingKey 所绑定的 Queue 发送消息,这样我们就解决了我们向 RabbitMQ 发送一次消息,可以分发到不同的 Queue 的过程

  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成

  • 消费者2(consumer):领取任务并完成…

  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。


二、RabbitMQ 订阅模型-路由(Direct)模式实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>

2、封装工具类 ConnectionUtil

package com.lizhengi.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-26 11:39 上午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        //设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        //设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        //设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        //设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            //返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、生产者实现

package com.lizhengi.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由模式(Direct)模式 生产者
 * @date 2022-12-26 3:43 下午
 **/
public class Producer {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        /*  为通道声明交换机
         *  参数1:交换机名称;参数2:交换机类型
         */
        channel.exchangeDeclare("logs_direct", "direct");
        //发送消息
        String routingKey = "error";
//        String routingKey = "info";
        channel.basicPublish("logs_direct", routingKey, null, ("The message's routingKey is " + routingKey + " !").getBytes());

        //释放资源
        ConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}

4、消费者-1 实现

package com.lizhengi.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由(Direct)模式 消费者
 * @date 2022-12-26 3:46 下午
 **/

public class Customer1 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs_direct", "direct");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列和routingKey
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
        channel.queueBind(queueName, "logs_direct", "error");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

5、消费者-2 实现

package com.lizhengi.direct;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 订阅模型-路由(Direct)模式 消费者
 * @date 2022-12-26 3:47 下午
 **/
public class Customer2 {
    public static void main(String[] args) throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        assert connection != null;
        Channel channel = connection.createChannel();
        //  绑定交换机
        channel.exchangeDeclare("logs_direct", "direct");
        //  为单个 Customer 创建临时队列
        String queueName = channel.queueDeclare().getQueue();
        //  绑定交换机和队列和routingKey
        //  参数1:队列名字;参数2:交换机名称;参数3:routingKey 路由key
        channel.queueBind(queueName, "logs_direct", "info");
        channel.queueBind(queueName, "logs_direct", "error");
        channel.queueBind(queueName, "logs_direct", "warning");
        //  消费消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
            }
        });
    }
}

三、订阅模型 三种模式区别

1、RabbitMQ 消息订阅(Fanout)模式

RabbitMQ 消息订阅(Fanout)模式把交换机(Exchange)收到的消息发送给所有绑定了该交换机的队列,忽略路由(RoutingKey)。

这种模式下,消息会被所有消费者消费。也就是说,只要是"绑定"到某个交换机的队列,都会收到生产者发送到该交换机的消息。

2、RabbitMQ 路由(direct)模式

RabbitMQ 路由(direct)模式生产者发送信息时,需要指定一个路由(RoutingKey),交换机(Exchange)会根据路由将消息发送到绑定了此路由的队列中。

3、RabbitMQ 主题(topic)模式

在实际的运用中,广播模式(fanout)和路由模式(direct)虽然功能能支持一定场景,但是任然有一定的局限性,比如不能根据多重条件来进行路由选择。

发送给主题模式交换机的信息不能是任意设置的选择键,必须是用小数点隔开的一系列的标识符,标识符可以随意填充,但是一般是与某些特征相关联。

选择键不能超过 255 个字符。合法的键比如 “vip.user.order”,“common.user.pay”。 绑定键必须以相同的格式,特殊情况:“*” (星号)可以代替任意一个标识符;“#”(井号)可以代替0个或多个标识符。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/122240.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线上贷款申请违约风险预测大数据模型

通过模型可以得出模型分类准确率&#xff0c;通过客户信用违约风险预测模型&#xff0c;可以实现多渠道预警&#xff0c;形成多角度观察、多方面分析、多渠道传递的工作局面。

Python每日一练 10——for循环

Python每日一练 10——for循环 文章目录Python每日一练 10——for循环一、for循环介绍二、实例一&#xff1a;等差数列前n项和三、实例二&#xff1a;计算阶乘四、实例三&#xff1a;拉马努金法计算圆周率五、实例四&#xff1a;百钱买百鸡一、for循环介绍 for循环一般用于循环…

前端_Vue_9.模板引用、组件基础

文章目录一、模板引用1.1. 访问模板引用1.2. v-for 中的模板引用1.3. 函数模板引用1.4. 组件上的 ref1.5. 小结二、组件基础 ⭐2.1. 定义一个组件2.2. 使用组件2.3. 传递props2.4. 监听事件2.5. 通过插槽&#xff08;slot&#xff09;来分配内容2.6. 动态组件2.7. DOM模板解析注…

go 库 Cobra 现代化的命令行框架

go 库 Cobra 现代化的命令行框架 文章目录go 库 Cobra 现代化的命令行框架1. 简介2. 主要功能3. 应用举例4. Cobra 安装5. 使用 Cobra 库创建命令5.1 创建 rootCmd5.2 创建 main.go5.3 添加命令5.4 编译并运行6. 特性6.1 使用标志6.2 非选项参数验证6.3 PreRun and PostRun Hoo…

手绘图说电子元器件-电声转换器件

电声转换器件包括能够将电信号转换为声音的扬声器、耳机、讯响器和蜂鸣器,能够将声音转换为电信号的传声器,能够进行电磁转换的磁头和具有压电效应的晶体等。 扬声器 扬声器俗称喇叭,是一种常用的电声转换器件,其基本作用是将电信号转换为声音,在收音机、录音机、电视机…

Linux | 套接字(socket)编程 | TCP协议讲解 | 通信模型搭建

文章目录TCP模型的特性TCP接口介绍TCP服务器套接字设置TCP客户端套接字设置TCP模型的特性 TCP是属于传输层协议的一种&#xff0c;上篇博客介绍了另一种传输层协议——UDP&#xff0c;关于它们之间的区别&#xff0c;这里再提一下 TCPUDP传输层协议传输层协议有连接无连接可靠…

Word控件Spire.Doc 【评论】教程(3):在C#、VB.NET中从Word文档中提取注释并保存在TXT文件中

Spire.Doc for .NET是一款专门对 Word 文档进行操作的 .NET 类库。在于帮助开发人员无需安装 Microsoft Word情况下&#xff0c;轻松快捷高效地创建、编辑、转换和打印 Microsoft Word 文档。拥有近10年专业开发经验Spire系列办公文档开发工具&#xff0c;专注于创建、编辑、转…

[Leetcode] 合并两个有序数组、链表

1.合并两个有序数组 原地合并数组&#xff0c;即不使用额外的空间 --> 使用三个指针&#xff0c;从尾部往前处理 题目链接&#xff1a;https://leetcode.cn/problems/merge-sorted-array/ nums1 总长度 mn&#xff0c;自身长度m&#xff1b;nums2 自身长度n&#xff0c; 使…

SSRF渗透与攻防(一)

目录 前言 SSRF是什么 危害&#xff08;利用方式): SSRF漏洞原理&#xff1a; CURL协议&#xff1a; SSRF常见场景 社会化分享功能&#xff1a; 如何发现SSRF漏洞 工具利用&#xff1a; 如何防御SSRF漏洞 前言 SSRF(Server-Side Request Forgery:服务器端请求伪造) 是…

rocketmq 实战问题汇总

rocketmq 实战过程会遇到这样或者那样的问题&#xff0c;今天我们专门抽出一篇文章来分析一下汇总一下&#xff0c;避免以后踩同样的坑&#xff1a; 1、找不到JDK的问题&#xff1a; 综合分析&#xff0c;是因为JDK安装的目录有空格导致的&#xff1a;Program Files 两个单词之…

电子招标采购系统源码—企业战略布局下的采购寻源

​ 智慧寻源 多策略、多场景寻源&#xff0c;多种看板让寻源过程全程可监控&#xff0c;根据不同采购场景&#xff0c;采取不同寻源策略&#xff0c; 实现采购寻源线上化管控&#xff1b;同时支持公域和私域寻源。 询价比价 全程线上询比价&#xff0c;信息公开透明&#xff0…

CANoe—基于DoIP通过CAPL实现与ECU通信测试

如下连接是在CANoe中基于DoIP通过加载诊断数据库实现CANoe与待测ECU诊断通信: CANoe链接 本文继续此话题,通过一个简单的CAPL Demo,实现CANoe与ECU进行DoIP通信。 首先在CANoe新建Ethernet工程: 在CANoe “Simulation Setup”中新建CAPL Test Module: 在此例中采用CANo…

DHCP原理和实验

目录 DHCP基本认识和原理 场景一、同网段DHCP 场景二、不同段DHCP&#xff08;中继DHCP&#xff09; DHCP基本认识和原理 DHCP&#xff08;Dynamic Host Configuration Protocol动态主机协议&#xff09;。 作用&#xff1a;为局域网络中主机动态分发地址&#xff0c;以及…

C#里使用ExcelDataReader读取EXCEL文件的简单方法

C#里使用ExcelDataReader读取EXCEL文件的简单方法 读取EXCEL文件是比较常见的需求,所以在C#里也会经常遇到。 比如客户需要保存的条码数据,他们可以使用EXCEL来扫码进去,并且进行修改和核验, 然后软件就需要读取这些EXCEL文件,并且从这里得到所需要的条码。 要从EXCEL里…

Android 查看隐私权限方法调用者集合

背景 辛辛苦苦迭代完当前版本&#xff0c;准备推送 App 到应用市场上架&#xff0c;却收到拒审通知&#xff1a;App隐私合规上架护航版检测报告&#xff0c; 报告内容&#xff1a; 场景2:APP以隐私政策弹窗的形式向用户明示收集使用规则&#xff0c;未经用户同意&#xff0c;…

页面崩溃了!记录一次测试中出现的前端内存溢出现象

前情回顾 前几天在一次web应用测试过程中&#xff0c;前端发起了向后端接口的查询请求&#xff0c;由于后端响应较慢&#xff0c;前端一直处于等待响应返回状态。在几分钟后&#xff0c;突然页面出现让人惊悚的“噢噢&#xff0c;页面崩溃了”几个大字。 看到这几个字的一瞬间…

基于GitLab构建企业级CICD-Gitlab-Runner

背景 在过往企业开发中&#xff0c;大部分企业从开发到测试&#xff0c;到部署目前还是手工进行在一些某些中大型企业中&#xff0c;目前构建及部署还是直接使用二进制包部署&#xff0c;或直接单机运行在某些场合下&#xff0c;仓库中代码的编译需要硬件支持&#xff0c;致使…

SSM框架学习记录-Maven_day01

1.分模块开发 将原始模块按照功能拆分成若干个子模块&#xff0c;方便模块间的相互调用&#xff0c;接口共享&#xff1a;比如有订单和商品两个模块&#xff0c;它们都需要使用到商品的模型类&#xff0c;如果在这两个模块中都写模型类&#xff0c;就会出现重复代码&#xff0c…

Doris部分列更新在广告行业应用

背景&#xff1a;业务需要在不同的时间点对同一个session_id上的广告行为&#xff08;展示、点击、转换等&#xff09;数据的更新。 基于HBase归因 更新原理&#xff1a;以session_id为Key在HBase中写入数据&#xff0c;数据更新是先点查到历史数据&#xff0c;补齐当前数据后…

风已起,待云涌---多维度理解云安全

Fix the Unknown,Before You Know it. 新时代大门开启的时候&#xff0c;蜂拥而上的大都是勇士&#xff0c;风已起&#xff0c;待云涌&#xff01; 1.云安全&#xff1a; 未来安全的能力将成为计算、存储、网络之外的第四大基础设施&#xff0c;并全部融入到云基础设施中&…