RabbitMQ消息模型之Routing-Direct

news2025/1/16 16:00:35

Routing Direct

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息

image-20191126220145375

  • P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
  • X:Exchange(交换机),接收生产者的消息,然后把消息递交给与routing key完全匹配的队列。
  • C1:消费者,其所在队列指定了需要routing key 为 error 的消息。
  • C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息。

创建生产者

public class MyProducer {

    @Test
    public void test() throws Exception {
        // 交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 声明交换机
        channel.exchangeDeclare(exchange, "direct");
        for (int i = 0; i < 3; i++) {
            // 发布消息
            channel.basicPublish(exchange, "DEBUG", null, ("DEBUG LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "INFO", null, ("INFO LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "WARN", null, ("WARN LOG -> " + i).getBytes());
            channel.basicPublish(exchange, "ERROR", null, ("ERROR LOG -> " + i).getBytes());
        }
    }
}

创建消费者1

public class MyConsumer1 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "WARN");
        channel.queueBind(queue, exchange, "ERROR");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

创建消费者2

public class MyConsumer2 {

    public static void main(String[] args) throws Exception {
        // 指定交换机
        String exchange = "logs_direct";

        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("/");
        factory.setHost("xuewei.world");
        factory.setUsername("xuewei");
        factory.setPassword("123456");
        factory.setPort(5672);

        // 创建连接和通道
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 绑定交换机
        channel.exchangeDeclare(exchange, "direct");
        // 创建临时队列
        String queue = channel.queueDeclare().getQueue();
        // 将临时队列绑定exchange
        channel.queueBind(queue, exchange, "DEBUG");
        channel.queueBind(queue, exchange, "INFO");
        // 处理消息
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2: " + new String(body));
                // TODO 业务处理
            }
        });
    }
}

image-20220526182028082

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1269960.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

记录创建粒子的轻量级JavaScript库——particles.js(可用于登录等背景显示)

文章目录 前言一、下载particles.js二、引入particles.js并使用三、配置数据说明如有启发&#xff0c;可点赞收藏哟~ 前言 本文记录使用创建粒子的轻量级JavaScript库 particles.js 可用于登录等背景显示 一、下载particles.js 先下载particles.js库&#xff0c;放在项目libs…

504. 七进制数

这篇文章会收录到 : 算法通关第十三关-青铜挑战数学基础问题-CSDN博客 七进制数 描述 : 给定一个整数 num&#xff0c;将其转化为 7 进制&#xff0c;并以字符串形式输出。 题目 : LeetCode 504. 七进制数 : 504. 七进制数 分析 : 我们先通过二进制想一下7进制数的变化特…

剑指 Offer(第2版)面试题 9:用两个栈实现队列

剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xff1a;用两个栈实现队列 剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xff1a;用两个栈实现队列解法1&#xff1a;模拟拓展&#xff1a;用队列模拟栈 剑指 Offer&#xff08;第2版&#xff09;面试题 9&#xf…

Flutter 控件查阅清单

为了方便记录和使用Flutter中的各种控件&#xff0c;特写此博客以记之&#xff0c;好记性不如烂笔头嘛&#xff1a;&#xff09; 通过控件的首字母进行查找&#xff0c;本文会持续更新 控件目录 AAppBar BCContainerColumn &#xff08;列&#xff09; DDivider (分割线) EElev…

数据结构day4作业

