RabbitMQ系列(14)--Topics交换机的简介与实现

news2025/1/11 9:57:56

1、Topics交换机的介绍

Topics交换机能让消息只发送往绑定了指定routingkey的队列中去,不同于Direct交换机的是,Topics能把一个消息往多个不同的队列发送;Topics交换机的routingkey不能随意写,必须是一个单词列表,并以点号分隔开,例如“one.two.three”,除此外还有两个替换符,*(星号)能代替一个单词,#(井号)可以代替零个或多个单词,例如“*.one.*”是中间是one的3个单词,“*.*.one”是最后一个是one的3个单词,“one.#”是第一个单词是one的多个单词,若队列绑定键是#,这个队列将接收所有数据,这时候类似fanout交换机,若队列绑定键中没有#和*出现,这时候就类似direct交换机

 2、Topics交换机的实现 

(1)新建一个名为topics的包,用于装发布确认的代码

效果图:

(2)新建一个名为Receive01的类用于编写消费者的代码

代码如下:

 注:RabbitMqUtils工具类的实现在我的另一篇文章里,有需要的同学可以查看参考

RabbitMQ系列(6)--RabbitMQ模式之工作队列(Work queues)的简介及实现_Ken_1115的博客-CSDN博客

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

/**
 * 消息接收
 */
public class Receive01 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明队列
        String queueName = "Q1";
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(queueName,false,false,false,null);
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"*.one.*");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}

(3)复制Receive01类并粘贴重命名为Receive02

代码如下:

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

/**
 * 消息接收
 */
public class Receive02 {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    //接收消息
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");

        //声明队列
        String queueName = "Q1";
        /**
         * 创建队列
         * 第一个参数:队列名称
         * 第二个参数:服务器重启后队列是否还存在,即队列是否持久化,true为是,false为否,默认false,即消息存储在内存中而不是硬盘中
         * 第三个参数:该队列是否只供一个消费者进行消费,是否进行消息共享,true为只允许一个消费者进行消费,false为允许多个消费者对队列进行消费,默认false
         * 第四个参数:是否自动删除,最后一个消费者断开连接后该队列是否自动删除,true自动删除,false不自动删除
         * 第五个参数:其他参数
         */
        channel.queueDeclare(queueName,false,false,false,null);
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.two");
        //队列与交换机通过routingkey进行捆绑
        channel.queueBind(queueName,EXCHANGE_NAME,"three.#");

        /**
         * 声明消费者接收消息后的回调方法(由于回调方法DeliverCallback是函数式接口,所以需要给DeliverCallback赋值一个函数,为了方便我们这里使用Lambda表达式进行赋值)
         * 为什么要这样写呢,是因为basicConsume方法里的参数deliverCallback的类型DeliverCallback用 @FunctionalInterface注解规定DeliverCallback是一个函数式接口,所以要往deliverCallback参数传的值要是一个函数
         *
         * 以下是DeliverCallback接口的源代码
         *  @FunctionalInterface
         *  public interface DeliverCallback {
         *      void handle (String consumerTag, Delivery message) throws IOException;
         *  }
         */
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody(),"UTF-8"));
            System.out.println("接收队列:" + queueName + " 绑定键:" + message.getEnvelope().getRoutingKey());
        };

        /**
         * 用信道对消息进行接收
         * 第一个参数:消费的是哪一个队列的消息
         * 第二个参数:消费成功后是否要自动应答,true代表自动应当,false代表手动应答
         * 第三个参数:消费者接收消息后的回调方法
         * 第四个参数:消费者取消接收消息后的回调方法(正常接收不调用)
         */
        channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
    }

}

(4)新建一个名为Emit的类用于编写生产者的代码

代码如下:

package com.ken.topics;

import com.ken.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;


/**
 * 发消息
 */
public class Emit {

    //声明交换机的名称
    public static  final String EXCHANGE_NAME = "topic_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        Map<String,String> bindingKeyMap = new HashMap<>();
        bindingKeyMap.put("four.one.two","被队列Q1Q2接收");
        bindingKeyMap.put("three.one.five","被队列Q1Q2接收");
        bindingKeyMap.put("four.one.six","被队列Q1接收");
        bindingKeyMap.put("three.seven.six","被队列Q2接收");
        bindingKeyMap.put("three.eight.two","虽然满足两个绑定,但只被队列Q2接收一次");
        bindingKeyMap.put("three.seven.six","不匹配任何绑定,不会被任何队列接收到,会被丢弃");
        bindingKeyMap.put("four.one.nine.two","四个单词,不匹配任何绑定,会被丢弃");
        bindingKeyMap.put("three.one.nine.two","四个单词,但匹配Q2");

