RabbitMQ入门案例之Topic模式

news2025/1/6 18:38:40

前言:

本文章将介绍RabbitMQ中的Topic(主题)模式,其中还会涉及 ‘#’ 和 ‘*’ 两个通配符在RabbitMQ中的区别。

官网文档地址:https://rabbitmq.com/getstarted.html

什么是Topic模式

RabbitMQ的Topic模式是一种基于主题的消息传递模式。它允许发送者向一个特定的主题(topic)发布消息,同时,订阅者也可以针对自己感兴趣的主题进行订阅。
在这里插入图片描述

在Topic模式中,主题通过一个由单词和点号组成的字符串来描述。例如,“*.china.#”表示匹配所有以“china”为结尾的主题,比如“bj.china”或“shanghai.china.weather”等。( ‘ # ’ 和 ‘ * ’ 会再后面介绍)

当一个消息被发布到Topic交换机(Exchange)时,交换机会将消息转发给所有与该主题匹配的队列。消费者(即订阅者)可以对队列进行绑定,通过指定自己感兴趣的主题进行绑定。

通过使用Topic模式,我们可以实现高度灵活的信息交换模式,同时,确保只有感兴趣的消费者才会收到消息,提高了系统的效率和可靠性。

‘ # ’ 和 ‘ * ’二者的区别

在RabbitMQ的Topic模式中,符号“#”和“*”都用于匹配主题,但它们的意义是不同的。

符号“#”表示通配符可以匹配0个或者多个单词。例如,“china.#”可以匹配所有的以“china”为前缀的主题,例如“china.beijing”,“china.shanghai.weather”等等。

符号“ * ”表示通配符:可以匹配一个单词。例如,“china.*”可以匹配所有的以“china”为前缀并且后面只有一个单词的主题,例如“china.beijing”,“china.shanghai”,但是“china.shanghai.weather”不会被匹配。

总的来说,“#”更加灵活,可以匹配更多的情况,而“*”则更加具体,只能匹配一个单词。但是,使用通配符需要注意,因为它可能会匹配到不可预测的主题,可能会导致消息被传递到错误的队列或者丢失。因此,在设计主题时需要慎重考虑,并尽量减少通配符的使用。

Topic模式实操

老规矩,我们先到RabbitMQ的web管理界面去创建一个Topic的交换机
在这里插入图片描述
效果如下:
在这里插入图片描述
点击该topic_exchange,进入到下图界面,并绑定消息队列,如果队列不存在需要先创建在过来绑定
在这里插入图片描述
最终效果:
在这里插入图片描述
接下来就是代码部分了,我们需要创建一个maven项目,然后将下面的依赖导入:

	<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!--RabbitMQ依赖-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>

然后创建生产者,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @description: Producer 简单队列生产者
 */
public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = "超级无敌爱学习";
            String  exchangeName = "topic-exchange";
            String routingKey1 = "pz.class.student";
            String routingKey2 = "class.user.student";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

创建消费者,代码如下:

import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * @description: Consumer
 * @Date : 2021/3/2
 */