1.单链表任意位置删除 datetype pos;printf("please input pos");scanf("%d",&pos);headdelete_all(head,pos);Output(head);Linklist delete_all(Linklist head,datetype pos) {if(pos<1||pos>length(head)||headNULL)return head;if(head->…

Android中实现RecyclerView,并对item及其多个子控件的点击事件监听

目录 背景 实现RecyclerView 第一步、 新建item的xml 第二步、在activity的布局中引入 RecyclerView 第三步、新建一个adapter 第四步、在activity中初始化绑定adapter即可 实现item及其多个子组件点击事件监听 第一步、 适配器中创建监听对象 第二步、适配器中绑定监听…

【Git】ssh: connect to host github.com port 22: Connection refused

错误展示&#xff1a; 错误原因&#xff1a;22端口被拒绝访问 解决办法 在~/.ssh/config文件&#xff08;有就直接编辑&#xff0c;没有就创建&#xff09;里添加以下内容&#xff0c;这样ssh连接GitHub的时候就会使用443端口。 Host github.comHostname ssh.github.comPort…

mac 聚焦搜索不显示

我是连搜索框都不显示&#xff0c;不是搜索结果显示异常 点右上角的搜索按钮都毫无反应 我检查过快捷键之类的设置&#xff0c;都正常&#xff0c;最后是通过删除文件解决的 cd ~/Library/Preferences/ rm com.apple.Spotlight.plist 重启 mac 参考 Spotlight Search Not W…

串口理解小结(UART)

串口作为单片机的必备外设&#xff0c;主要用于单片机与其它模块的信息通讯、程序烧录和升级作用。 UART全称为通用异步收发器。 可分为&#xff1a; 一、串行和并行 串行指数据位只能一位一位地发送 并行之多个数据位同时发送 二、同步和异步 同步和异步是相当于时钟而…

linux 服务 下 redis 安装和 启动

官网下载 https://redis.io/download/ 安装步骤&#xff1a; 1.安装redis 所需要的依赖 yum install -y gcc tcl2.上传安装包并解压&#xff0c;下载安装包&#xff0c;上传到/usr/local/src目录&#xff0c;解压 tar -zxvf redis-7.2.3.tat.gz进入安装目录&#xff0c;运行…

基于Java SSM框架实现美食推荐管理系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现美食推荐管理系统演示 摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;管理工作的重要性已逐渐被人们所认识&a…

zookeeper 客户端常用命令简单记录(实操课程系列--watcher功能测试)(发布订阅功能测试)

本系列是zookeeper相关的实操课程&#xff0c;课程测试环环相扣&#xff0c;请按照顺序阅读测试来学习zookeeper。阅读本文之前&#xff0c;请先阅读----zookeeper 单机伪集群搭建简单记录&#xff08;实操课程系列&#xff09; 1、命令行工具切换到zookeeper的bin目录下面&am…

【计算机网络笔记】交换机

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

Java零基础——docker篇

1.【熟悉】docker简介 1.1 什么是docker Docker是一个开源项目&#xff0c;诞生于2013年初&#xff0c;最初是dotCloud公司内部的一个业余项目。它基于Google公司推出的Go语言实现。项目后来加入了Linux基金会&#xff0c;遵从了Apache2.0协议&#xff0c;项目代码在GitHub上进…

【开源视频联动物联网平台】流媒体传输协议HLS,FLV的功能和特点

HLS&#xff08;HTTP Live Streaming&#xff09;和FLV&#xff08;Flash Video&#xff09;都是用于视频流传输的协议或容器格式&#xff0c;但它们在某些方面有着显著的区别和特点。 HLS是一种由苹果公司开发的用于流媒体传输的协议&#xff0c;而FLV则是Adobe公司开发的用于…

【排序】希尔排序(C语言实现)

文章目录 前言1. 希尔排序的思想2. 希尔排序的一些小优化 前言 本章将详细介绍希尔排序的思想及实现&#xff0c;由于希尔排序是在插入排序的思想上进行升华&#xff0c;所以如果不知道插入排序或者不熟悉的可以先看看这篇文章&#xff1a;《简单排序》中的直接插入排序。 1. 希…

AppDelete 4.3.3(软件清理卸载工具)

AppDelete for Mac是一款运行在Mac平台上的强大软件卸载工具&#xff0c;AppDelete Mac版不仅可以删除应用程序&#xff0c;还可以删除小部件&#xff0c;首选项窗格&#xff0c;插件和屏幕保护程序及其相关文件&#xff0c;卸载快速又干净&#xff0c;仅需要简单的拖拽即可。 …

国内首个农业开源鸿蒙操作系统联合华为正式发布

2023年11月29日&#xff0c;在中国国际供应链促进博览会上&#xff0c;中信农业科技股份有限公司&#xff08;简称“中信农业”&#xff09;与深圳开鸿数字产业发展有限公司&#xff08;简称“深开鸿”&#xff09;以及华为技术有限公司&#xff08;简称“华为”&#xff09;联…

智能手表上的音频(四):语音通话

上篇讲了智能手表上音频文件播放。本篇开始讲语音通话。同音频播放一样有两种case&#xff1a;内置codec和BT。先看这两种case下audio data path&#xff0c;分别如下图&#xff1a; 内置codec下的语音通话audio data path 蓝牙下的语音通话audio data path 从上面两张图可以看…

函数保留凸性的一些运算,限制为一条线

凸优化在学术研究中非常重要&#xff0c;经常遇到的问题是证明凸性。常规证明凸性的方式是二阶导数的黑塞矩阵为半正定&#xff0c;或者在一维函数时二阶导数大于等于零。但很多时候的数学模型并不那么常规、容易求导的&#xff0c;若能够知道一些保留凸性的运算&#xff0c;将…