RabbitMQ---订阅模型-Direct

news2024/12/24 3:18:59

1、 订阅模型-Direct

• 有选择性的接收消息
• 在订阅模式中,生产者发布消息,所有消费者都可以获取所有消息。
• 在路由模式中,我们将添加一个功能 - 我们将只能订阅一部分消息。
例如,我们只能将重要的错误消息引导到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
• 但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
• 在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
• 消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。
在这里插入图片描述

• P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
• X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
• C1:消费者,其所在队列指定了需要routing key 为 error 的消息
• C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

1.1、生产者

此处我们模拟商品的增删改,发送消息的RoutingKey分别是:insert、update、delete

public class Send {
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明exchange,指定类型为direct
       channel.exchangeDeclare(EXCHANGE_NAME, "direct");
       // 消息内容
       String message = "商品新增了, id = 1001";
       // 发送消息,并且指定routing key 为:insert ,代表新增商品
       channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());
       System.out.println(" [商品服务:] Sent '" + message + "'");
       channel.close();
       connection.close();
   }
}

1.2、消费者1

我们此处假设消费者1只接收两种类型的消息:更新商品和删除商品。

public class Recv {
   private final static String QUEUE_NAME = "direct_exchange_queue_1";
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 绑定队列到交换机,同时指定需要订阅的routing key。假设此处需要update和delete消息
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
       // 定义队列的消费者
       DefaultConsumer consumer = new DefaultConsumer(channel) {
           // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                   byte[] body) throws IOException {
               // body 即消息体
               String msg = new String(body);
               System.out.println(" [消费者1] received : " + msg + "!");
           }
       };
       // 监听队列,自动ACK
       channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

1.3、 消费者2

我们此处假设消费者2接收所有类型的消息:新增商品,更新商品和删除商品。

