5. 交换机

news2025/1/12 12:09:30

在上节中,我们创建了一个工作队列。我们假设的是工作队列背后,每个任务都恰好交付给一个消
费者(工作进程)。在这一部分中,我们将做一些完全不同的事情-我们将消息传达给多个消费者。这种模式
称为 ”发布/订阅”.
为了说明这种模式,我们将构建一个简单的日志系统。它将由两个程序组成:第一个程序将发出日志消
息,第二个程序是消费者。其中我们会启动两个消费者,其中一个消费者接收到消息后把日志存储在磁盘,
另外一个消费者接收到消息后把消息打印在屏幕上,事实上第一个程序发出的日志消息将广播给所有消费
者者
5.1. Exchanges
5.1.1.
Exchanges 概念
RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产
者甚至都不知道这些消息传递传递到了哪些队列中。
相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来
自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消
息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

在这里插入图片描述
5.1.2.
Exchanges 的类型
总共有以下类型:
直接(direct), 主题(topic) ,标题(headers) , 扇出(fanout)
5.1.3.
无名 exchange
在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。之前能实现的
原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。
在这里插入图片描述
第一个参数是交换机的名称。空字符串表示默认或无名称交换机:消息能路由发送到队列中其实
是由 routingKey(bindingkey)绑定 key 指定的,如果它存在的话
5.2. 临时队列
之前的章节我们使用的是具有特定名称的队列(还记得 hello 和 ack_queue 吗?)。队列的名称我们
来说至关重要-我们需要指定我们的消费者去消费哪个队列的消息。
每当我们连接到 Rabbit 时,我们都需要一个全新的空队列,为此我们可以创建一个具有随机名称
的队列,或者能让服务器为我们选择一个随机队列名称那就更好了。其次一旦我们断开了消费者的连
接,队列将被自动删除。
创建临时队列的方式如下:
String queueName = channel.queueDeclare().getQueue();
创建出来之后长成这样:
在这里插入图片描述
5.3. 绑定(bindings)
什么是 bingding 呢,binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队
列进行了绑定关系。比如说下面这张图告诉我们的就是 X 与 Q1 和 Q2 进行了绑定
在这里插入图片描述
5.4. Fanout
5.4.1.
Fanout 介绍
Fanout 这种类型非常简单。正如从名称中猜到的那样,它是将接收到的所有消息广播到它知道的
所有队列中。系统中默认有些 exchange 类型
在这里插入图片描述
5.4.2.
Fanout 实战
在这里插入图片描述
Logs 和临时队列的绑定关系如下图
在这里插入图片描述
ReceiveLogs01 将接收到的消息打印在控制台

