RabbitMQ交换机与队列

news2024/11/16 12:42:53

交换机

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。
相反, 生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单, 一方面它接收来自生产者的消息,另一方面将它们推入队列。 交换机必须确切知道如何处理收到的消息。 是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。 这就的由交换机的类型来决定。

Exchanges 的类型

  1. 直接(direct)
  2. 主题(topic)
  3. 标题(headers) ,
  4. 扇出(fanout)

声明交换机

//参数:(交换机名称,交换机类型)
channel.exchangeDeclare(EXCHANEG_NAME,type);

默认 exchange

默认交换机使用空串标识,消息发送到交换机,交换机根据路由发送到队列中。
默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

channel.basicPublic("",queueName,props,message);

临时队列

临时队列,当消费者断开连接后,临时队列将被删除
临时队列声明

String queueName=channel.queueDeclare().getQueue();

绑定(bindings)

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系。
将队列绑定到交换机。

//参数:(队列,交换机,路由)
channel.queueBind(queueName,EXCHANGE_NAME,RouterKey);

Fanout

扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。
因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

  • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
  • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
  • 分发系统使用它来广播各种状态和配置更新
  • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP没有内置presence的概念,因此XMPP可能会是个更好的选择)

Fanout 这种类型它是将接收到的所有消息广播到它知道的所有队列中。
image.png

public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机 参数(交换机,交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列绑定交换机 参数(队列,交换机,routingKey)
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("Receive02等待消息....");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queue,true,askCallback,consumerTag->{});
    }
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机 参数(交换机,交换机类型)
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        //声明临时队列
        String queue = channel.queueDeclare().getQueue();
        //队列绑定交换机 参数(队列,交换机,routingKey)
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("Receive02等待消息....");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Receive02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(queue,true,askCallback,consumerTag->{});
    }
public static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发出消息:"+message);
        }

消费者01和消费者02都将接收到生产者发送的消息

Direct exchange

直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应队列的。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

  • 将一个队列绑定到某个交换机上,同时赋予该绑定一个路由键(routing key)
  • 当一个携带着路由键为R的消息被发送给直连交换机时,交换机会把它路由给绑定值同样为R的队列。

直连交换机经常用来循环分发任务给多个工作者(workers)。当这样做的时候,我们需要明白一点,在AMQP 0-9-1中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。
image.png
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列Q1。 绑定键为 blackgreen 和的消息会被发布到队列 Q2, 其他消息类型的消息将被丢弃

多重绑定

当然如果 exchange 的绑定类型是 direct, 但是它绑定的多个队列的 key 如果都相同,在这种情况下虽然绑定类型是 direct 但是它表现的就和 fanout 有点类似了,就跟广播差不多
image.png

Topics

主题交换机(topic exchanges)通过对消息的路由键和队列到交换机的绑定模式之间的匹配,将消息路由给一个或多个队列。主题交换机经常用来实现各种分发/订阅模式及其变种。主题交换机通常用来实现消息的多播路由(multicast routing)。
发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。
这些单词可以是任意单词,比如说: “stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”.这种类型的。
当然这个单词列表最多不能超过 255 个字节。
在这个规则列表中
(星号)可以代替一个单词
#(井号)可以替代零个或多个单词

image.png
当队列绑定关系是下列这种情况时需要引起注意
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
如果队列绑定键当中没有#和
出现,那么该队列绑定类型就是 direct 了

	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName="Q1";
        channel.queueDeclare(queueName,false,false,false,null);
        //队列绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
        System.out.println("等待接受消息...");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
            System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName,true,askCallback,consumerTag->{});
    }
	private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        //声明队列
        String queueName="Q2";
        channel.queueDeclare(queueName,false,false,false,null);
        //队列绑定
        channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
        System.out.println("等待接受消息...");
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("接收到的消息为:"+ new String(message.getBody(),"UTF-8"));
            System.out.println(" 接 收 队 列 :"+queueName+" 绑 定 键:"+message.getEnvelope().getRoutingKey()+",消息:"+message);
        };
        channel.basicConsume(queueName,true,askCallback,consumerTag->{});
    }
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"topic");
        Map<String,String> binding=new HashMap<>();
        binding.put("quick.orange.rabbit","被队列 Q1Q2 接收到");
        binding.put("lazy.orange.elephant","被队列 Q1Q2 接收到");
        binding.put("quick.orange.fox","被队列 Q1 接收到");
        binding.put("lazy.brown.fox","被队列 Q2 接收到");
        binding.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列 Q2 接收一次");
        binding.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
        binding.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
        binding.put("lazy.orange.male.rabbit","是四个单词但匹配 Q2");
        for (Map.Entry<String, String> stringEntry : binding.entrySet()) {
            String routingKey = stringEntry.getKey();
            String message = stringEntry.getValue();
            channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送消息:"+message);
        }
    }

队列

声明队列

channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,arguments);