public class Recv2 {
   private final static String QUEUE_NAME = "direct_exchange_queue_2";
   private final static String EXCHANGE_NAME = "direct_exchange_test";
   public static void main(String[] argv) throws Exception {
       // 获取到连接
       Connection connection = ConnectionUtil.getConnection();
       // 获取通道
       Channel channel = connection.createChannel();
       // 声明队列
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       // 绑定队列到交换机,同时指定需要订阅的routing key。订阅 insert、update、delete
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
       channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
       // 定义队列的消费者
       DefaultConsumer consumer = new DefaultConsumer(channel) {
           // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                   byte[] body) throws IOException {
               // body 即消息体
               String msg = new String(body);
               System.out.println(" [消费者2] received : " + msg + "!");
           }
       };
       // 监听队列,自动ACK
       channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

1.4、测试

我们分别发送增、删、改的RoutingKey,发现结果:

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/932712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网站常见安全漏洞 | 青训营

Powered by:NEFU AB-IN 文章目录 网站常见安全漏洞 | 青训营 网站基本组成及漏洞定义服务端漏洞**SQL注入****命令执行****越权漏洞****SSRF****文件上传漏洞** 客户端漏洞**开放重定向****XSS****CSRF****点击劫持****CORS跨域配置错误****WebSocket** 网站常见安全漏洞 | 青训…

软件架构业务及技术复杂度分析总结

目录 一、综述分析 二、业务复杂性分析 (一)领域建模 (二)领域分层 (三)服务粒度 (四)流程编排 三、技术复杂性分析 (一)高可用 底层逻辑 CAP原则 …

Mac OS 13.4.1 搜狗输入法导致的卡顿问题

一、Mac OS 系统版本 搜狗输入法已经更新到最新 二、解决方案 解决方案一 在我的电脑上面需要关闭 VSCode 和 Chrmoe 以后,搜狗输入法回复正常。 解决方案二 强制重启一下搜狗输入法。 可以用 unix 定时任务去隔 2个小时自动 kill 掉一次进程 # kill 掉 mac …

EWM怎么取消pinking,SAP_EWM取消拣配报错处理方式

EWM是SAP的一个模块,代表扩展仓库管理(Extended Warehouse Management),是SAP企业资源计划(ERP)的一部分。它提供了一个完整的、高级的仓库管理解决方案,支持企业在全球范围内的仓库管理、订单管…

QGIS学习1-入门学习

QGIS作为一个广受欢迎的开源GIS,很多GIS的学生都了解过。但是因为学校老师都是教的Arcgis,因此很少去充分的学习。QGIS和arcgis一样,有完整的官方帮助文档,我也是要根据官方的帮助文档进行学习等。 https://www.qgis.org/zh-Hans/…

一文看懂Cat.1、Cat.4、NB-IOT、4G之间的区别

01 什么是Cat.1? Cat.1的全称是LTEUE-Category1,其中UE指的是用户设备,它是LTE网络下用户终端设备的无线性能的分类。根据3GPP的定义,UE类别以1-15分为15个等级。Cat.1,可以称为“低配版”的 4G 终端,上行…

微信小程序发布迭代版本后如何提示用户强制更新新版本

在点击小程序发布的时候选择,升级选项 之前用户使用过的再打开小程序页面就会弹出升级弹窗modal

企业博客搭建:经营好企业博客,能让你的业务蹭蹭上涨!

企业博客本身作为企业产品知识的沉淀,搭建并且经营好企业博客不仅有利于企业文化建设,更可以利用博客来推动业务增长。 何谓企业博客营销?简单地说,就是利用HelpLook这种工具创建并开展网络营销活动,称之为博客营销。 …

Linux学习之nginx虚拟域名主机,lsof和netstat命令查看端口是否被监听

需要先参考我的博客《Linux学习之Ubuntu 20.04在https://openresty.org下载源码安装Openresty 1.19.3.1,使用systemd管理OpenResty服务》安装好Openresty。 虚拟域名可以使用让不同的域名访问到同一台主机。 cd /usr/local/openresty切换当前访问目录到/usr/local/o…

stm32之USART(总结)

串行通信 UART串口内部结构示意图 普中科技的详细介绍 中断知识补充 代码 #ifndef __USART_H #define __USART_H #include "stdio.h" #include "stm32f10x_usart.h" #define USART1_REC_LEN 200 //定义最大接收字节数 200extern u8 USART1_RX_BUF[US…

LeetCode--HOT100题(42)

目录 题目描述:108. 将有序数组转换为二叉搜索树(简单)题目接口解题思路代码 PS: 题目描述:108. 将有序数组转换为二叉搜索树(简单) 给你一个整数数组 nums ,其中元素已经按 升序 排列&#xf…

Linux 应用 Segmentation fault 分析手段

前言 本文主要介绍,在Linux 下应用程序发生Segmentation fault 错误时,如何使用gdb 通过core dump文件查找错误具体发生的地方。 一、生成core dump文件 在板子上执行ulimit -c 或者 ulimit -a 命令查看core 文件大小的配置情况,如下图所示 此时 “ core file size ”大小…

芯讯通SIMCOM A7680C (4G Cat.1)AT指令测试 TCP通信过程

A7680C TCP通信 1、文档准备 去SIMCOM官网找到A7680C的AT指令集 AT指令官网 进入官网有这么多AT指令文件,只需要找到你需要用到的,这里我们用到了HTTP和TCP的,所以下载这两个即可。 2、串口助手 任意准备一个串口助手即可 这里我使用的是XC…

EWM是什么

EWM是SAP的一个模块,代表扩展仓库管理(Extended Warehouse Management),是SAP企业资源计划(ERP)的一部分。它提供了一个完整的、高级的仓库管理解决方案,支持企业在全球范围内的仓库管理、订单管…

python+TensorFlow实现人脸识别智能小程序的项目(包含TensorFlow版本与Pytorch版本)

pythonTensorFlow实现人脸识别智能小程序的项目(包含TensorFlow版本与Pytorch版本) 一:TensorFlow基础知识内容部分(简明扼要,快速适应)1、下载Cifar10数据集,并进行解压缩处理2、将Cifar10数据…

STL-常用容器-queue 容器(队列)

1 queue 基本概念 概念:Queue是一种先进先出(First In First Out,FIFO)的数据结构,它有两个出口。 队列容器允许从一端新增元素,从另一端移除元素 队列中只有队头和队尾才可以被外界使用,因此队列不允许有遍历行为 队列中进数据…

macbookpro如何清理系统数据 macbookpro怎么删除软件

Macbook Pro是苹果公司的一款高性能笔记本电脑,它拥有强大的硬件和流畅的操作系统。然而,随着时间的推移,你可能会发现你的Macbook Pro变得越来越慢,储存空间也越来越紧张。这时候,你需要对你的Macbook Pro进行一些清理…

CGY-OS 正式开源!【软件编写篇】

上一篇文章:CGY-OS 正式开源!_cgy091107的博客-CSDN博客 一、软件编写基础要求 在编写CGY-OS的应用程序之前,您需要: 1. 安装python3.10,配置好CGY-OS。 2.掌握python3的基本语法、lambda表达式、各种简单的数据结构。…

基于Python3 的 简单股票 可转债 提醒逻辑

概述 通过本地的定时轮训,结合本地建议数据库。检查股票可转债价格的同事,进行策略化提醒 详细 前言 为什么会有这么个东西出来呢,主要是因为炒股软件虽然有推送,但是设置了价格之后,看到推送也未必那么及时&#…

数据结构(Java实现)-ArrayList与顺序表

什么是List List是一个接口,继承自Collection。 List的使用 List是个接口,并不能直接用来实例化。 如果要使用,必须去实例化List的实现类。在集合框架中,ArrayList和LinkedList都实现了List接口。 线性表 线性表(lin…