public class ReceiveLogs01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息打印在屏幕.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("控制台打印接收到的消息"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

ReceiveLogs02 将接收到的消息存储在磁盘

public class ReceiveLogs02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/**
* 生成一个临时的队列 队列的名称是随机的
* 当消费者断开和该队列的连接时 队列自动删除
*/
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的 exchange 其中 routingkey(也称之为 binding key)为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("等待接收消息,把接收到的消息写到文件.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
File file = new File("C:\\work\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println("数据写入文件成功");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

EmitLog 发送消息给两个消费者接收

public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
/**
* 声明一个 exchange
* 1.exchange 的名称
* 2.exchange 的类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner sc = new Scanner(System.in);
System.out.println("请输入信息");
while (sc.hasNext()) {
String message = sc.nextLine();
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}

5.5. Direct exchange
5.5.1.
回顾
在上一节中,我们构建了一个简单的日志记录系统。我们能够向许多接收者广播日志消息。在本
节我们将向其中添加一些特别的功能-比方说我们只让某个消费者订阅发布的部分消息。例如我们只把
严重错误消息定向存储到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
我们再次来回顾一下什么是 bindings,绑定是交换机和队列之间的桥梁关系。也可以这么理解:
队列只对它绑定的交换机的消息感兴趣。绑定用参数:routingKey 来表示也可称该参数为 binding key,
创建绑定我们用代码:channel.queueBind(queueName, EXCHANGE_NAME, “routingKey”);绑定之后的
意义由其交换类型决定。
5.5.2.
Direct exchange 介绍
上一节中的我们的日志系统将所有消息广播给所有消费者,对此我们想做一些改变,例如我们希
望将日志消息写入磁盘的程序仅接收严重错误(errros),而不存储哪些警告(warning)或信息(info)日志
消息避免浪费磁盘空间。Fanout 这种交换类型并不能给我们带来很大的灵活性-它只能进行无意识的
广播,在这里我们将使用 direct 这种类型来进行替换,这种类型的工作方式是,消息只去到它绑定的
routingKey 队列中去。
在这里插入图片描述
在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。队列 Q1 绑定键为 orange,
队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列
Q1。绑定键为 blackgreen 和的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

5.5.3.
多重绑定
在这里插入图片描述
当然如果 exchange 的绑定类型是 direct,但是它绑定的多个队列的 key 如果都相同,在这种情
况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多,如上图所示。
5.5.4.
实战
在这里插入图片描述
在这里插入图片描述

public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "disk";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "error");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
message="接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
File file = new File("C:\\work\\rabbitmq_info.txt");
FileUtils.writeStringToFile(file,message,"UTF-8");
System.out.println("错误日志已经接收");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = "console";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "info");
channel.queueBind(queueName, EXCHANGE_NAME, "warning");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" 接 收 绑 定 键 :"+delivery.getEnvelope().getRoutingKey()+", 消
息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
public class EmitLogDirect {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//创建多个 bindingKey
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("info","普通 info 信息");
bindingKeyMap.put("warning","警告 warning 信息");
bindingKeyMap.put("error","错误 error 信息");
//debug 没有消费这接收这个消息 所有就丢失了
bindingKeyMap.put("debug","调试 debug 信息");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey,
null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息:" + message);
}
}
}
}

5.6. Topics
5.6.1.
之前类型的问题
在上一个小节中,我们改进了日志记录系统。我们没有使用只能进行随意广播的 fanout 交换机,而是
使用了 direct 交换机,从而有能实现有选择性地接收日志。
尽管使用 direct 交换机改进了我们的系统,但是它仍然存在局限性-比方说我们想接收的日志类型有
info.base 和 info.advantage,某个队列只想 info.base 的消息,那这个时候 direct 就办不到了。这个时候
就只能使用 topic 类型
5.6.2.
Topic 的要求
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单
词列表,以点号分隔开。这些单词可以是任意单词,比如说:“stock.usd.nyse”, “nyse.vmw”,
“quick.orange.rabbit”.这种类型的。当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中,其中有两个替换符是大家需要注意的
(星号)可以代替一个单词
#(井号)可以替代零个或多个单词
5.6.3.
Topic 匹配案例
下图绑定关系如下
Q1–>绑定的是
中间带 orange 带 3 个单词的字符串(
.orange.)
Q2–>绑定的是
最后一个单词是 rabbit 的 3 个单词(
..rabbit)
第一个单词是 lazy 的多个单词(lazy.#)
在这里插入图片描述
上图是一个队列绑定关系图,我们来看看他们之间数据接收情况是怎么样的
quick.orange.rabbit
被队列 Q1Q2 接收到
lazy.orange.elephant
被队列 Q1Q2 接收到
quick.orange.fox
被队列 Q1 接收到
lazy.brown.fox
被队列 Q2 接收到
lazy.pink.rabbit
虽然满足两个绑定但只被队列 Q2 接收一次
quick.brown.fox
不匹配任何绑定不会被任何队列接收到会被丢弃
quick.orange.male.rabbit
是四个单词不匹配任何绑定会被丢弃
lazy.orange.male.rabbit
是四个单词但匹配 Q2
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和
出现,那么该队列绑定类型就是 direct 了

5.6.4.
实战
在这里插入图片描述

public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带 orange 带 3 个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
* 第一个单词是 lazy 的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
bindingKeyMap.put("quick.orange.fox","被队列 Q1 接收到");
bindingKeyMap.put("lazy.brown.fox","被队列 Q2 接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey,
null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q1 队列与绑定关系
String queueName="Q1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明 Q2 队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872481.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pyspark笔记 pyspark.sql.functions

col qqpyspark 笔记 pyspark.sql.function col VS select_UQI-LIUWJ的博客-CSDN博客 取某一列 lit 创建一个包含指定值的列 date_trunc 将日期截取成由第一个参数指定的字符串值 year, yyyy, yy——截取到年month,mon,mm——截取到月day,dd ——截取到天microsecondmillis…

DAY21

题目一 给定三个字符串str1、str2和aim&#xff0c; 如果aim包含且仅包含来自str1和str2的所有字符&#xff0c;而且在aim中属于str1的字符 之间保持原来在str1中的顺序&#xff0c;属于str2的字符之间保持原来在str2中的顺序&#xff0c;那么称aim是str1和str2的交错组成。实…

ESP32-连接参数/间隔更新

连接间隔的设置是一个协商的过程&#xff0c;ESP32提供了一些协商的接口&#xff0c;按顺序分析一下。 Step 1&#xff1a;首先是Client连接时对Server要求的连接间隔&#xff08;确定值&#xff09; 在连接的时候&#xff0c;Client会把当前的连接间隔发送给Server。在Serve…

Vue.js2+Cesium1.103.0 十一、Three.js 炸裂效果

Vue.js2Cesium1.103.0 十一、Three.js 炸裂效果 Demo ThreeModelBoom.vue <template><div:id"id"class"three_container"/> </template><script> /* eslint-disable eqeqeq */ /* eslint-disable no-unused-vars */ /* eslint-d…

物流行业数据分析

文章目录 物流行业数据分析一、数据预处理1、数据清洗① 重复值、缺失值、格式调整② 异常值处理 2、数据规整 二、 数据分析1、配送服务是否存在问题2、是否存在尚有潜力的销售区域3、商品是否存在质量问题 三、总结参考 物流行业数据分析 Excel适合处理低量级数据&#xff0…

【Hystrix技术指南】(7)故障切换的运作流程原理分析(含源码)

背景介绍 目前对于一些非核心操作&#xff0c;如增减库存后保存操作日志发送异步消息时&#xff08;具体业务流程&#xff09;&#xff0c;一旦出现MQ服务异常时&#xff0c;会导致接口响应超时&#xff0c;因此可以考虑对非核心操作引入服务降级、服务隔离。 Hystrix说明 官方…

解决Idea 多模块,maven项目是多层级文件夹的子项时无法加入git管理的问题

问题 多模块项目&#xff0c;引入模块无法做git管理&#xff0c;第一个项目没有git分支标志&#xff0c;也不能像其他项目一样右键出git选项。 解决方法 发现该模块是多层级的文件夹结构&#xff0c;也就是项目本身在一个文件夹下。应该是要管理该文件夹。 Settings-Versi…

python使用装饰器记录方法耗时

思路 python使用修饰器记录方法耗时&#xff0c;目的是每当方法执行完后&#xff0c;可以记录该方法耗时&#xff0c;而不需要在每个方法的执行前后&#xff0c;去创建一个临时变量&#xff0c;来记录耗时。 方式一&#xff08;不推荐&#xff09;&#xff1a; 在每个方法的…

【java面向对象中static关键字】

提纲 static修饰成员变量static修饰成员变量的应用场景static修饰成员方法static修饰成员方法的应用场景static的注意事项static的应用知识&#xff1a;代码块static的应用知识&#xff1a;单例设计模式 static静态的意思&#xff0c;可以修饰成员变量&#xff0c;成员方法&a…

AbstractQueuedSynchronizer

目录 AQS是什么AQS什么样内部类成员变量方法public如果不使用AQS会怎样 AQS的应用ReentrantLockSyncNonfairSyncFairSync 其他实现 AQS是什么 AbstractQueuedSynchronizer&#xff08;AQS&#xff09;是Java中的一个并发工具&#xff0c;位于java.util.concurrent.locks包中&a…

最新Ubuntu LVGL SDL模拟器安装

前言 本文主要说明Ubuntu 23.4安装LVGL 9.0以及基于SDL的模拟环境。 代码下载 访问lv_port_pc_eclipse可以看到相信信息&#xff0c;官方已经打包好了整个代码环境。 安装CMAKE。 sudo apt install cmake安装SDL。 sudo apt-get update && sudo apt-get install …

HTML(JavaEE初级系列12)

目录 前言&#xff1a; 1.HTML结构 1.1认识HTML标签 1.2HTML文件基本结构 1.3标签层次结构 1.4快速生成代码框架 2.HTML常见标签 2.1注释标签 2.2标题标签&#xff1a;h1-h6 2.3段落标签&#xff1a;p 2.4换行标签&#xff1a; br 2.5格式化标签 2.6图片标签&#…

新手家长必读:英国ISEB考试局CEO为低龄留学家庭深度解析ISEB

大家都知道&#xff0c;英国私校备考难就难在流程繁琐&#xff0c;菁英私校笔面试风格各异&#xff0c;考试时间各不相同。但在进入私校笔面试之前&#xff0c;还有非常重要的一步——通过ISEB pre-test考试。      作为英校申请敲门砖&#xff0c;学生在注册之后&#xff…

STM32 F103C8T6学习笔记7:双机无线串口通信

今日尝试配通俩个C8T6单片机之间的无线串口通信&#xff0c;文章提供原理&#xff0c;源码&#xff0c;测试效果图&#xff0c;测试工程下载&#xff1a; 目录 传输不规范问题&#xff1a; 串口通信资源&#xff1a; 单个串口资源理解&#xff1a; 单片机串口资源&#xf…

ModaHub魔搭社区:Milvus Cloud向量数据库不可小觑

向量数据库不可小觑 事实上,向量数据库并不是一个新的数据库技术,只是一直以来并没有什么亮眼的技术突破,因此显得有点“籍籍无名”。然而,当向量检索找到典型应用场景,成为普遍需求后,向量数据库的真正价值才日益凸显。 云和恩墨创始人,中国数据库联盟(ACDU) 主席盖…

[ Docker ] 部署 nps 和 npc 实现内网穿透

nps 原作者已停止维护&#xff0c;现在用 yisier1/nps 云主机上运行 nps 创建目录 mkdir -p /root/docker/nps mkdir /root/repo下载必要文件 Docker 镜像 docker pull yisier1/npsGit 仓库 git clone https://github.com/yisier/nps.git /root/repo cp -r /root/repo/nps…

【密码学】维京密码

维京密码 瑞典罗特布鲁纳巨石上的图案看起来毫无意义&#xff0c;但是它确实是一种维京密码。如果我们注意到每组图案中长笔画和短笔画的数量&#xff0c;将得到一组数字2、4、2、3、3、5、2、3、3、6、3、5。组合配对得到24、23、35、23、36、35。现在考虑如图1.4所示的内容&a…

yolov5部署 单线程与多线程对比

单线程 部署代码可参考&#xff1a; Yolov5 ONNX Runtime 的 C部署_爱钓鱼的歪猴的博客-CSDN博客 main.cpp #include "detector.h" #include <chrono> using namespace std;// 识别线程 void *detect_thread_entry(void *para){}int main(int argc, char *ar…

分布式应用:Zabbix监控MariaDB

目录 一、理论 1.Zabbix监控MariaDB 二、实验 1.Zabbix监控MariaDB 一、理论 1.Zabbix监控MariaDB &#xff08;1&#xff09;环境 zabbix服务端&#xff1a;192.168.204.214 zabbix客户端&#xff1a;192.168.204.215 &#xff08;2&#xff09;MareaDB安装 安装 za…