参数介绍:
1、QUEUE_NAME: 队列的名称;
2、durable: 是否持久化;
3、exclusive: 是否独享、排外的;
4、autoDelete: 是否自动删除;
5、arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
(1)x-message-ttl:消息的过期时间,单位:毫秒;
(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

死信队列

死信,顾名思义就是无法被消费的消息 一般来说, producer 将消息投递到 broker 或者直接到 queue 里了, consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源 :

  1. 消息 TTL 过期
  2. 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  3. 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
public class Consumer01 {
    public static final String NORMAL_EXCHANGE="normal_exchange";
    public static final String DEAD_EXCHANGE="dead_exchange";
    public static final String NORMAL_QUEUE="normal_queue";
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        //正常队列绑定死信队列信息
        Map<String, Object> arguments=new HashMap<>();
        //正常队列设置死信交换机 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置队列最大长度
        //arguments.put("x-max-length",6);
        //声明普通队列
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
        //绑定普通队列和死信队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        //成功时的回调
        DeliverCallback askCallback=(consumerTag,message)->{
            //拒绝消息 需要手动应答
            //channel.basicReject();
            System.out.println("Consumer01接收到的消息为:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(NORMAL_QUEUE,true,askCallback,consumerTag->{});
    }
}
public class Consumer02 {
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtil.getChannel();
        DeliverCallback askCallback=(consumerTag,message)->{
            System.out.println("Consumer02接收到的消息:"+new String(message.getBody(),"UTF-8"));
        };
        channel.basicConsume(DEAD_QUEUE,true,askCallback,consumerTag->{});
    }
}

延迟队列

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

RabbitMQ 中的 TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间
单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

Map<String,Object> arguments=new HashMap<>();
arguments.put("x-message-ttl",5000)
channel.queueDeclare(queueName,durable,exclusive,autoDelete,arguments);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1407960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.chrony服务器

目录 1. 简介 1.1. 重要性 1.2. Linux的两个时钟 1.3. 设置日期时间 1.3.1. timedatectl命令设置 1.3.2. date命令设置 1.4. NTP 1.5. Chrony介绍 2. 安装与配置 2.1. 安装&#xff1a; 2.2. Chrony配置文件分析 2.3. 同步时间服务器 2.3.1. 授时中心 2.3.2. 实验…

基于机会网络编码(COPE)的卫星网络路由算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.本算法原理 4.1机会网络编码&#xff08;COPE&#xff09;概述 4.2COPE算法原理 4.2.1 编码机会预测 4.2.2 编码决策 4.2.3 数据包编码 4.2.4 数据包传输 4.2.5 数据包解码 5.完整程序 1.程序功能…

notepad++ v8.5.3 安装插件,安装失败怎么处理?下载进度为0怎么处理?

notepad v8.5.3 安装插件&#xff0c;安装失败&#xff1f;下载进度为0&#xff0c;怎么处理&#xff1f; 安装 进度 进度条没有进度 &#xff0c;然后就退出了&#xff0c;自动打开程序&#xff0c;不知道什么问题&#xff0c;插件管理下面也没有插件显示 找到问题了&#x…

【python文件】生成的csv文件没两行数据之间有一个空行

问题描述 用python代码将数据写入csv文件&#xff0c;但生成的csv文件没两行数据之间有一个空行&#xff0c;如下图所示&#xff1a; 解决办法 在open函数中添加newline&#xff0c;如以下代码所示&#xff0c;即可解决这一问题。 with open(r"C:\Users\xxx\Desktop\DR…

PPP协议原理介绍+报文分析+配置指导-RFC1661

个人认为&#xff0c;理解报文就理解了协议。通过报文中的字段可以理解协议在交互过程中相关传递的信息&#xff0c;更加便于理解协议。 因此本文将在PPP协议报文的基础上进行介绍。 关于PPP协议基本原理&#xff0c;可参考RFC1661-The Point-to-Point Protocol (PPP)。 关于P…

建议CSDN不要这样吃人xue馒头

程序员裁员潮&#xff1a;技术变革下的职业危机 2023年以来&#xff0c;谷歌、阿里巴巴各个科技公司都在裁员&#xff0c;程序员的日子也不好过。 讨论在技术变革下&#xff0c;裁员对于程序员的影响到底有多大&#xff0c;是非常有意义的话题&#xff0c;但是为什么要用“一…

QT下载、安装详细教程[Qt5.15及Qt6在线安装,附带下载链接]

QT5.15及QT6的下载和安装 1.下载1.1官网下载1.2国内镜像网站下载 2.安装3.软件启动及测试程序运行3.1Qt Creator&#xff08;Community&#xff09; 1.下载 QT自Qt5.15版本后不在支持离线安装包下载(非商业版本&#xff0c;开源)&#xff0c;故Qt5.15及Qt6需要使用在线安装程序…

AppDesigner语音滤波器设计——IIR、IIR、维纳滤波、卡尔曼滤波、自适应滤波

1.AppDesigner简介 App Designer是一个可视化的集成开发环境&#xff0c;提供了仪表、旋钮等组件&#xff0c;采用面向对象的设计方法。利用App Designer可以快速开发出应用程序。App Designer提供了各种UI组件&#xff0c;如按钮、文本框、图表等&#xff0c;以及用于布局和设…

数据链路层——笔记·续

使用集线器的星形拓扑 传统以太网传输媒体&#xff1a;粗同轴电缆 -> 细同轴电缆 -> 双绞线。 采用双绞线的以太网采用星形拓扑。 在星形的中心则增加了一种可靠性非常高的设备&#xff0c;叫做集线器 (hub)。 传统以太网使用同轴电缆&#xff0c;采用总线形拓扑结构&am…

C++ Qt day1

提示并输入一个字符串&#xff0c;统计该字符中大写、小写字母个数、数字个数、空格个数以及其他字符个数(要求使用C风格字符串完成) #include <iostream> #include <string.h> #include <array> using namespace std;int main() {string str;cout <<…

100T数据存进服务器分几步?

大家好&#xff0c;我是豆小匠。 这期来聊聊数据存储相关的问题&#xff0c;包括&#xff1a; 容量评估。技术选型。容灾处理。 另外&#xff0c;文末赠送免费定制红包封面哦&#xff01; 1. 容量评估 通过对容量&性能的评估&#xff0c;可以把业务需求转化成技术语言描…

上位机图像处理和嵌入式模块部署(qt插件的使用)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 一个软件一般有很多的功能&#xff0c;但是主流程只有一个。但在软件开发的过程当中&#xff0c;一般来说功能是需要不断添加的&#xff0c;但是主…

C++ -- 入门(引用)

1.引用 1.1引用的概念 引用不是新定义一个变量&#xff0c;而是给已存在变量取了一个别名&#xff0c;编译器不会为引用变量开辟内存空间&#xff0c;它和它引用的变量共用同一块内存空间。 比如&#xff1a;李逵&#xff0c;在家称为"铁牛"&#xff0c;江湖上人称&q…

sky_take_out

day01&#xff1a; 前端网址通过nginx访问后端网址&#xff08;前后网址不一致&#xff09;&#xff0c;有三个好处&#xff1a; 一是提高访问速度&#xff0c;二是进行负载均衡&#xff0c;三是保障后端安全性 用md5加密了密码 后端使用knife4j调试,用Swagger生成接口文档&am…

MySQL怎么根据当前时间获取连续十二个月统计数据

需求 在某些业务场景中&#xff0c;需要后台获取连续十二个月的统计数据&#xff0c;如下图&#xff1a; 解决方式 1、创建一张临时表&#xff0c;在表中插入序号数据 该表的最大数量决定统计返回的最大条数 CREATE TABLE sys_redundancy (id bigint(22) NOT NULL AUTO_I…

搭建nodejs服务器

简单搭建nodejs服务器&#xff0c;用于爬虫js逆向. 1、安装镜像源 下载nrm npm install -g nrm 设置下载源&#xff1a;&#xff08;最好使用npm源或者淘宝源&#xff09; 例子&#xff1a;npm config set registry http://registry.npmjs.org 查看是否设置成功&#xff1a…

伊恩·斯图尔特《改变世界的17个方程》麦克斯韦方程方程笔记

它告诉我们什么&#xff1f; 电和磁并不会随便乱跑。旋转的电场区域会产生垂直于旋转方向的磁场。旋转的磁场区域也会产生垂直于旋转方向的电场&#xff0c;但方向相反。 为什么重要&#xff1f; 这是物理力的第一次重大统一&#xff0c;表明电和磁是密切相关的。 它带来了什么…

软考复习之多媒体篇

常用的计算公式 数据传输率&#xff08;单位:b/s&#xff09; 未压缩的数据传输率 采样频率&#xff08;Hz&#xff09;* 量化位数&#xff08;位&#xff09;* 声道数 波形声音经过数字化后的信息数据量&#xff08;单位:字节&#xff09; 声音信号数据量 数据传输率 * …

Excel导出警告:文件格式和拓展名不匹配

原因描述&#xff1a; Content-Type 原因&#xff1a;Content-Type&#xff0c;即内容类型&#xff0c;一般是指网页中存在的Content-Type&#xff0c;用于定义网络文件的类型和网页的编码&#xff0c;决定文件接收方将以什么形式、什么编码读取这个文件&#xff0c;这就是经常…

HTML 入门手册(一)

目录 HTML介绍 1-基础语法 单标签 双标签 整体结构 2-标题和水平线 标题 水平线 3-段落和换行 段落 换行 4-列表 无序列表 有序列表 嵌套列表 5-div和span div span 6-格式化标签 粗体 斜体 下划线中划线 上标和下标 7-超链接(a标签) 链接到URL 链接…