public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("43.139.42.244");
        connectionFactory.setPort(5678);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, false, false, null);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println(queueName + ":开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}

接下来执行生产者代码,在这段代码中,我们先对路由key1进行发送消息并通过web管理界面查看效果:

在这里插入图片描述

执行结果:
在这里插入图片描述
web管理界面查看结果:
在这里插入图片描述

通过上面的图,我们可以发现,我们消息通过topic_exchange这个交换机通过指定路由key发送到了绑定的消息队列中,由于routingkey使用的是通配符发方式,其中“queue2 -> #.class.* ” , “ queue3 -> #.student.#”,又由于通配符,# 号是指0个及以上,* 号是仅匹配一个,那么结果就是符合预期的,因为routingkey1= pz.class.student,class前有一个,后面有一个,会映射到queue2,student前面有多个,后面没有可以映射到queue3,结果就和图示一样啦~~

接下来就执行routekey2的路由key来看看会发生什么效果:
在这里插入图片描述
执行结果
在这里插入图片描述
web管理界面查看结果:
在这里插入图片描述
可以看到这次queue1和queue3,接收到了消息,可以自己尝试分析一下,这里不做过多赘述。

以上便是本章全部内容,感谢阅读ovo
如有错误,感谢指正

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/670280.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot 如何使用 Spring Integration 处理事件

SpringBoot 如何使用 Spring Integration 处理事件 Spring Integration 是 Spring Framework 的一个扩展&#xff0c;它提供了一种基于消息传递的集成模式。使用 Spring Integration&#xff0c;我们可以将不同的应用程序、系统和服务连接起来&#xff0c;从而实现数据的传递、…

VMware中Linux虚拟机配置静态ip

一、输入ip addr查看ip地址 二、输入cd /etc/sysconfig/network-scripts进入centos网络配置文件夹 三、接着输入ls查看目录 四、 输入vi ifcfg-ens33进入网卡配置 五、 进入以后是这个界面&#xff0c;红色方框里的内容是需要手动修改的&#xff0c;下面图片里已经修改过了。 …

【C】分支和循环语句的简单介绍

语句 分支语句if语句语法结构代码演示 switch语句语法结构代码演示 循环语句while循环语法结构代码分析 for循环语法结构代码演示 do...while循环语法结构代码分析 什么是语句呢&#xff1f; 在C语言中由分号&#xff08;;&#xff09;隔开的就是一条语句。 分支语句 if语句 …

【算法设计与分析】期末考试知识总结(知识超浓缩版)

目录 简要介绍 复杂度 迭代 插入排序 二分查找 快排划分 选择排序 计数排序 基数排序 桶排序 递归 递归式的计算-四种方法 欧几里得算法 汉诺塔问题 快速排序 归并排序 堆排序 分治 二维极大点问题 一维最邻近点对 二维最邻近点对 逆序对的数目 凸包 最大字段…

RecyclerView 低耦合单选、多选模块实现

作者&#xff1a;丨小夕 前言 需求很简单也很常见&#xff0c;比如有一个数据列表RecyclerView&#xff0c;需要用户去点击选择一个或多个数据。 实现单选的时候往往简单下标记录了事&#xff0c;实现多选的时候就稍微复杂去处理集合和选中。随着项目选中需求增多&#xff0c…

k8s的部署

二进制搭建 Kubernetes v1.20 k8s集群master01&#xff1a;192.168.92.30 kube-apiserver kube-controller-manager kube-scheduler etcd k8s集群master02&#xff1a;192.168.92.21 k8s集群node01&#xff1a;192.168.92.40 kubelet kube-proxy docker k8s集群node02…

阿里云热修复打补丁包注意事件

1、每次发布app到应用市场前&#xff0c;注意保存没有加固前的apk文件和mapping.txt 2、修复好bug&#xff0c;打包app前&#xff0c;要做的事情 &#xff08;1)先把有问题的apk的mapping.txt文件复制到/app路径下 (2)修改混淆配置&#xff1a;将-printmapping mapping.txt使…

Android蓝牙协议知识汇总

蓝牙协议下载 蓝牙技术联盟网址&#xff1a;https://www.bluetooth.com/ 在这个网址搜索&#xff0c;比如&#xff1a; 在搜索结果中找到蓝牙协议规范&#xff1a; 点击上面网址&#xff1a; 蓝牙手册里包含了部分核心协议&#xff0c;比如L2CAP、SDP、ATT、GATT&#x…

Python 100%解析svg-captcha验证码

前言 前段时间接到一个需求&#xff0c;登陆某一个网站&#xff0c;然后录入数据&#xff1b;本来以为是一个很简单的需求&#xff0c;结果遇到几个难点&#xff1a; 登陆的时候需要有验证码验证码是一个请求路径&#xff0c;每请求一次验证码都不一样 本来一开始以为是常用的…

探究 CoreData 使用索引(Index)机制加速查表究竟如何实现?

问题现象 在  App 的开发中,CoreData 到底能不能用索引机制(Index)来加速查表?如果可以,又该如何创建和使用索引呢? 这是一个连  官方文档都模棱两可,Stackoverflow 里诸多大神都闪烁其词的话题。 在本篇博文中,您将学到如下内容: 什么是 CoreData 索引(Index…

SpringBoot + Ant Design Vue实现数据导出功能

SpringBoot Ant Design Vue实现数据导出功能 一、需求二、前端代码实现2.1 显示实现2.2 代码逻辑 三、后端代码实现3.1 实体类3.2 接收参数和打印模板3.3 正式的逻辑3.4 Contorller 一、需求 以xlsx格式导出所选表格中的内容要求进行分级设置表头颜色。 二、前端代码实现 2…

20230524 taro+vue3+webpack5+pdfjs时打包pdfjs进不来的问题

关闭taro的terser就可以了 terser:{enable:false }

UE中创建异步任务编辑器工具(Editor Utility Tasks)

在UE中我们往往需要执行一些编辑器下的异步任务&#xff0c;例如批量生成AO贴图、批量合并静态模型等&#xff0c;又不想阻碍主线程&#xff0c;因此可以使用Editor Utility Tasks直接创建UE编辑器下的异步任务。 如果你不太了解UE编辑器工具&#xff0c;可以参考这篇文章&…

Spring Boot 中自定义数据校验注解

Spring Boot 中自定义数据校验注解 在 Spring Boot 中&#xff0c;我们可以使用 JSR-303 数据校验规范来校验表单数据的合法性。JSR-303 提供了一些常用的数据校验注解&#xff0c;例如 NotNull、NotBlank、Size 等。但是&#xff0c;在实际开发中&#xff0c;我们可能需要自定…

2023年6月24日(星期六):骑行明郎

2023年6月24日(星期六)&#xff1a;骑行明郎&#xff0c;早8:30到9:00&#xff0c; 大观公园门囗集合&#xff0c;9:30点准时出发 【因迟到者&#xff0c;骑行速度快者&#xff0c;可自行追赶偶遇。】 偶遇地点: 大观公园门囗集合&#xff0c;家住南&#xff0c;东&#xff0c…

(二叉树) 100. 相同的树 ——【Leetcode每日一题】

❓100. 相同的树 难度&#xff1a;简单 给你两棵二叉树的根节点 p 和 q&#xff0c;编写一个函数来检验这两棵树是否相同。 如果两个树在结构上相同&#xff0c;并且节点具有相同的值&#xff0c;则认为它们是相同的。 示例 1&#xff1a; 输入&#xff1a;p [1,2,3], q …

使用代理ip做网页抓取需要注意什么

现在&#xff0c;很多公司为达成目标&#xff0c;都需要抓取大量数据。企业需要根据数据来作出重大决定&#xff0c;因此掌握准确信息至关重要。互联网上有许多宝贵的公共数据。问题是如何轻松采集这些数据&#xff0c;而无需让团队整天手动复制粘贴所需信息?网页抓取的定义越…

Qt学习11:Dialog对话框操作总结

文章目录 QDialogQDialogButtonBoxQMessageBoxQFileDialogQFontDialogQColorDialogQInputDialogQProgressDialog 文章首发于我的个人博客&#xff1a;欢迎大佬们来逛逛 QDialog Qt中使用QDialog来实现对话框&#xff0c;QDialog继承自QWidget&#xff0c;对话框分为**三种**&…

尿的唰唰和笑的哈哈

很多人说看不懂&#xff0c;不知道哪个是真哪个是假。我说都是真的。不同心不同理。全球并不同炎凉。窦唯有句歌词&#xff1a;天堂地狱皆在人间。何勇有句歌词&#xff1a;有人减肥&#xff0c;有人饿死没粮。&#xff08;1&#xff09;产业我过去说过顶天立地。立地&#xff…

专利背后的故事 | 一种异常信息检测方法和装置

Part01 专利发明的初衷 用户和实体行为分析&#xff08;UEBA&#xff09;在2018年入选Gartner为安全团队建议的十大新项目。UEBA近几年一直受到国内安全厂商的热捧。但是对于UEBA的理解&#xff0c;以及具体落实的产品方案&#xff0c;各厂商虽然明显不同&#xff0c;但在对账…