rabbitMQ:绑定Exchange发送和接收消息(topic)

news2024/11/28 2:31:23

topic交换机和fanout交换机类似,也是广播机制,但是topic需要绑定RoutingKey,绑定RoutingKey时可以使用通配符(*,#)代替。

*:只能一个单词

#:0个或多个单词

编写topic消息发送类

1.编写Receive1类

package com.it.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive1 {
    public static void main(String[] args)  {
        //创建链接工厂对象
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setHost("192.168.174.129");
        factory.setPort(5672);

        Connection connection=null;//定义链接对象
        Channel channel=null;//定义通道对象

        try {
            connection=factory.newConnection();//实例化链接对象
            channel=connection.createChannel();//实例化通道对象

            channel.queueDeclare("topicQueue1",true,false,false,null);
            channel.exchangeDeclare("topicExchange", "topic", true);
            channel.queueBind("topicQueue1","topicExchange","aa");
            channel.basicConsume("topicQueue1",true, "",new DefaultConsumer(channel) {

                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    //获取消息数据
                    String bodyStr = new String(body);
                    System.out.println("1消费者aa:"+bodyStr);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

编写Receive2类

package com.it.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive2 {
    public static void main(String[] args)  {
        //创建链接工厂对象
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setHost("192.168.174.129");
        factory.setPort(5672);

        Connection connection=null;//定义链接对象
        Channel channel=null;//定义通道对象

        try {
            connection=factory.newConnection();//实例化链接对象
            channel=connection.createChannel();//实例化通道对象

            channel.queueDeclare("topicQueue2",true,false,false,null);
            channel.exchangeDeclare("topicExchange", "topic", true);
            channel.queueBind("topicQueue2","topicExchange","aa.*");
            channel.basicConsume("topicQueue2",true, "",new DefaultConsumer(channel) {

                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    //获取消息数据
                    String bodyStr = new String(body);
                    System.out.println("2消费者aa.*:"+bodyStr);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

编写Receive3类

package com.it.rabbitmq.topic;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receive3 {
    public static void main(String[] args)  {
        //创建链接工厂对象
        ConnectionFactory factory=new ConnectionFactory();
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setHost("192.168.174.129");
        factory.setPort(5672);

        Connection connection=null;//定义链接对象
        Channel channel=null;//定义通道对象

        try {
            connection=factory.newConnection();//实例化链接对象
            channel=connection.createChannel();//实例化通道对象

            channel.queueDeclare("topicQueue3",true,false,false,null);
            channel.exchangeDeclare("topicExchange", "topic", true);
            channel.queueBind("topicQueue3","topicExchange","aa.#");
            channel.basicConsume("topicQueue3",true, "",new DefaultConsumer(channel) {

                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    //获取消息数据
                    String bodyStr = new String(body);
                    System.out.println("3消费者aa.#:"+bodyStr);
                }
            });

        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }
}

2.分别启动三个接收类

 

 

 

注意:

1、在topic模式中必须要指定Routingkey,并且可以同时指定多层的RoutingKey,每个层次之间使用 点分隔即可 例如 test.myRoutingKey

编写topic的消息接收类

1.编写发送类

package com.it.rabbitmq.exchange.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {
    public static void main(String[] args) {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //配置rabbitMQ的连接信息
        factory.setHost("192.168.174.129");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("123456");
        //定义连接
        Connection connection = null;
        //定义通道
        Channel channel = null;

        try {
            connection = factory.newConnection();
            channel = connection.createChannel();

            channel.exchangeDeclare("topicExchange", "topic", true);

            String message = "topic的测试消息!";
            channel.basicPublish("topicExchange", "aa", null, message.getBytes("utf-8"));
            System.out.println("消息发送成功,direct");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }
}

2.测试发送类

1)当RoutingKey为aa时

 接收类1(aa),接收类3(aa.#)可以接收到消息

 

 

2)当RoutingKey为aa.bb时

接收类2(aa.*)和接收类3(aa.#)可以接收消息

 

 

3)当RoutingKey为aa.bb.cc时

接收类3(aa.#)可以接收消息

 

注意:

1、Topic模式的消息接收时必须要指定RoutingKey并且可以使用# 和 *来做统配符号,#表示通配任意一个单词 *表示通配任意多个单词,例如消费者的RoutingKey为test.#或#.myRoutingKey都可以获取RoutingKey为test.myRoutingKey发送者发送的消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/3809.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

区块链的认识

目录 频繁的交易如何记录这些交易信息呢? 那我们的链又是如何连接起来的? 谁做记账先生呢? 共识机制 如何处理有些人距离账目的公共太远的问题? 安全性: 去中心化: 总结: 频繁的交易如何记录…

云原生k8s的盘古开天辟地

背景 容器(docker)流行开来,容器调度机制风起云涌,各路神仙用systemctl调度, 也有号称容器宗师的出品docker swarm, 各大门派也有各自的旗帜主张。天下混沌,血雨腥风,龙藏深泉,而谷歌的k8s就是这个主角。 …

较低成本的ISO7637-2 5A 5B抛负载保护方案

科普下什么是抛负载,抛负载测试方案以及后端电路参数的选择。 在众多汽车电子电磁兼容测试中,最具破坏性的就是ISO7637-2的5A 5B测试了,当然也有的测试项目放在ISO16750标准中,但测试波形大体相同。上海雷卯有专门的文章描述这2个…

基于JavaWeb的手机商城系统设计与实现

目录 摘要 I Abstract II 第1章 绪论 1 1.1 课题背景 1 1.2 目的和意义 1 1.3 系统设计思想 3 1.4 本文的结构 3 第2章 可行性分析 4 2.1 业务流程图 4 2.2 经济可行性 6 2.3 技术可行性 6 2.4 运行可行性 6 2.5 本章小结 7 第3章 需求分析 8 3.1 关于电商的前世今生和目前发展…

C++基础知识梳理<2>(引用、内联函数、auto关键字) [入门级】

目录 一、引用 1. 引用概念 2. 引用特性 2.1 引用在定义时必须初始化 2.2 一个变量可以有多个引用 2.3 引用一旦引用一个实体,再不能引用其他实体 3. 常引用 3.1 取别名的规则 3.2 权限放大error 3.3 权限不变 3.4 权限缩小 4. 引用原理与拓展 4.1…

RestTemplate.exchange各种用法(包括泛型等 --全)

文章目录前言1. Get请求1.1 返回基本类型1.2 返回自定义对象类型1.3 返回List\<T> 类型1.4 返回Map\<K,V> 类型1.5 返回自定义泛型类型2.Post请求2.1 传headerbody返回对象类型2.2 传headerbody返回自定义泛型类型3. 异常情况处理4. RestTemplate配置Bean最后前言 …

学习笔记之Vue基础学习——持更

Vue学习一、Vue简介1.1 什么是Vue&#xff1f;1.2 Vue的特点1.3 Vue官网使用1.4 搭建Vue开发环境1.5 Hello小案例总结案例&#xff1a;二、模板语法2.1 两大类型三、数据绑定3.1 两种方式四、el和data的两种写法4.1 el的两种写法4.2 data的两种写法五、MVVM模型5.1 什么是MVVM模…

目标检测(5)—— YOLO系列V1

一、YOLO系列V1 经典的one-stage方法&#xff0c;You Only Look Once将检测问题转化成回归问题&#xff0c;一个CNN搞定可以对视频进行实时监测 YOLO系列的速度更快&#xff0c;我们检测的物体很简单&#xff0c;进行取舍&#xff0c;舍弃了一些精度。 V1核心思想 现在要预测…

Windows与Linux行尾换行符引发Git的一系列惨案

1 前言 最近在使用 Git 提交代码的时候&#xff0c;老是碰到一段看起来 “没有任何改动” 的代码&#xff0c;被 diff 检测出异常&#xff0c;很是苦恼&#xff0c;特别是项目紧急的时候&#xff0c;不敢用 VSCode 编辑了&#xff0c;只能用 vim 进行少量代码的修改、上库。 …

传统Spring项目的创建和使用xml文件来保存对象和取对象

传统Spring项目的创建和使用xml文件来保存对象和取对象## 传统Spring项目的创建 一、创建一个maven项目&#xff08;maven项目无需使用模板&#xff09; 二、导入Spring依赖&#xff08;Spring Context依赖和Spring Beans依赖&#xff09; 可以从maven仓库获取&#xff0c;也…

Java 中代码优化的 30 个小技巧(中)

11 位运算效率更高 如果你读过 JDK 的源码&#xff0c;比如 ThreadLocal、HashMap 等类&#xff0c;你就会发现&#xff0c;它们的底层都用了位运算。 为什么开发 JDK 的大神们&#xff0c;都喜欢用位运算&#xff1f; 答&#xff1a;因为位运算的效率更高。 在 ThreadLoca…

数码相机raw照片编辑Capture One Pro中文

怎么编辑数码相机拍摄的raw格式的照片&#xff1f;Capture One Pro 22是一款专业、强大、易于使用的图像编辑软件&#xff0c;与主流相机型号兼容&#xff0c;直接导入照片进行编辑操作&#xff0c;包括佳能、尼康、索尼、富士等。将所有必备工具和高端性能融于一体、使您在一套…

riscv引导程序及仿真记录

1.riscv基本的寄存器列表 这里只关注32个通用寄存器x0-x31 2.引导程序代码 # 1 "iriscvboot.casm" # 1 "<built-in>" # 1 "<command-line>" # 1 "/usr/include/stdc-predef.h" 1 3 4 # 1 "<command-line>&qu…

【Linux】进程间通信

文章目录1.进程间通信基础2.管道2.1匿名管道2.1.1匿名管道的原理2.2匿名管道的特点2.3匿名管道函数2.3.1用例2.3.2实现ps -ajx | grep bash指令2.4匿名管道的特点2.5管道的大小2.6管道的生命周期2.7进程池3.命名管道FIFO3.1命名管道的接口3.2命名管道和匿名管道的区别3.3用FIFO…

大数据面试重点之kafka(七)

大数据面试重点之kafka(七) Kafka的分区器、拦截器、序列化器&#xff1f; 问过的一些公司&#xff1a;ebay 参考答案&#xff1a; Kafka中&#xff0c;先执行拦截器对消息进行相应的定制化操作&#xff0c;然后执行序列化器将消息序列化&#xff0c;最后执行分 区器选择对应分…

python:基础知识

环境&#xff1a; window11python 3.10.6vscodejavascript、c/c/java/c#基础&#xff08;与这些语言对比&#xff09; 注释 一、数据类型 基础六大数据类型&#xff0c;可以使用 type()查看&#xff0c;如下图&#xff1a; 1.1 数字&#xff08;Number&#xff09; 支持 整…

联邦学习--记录

简介 联邦学习&#xff08;Federated Learning&#xff09;是一种新兴的人工智能基础技术&#xff0c;其设计目标是在保障大数据交换时的信息安全、保护终端数据和个人数据隐私、保证合法合规的前提下&#xff0c;在多参与方或多计算结点之间开展高效率的机器学习。其中&#…

【机器学习大杀器】Stacking堆叠模型-English

1. Introduction The stacking model is very common in Kaglle competitions. Why? 【机器学习大杀器】Stacking堆叠模型&#xff08;English&#xff09; 1. Introduction 2. Model 3: Stacking model 2.1 description of the algorithms: 2.2 interpretation of the es…

浅谈Vue中 ref、reactive、toRef、toRefs、$refs 的用法

&#x1f4ad;&#x1f4ad; ✨&#xff1a; 浅谈ref、reactive、toRef、toRefs、$refs   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: 技术需沉淀&#xff0c;不要浮躁&#x1f49c;&#x1f49c;   &#x1f338;: 如有错误或不足之处&#xff0c;希望可…

Redhat(3)-Bash-Shell-正则表达式

1.bash脚本 2.bash变量、别名、算术扩展 3.控制语句 4.正则表达式 1.bash脚本 #!/bin/bash#this is basic bash script<< BLOCK This is the basic bash script BLOKC: This is the basic bash script echo "hello world!" 双引号、单引号只有在变量时才有区…