        for (Map.Entry<String, String> bindingKeyEntry : bindingKeyMap.entrySet()) {
            String routingKey = bindingKeyEntry.getKey();
            String message = bindingKeyEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:" + message);
        }

    }

}

 (5)分别先运行Receive01、Receive02、Emit

(6)查看Receive01和Receive02接收消息的情况

从上述结果可看出topic交换机实现成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/726524.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

leetcode极速复习版-第三章哈希表

目录 哈希表 哈希表理论基础 242.有效的字母异位词 349. 两个数组的交集 202. 快乐数 1. 两数之和 454.四数相加II 383. 赎金信 15. 三数之和 18. 四数之和 哈希表 哈希表理论基础 那么哈希表能解决什么问题呢&#xff0c;一般哈希表都是用来快速判断一个元素是否出…

机器人制作开源方案 | 卧式车床模型概述

1. 功能说明 本文示例将通过程序控制模拟车床的运动效果--模拟车床进行加工时各个结构的运动方式。车床的主要运动包括夹持工件主轴的旋转运动、刀具进给运动、刀具的换刀旋转运动、溜板的平动、尾座上顶针的进给运动。 2. 结构说明 该车床主轴旋转运动选择带传动的传动方式&am…

Elasticsearch实战(二十四)---ES数据建模一对多模型Nested结构

Elasticsearch实战—ES数据建模一对多模型Nested结构 文章目录 Elasticsearch实战---ES数据建模一对多模型Nested结构1.ES 一对多模型Nested 结构模型实战2.ES字段查询2.1 非Nested 错误结构及错误查询2.2 Nested结构&#xff0c;正确查询 3.Nested结构原理 我们如何把Mysql的模…

vue 中使用 vxe-table 制作可编辑表格

项目上有一个表格需要实现在线编辑&#xff0c;开始用了 element 的el-table 实现&#xff0c;单元格内基础情况就是监听了单击单元格切换一个span标签与input标签&#xff0c;复杂点的单元格使用了大量的条件判断来实现对应的编辑操作&#xff0c;比如下拉选&#xff0c;popov…

github Couldn‘t connect to server

Couldnt connect to server 问题描述解决git clone 出错直接访问github没啥问题ping github.com手动指定域名映射关系再次测试git命令 总结参考 问题描述 前一天还是好好的&#xff0c;能git pull和git push&#xff0c;昨天回来之后怎么操作都是Couldnt connect to server。百…

【数据结构】堆的实现(向下调整和向上调整法)和堆排序的实现

目录 一、堆的概念引入 二、小堆的实现 首先&#xff0c;我们会跟线性表一样建立一个动态数组来存堆的数据 ①、堆的初始化--HeapInit ②、小堆的向下调整法的实现 ③、堆排序的实现 ④、堆的插入和向上调整法 ⑤、删除堆顶数据 ⑥、获取堆顶 三、时间复杂度总结&#…

C# PaddleInference OCR识别 学习研究Demo

说明 基于开源项目 https://github.com/sdcb/PaddleSharp VS2022.net4.8 OpenCvSharp4Sdcb.PaddleInference 效果 项目 代码 using Sdcb.PaddleInference.Native; using Sdcb.PaddleInference; using System; using System.Collections.Generic; using OpenCvSharp.Extensi…

15年前的手机并没有jvm虚拟机,为何可以运行Java游戏

2000年代初期&#xff0c;随着移动通信技术的发展&#xff0c;手机逐渐普及。那个时代的手机功能相对比较单一&#xff0c;主要用于打电话和发送短信。但是&#xff0c;随着技术的进步&#xff0c;人们开始在手机上玩游戏&#xff0c;而其中最受欢迎的游戏就是Java游戏。在那个…

ChatLaw,开源了!

公众号关注 “GitHubDaily” 设为 “星标”&#xff0c;每天带你逛 GitHub&#xff01; 最近这段时间&#xff0c;AI 的整体热度有所下降&#xff0c;但是 AI 技术在各行各业的探索脚步&#xff0c;却一直没有停止。 在 ChatGPT 刚发布时&#xff0c;有不少业内人士认为&#x…

【浏览器篇】记录下浏览器保存PDF文件不同方式的小区别

【浏览器篇】记录下浏览器保存PDF文件不同方式的小区别 以前不太注意这些&#xff0c;最近搞文档比较多才发现为何保存的一部分PDF文件里面字体可以复制可以搜索&#xff0c;一部分保存的PDF里面的字体却无法复制、无法搜索等&#xff0c;发现是不同保存方式得到的文档权限不一…

SQL注入攻击原理 实战

我来进行实战了&#xff0c;总在看教程。 文章目录 前言一&#xff0c;网站是否存在sql漏洞二、判断一下字段3. 判断显点4.查找相关信息1.查询数据库2.版本3.数据库表名4.字段名5,查询 总结 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 前言&#xff…

华为OD机试真题 Python 实现【学校的位置】【2023Q1 100分】,附详细解题思路

目录 一、题目描述二、输入描述三、输出描述四、Python算法源码五、效果展示1、输入2、输出3、说明 一、题目描述 为了解决新学期学生暴涨的问题&#xff0c;小乐村要建所新学校。考虑到学生上学安全问题&#xff0c;需要所有学生家到学校距离最短。假设学校和所有的学生家&am…

unity+pico neo3入门教程1-基础传送

tips&#xff1a;之前入门教程如果没有左手柄&#xff0c;查看一下自己的手柄设置&#xff0c;左右手柄&#xff0c; Helloworld型 1.基础传送&#xff0c;调式地面传送功能&#xff0c;通过手柄默认的“握手键”&#xff0c;瞬移&#xff0c; VR头显&#xff0c;添加Teleport…

Go语言远程调试

Go语言远程调试 1、安装dlv # 安装dlv $ go install github.com/go-delve/delve/cmd/dlvlatest$ dlv version Delve Debugger Version: 1.20.1 Build: $Id: 96e65b6c615845d42e0e31d903f6475b0e4ece6e $2、命令行远程调试 我们远程(Linux服务器)有如下代码&#xff1a; [ro…

(四)Kafka 消费者

文章目录 1. Kafka 消费者相关概念消费者和消费者组&#xff08;1&#xff09;横向伸缩消费者&#xff08;2&#xff09;横向伸缩消费者组 分区再平衡再均衡的类型&#xff08;1&#xff09;主动再均衡&#xff08;2&#xff09;协作再均衡&#xff08;增量再均衡&#xff09; …

MyBatisAnnotationSqlInjection.ql学习

源码位置 java\ql\src\experimental\Security\CWE\CWE-089 源代码 /*** name SQL injection in MyBatis annotation* description Constructing a dynamic SQL statement with input that comes from an* untrusted source could allow an attacker to modify …

【UE5 Cesium】14-Cesium for Unreal 加载服务器上的倾斜摄影

目录 前言 步骤 一、下载安装tomcat 10 二、下载安装JDK 三、启动Tomcat 四、Tomcat加载倾斜摄影 五、UE中加载Tomcat上的倾斜摄影 前言 上一篇文章&#xff08;【UE5 Cesium】13-Cesium for Unreal 加载本地倾斜摄影&#xff09;介绍了如何在UE中加载本地倾斜摄影&am…

链表专题1—24. 两两交换链表中的节点 234.回文链表 143.重排链表 141.环形链表 142.环形链表II 160.链表相交 C++实现

文章目录 24. 两两交换链表中的节点234.回文链表链表转数组统计长度反转后半部分链表 快慢指针 143. 重排链表数组 双指针 超时双队列反转和插入链表 141. 环形链表142.环形链表II160.链表相交 24. 两两交换链表中的节点 迭代法&#xff0c;时间复杂度&#xff1a; O ( n ) O(n…

App store里简单好用的便签app有哪些?

作为一个打工人&#xff0c;我经常需要一个简单而又好用的便签应用来记录我的各种事务和备忘。我曾在App Store里尝试了许多便签应用&#xff0c;但有一款应用真正让我留下了深刻的印象——敬业签。 敬业签的简单和易用性让我爱不释手。无论是添加新的便签&#xff0c;设置提醒…

基础大模型能像人类一样标注数据吗?

自从 ChatGPT 出现以来&#xff0c;我们见证了大语言模型 (LLM) 领域前所未有的发展&#xff0c;尤其是对话类模型&#xff0c;经过微调以后可以根据给出的提示语 (prompt) 来完成相关要求和命令。然而&#xff0c;直到如今我们也无法对比这些大模型的性能&#xff0c;因为缺乏…