RabbitMQ入门案例之Direct模式

news2025/1/22 12:14:19

前言

RabbitMQ的Direct模式是一种可以根据指定路由key,Exchang将消息发送到具有该路由key下的Queue下进行存储。也就类似于将数据写进指定数据库表中。这个路由Key可以类比为SQL语句中的:where routeKey = …

官方文档地址:https://www.rabbitmq.com/getstarted.html

什么是Direct模式

RabbitMQ中的Direct模式是一种消息传输模式,通常使用Direct Exchange(直连交换机)实现。

在Direct模式中,生产者将消息发送到交换机,并指定消息的Routing Key(路由键)。交换机会将Routing Key与队列绑定进行匹配,如果匹配成功,则将该消息路由到对应的队列中。如果没有匹配成功,该消息将被丢弃或返回给生产者。在Direct模式中,每个消息只能被一个消费者接收。

Direct模式常用于一对一的场景,例如订单管理系统中将订单分配给特定的处理队列。

通过使用Exchange和Routing Key来进行消息传输,Direct模式实现了消息的有选择性地路由,提高了消息传输的效率,减少了系统负载。
在这里插入图片描述

实操

实操准备工作

在开始使用代码进行操作前,我们先到管理界面构造一个Direct交换机,如下图:
在这里插入图片描述
为其绑定Queue,同时设置这个Queue的route key,如下图:
在这里插入图片描述
最终绑定结果:
在这里插入图片描述
既然交换机和队列已经准备好,接下来就是准备依赖与代码了

<!--RabbitMQ依赖-->
<dependency>
	<groupId>com.rabbitmq</groupId>
	<artifactId>amqp-client</artifactId>
	<version>5.10.0</version>
</dependency>

生产者代码

public class Producer {
    public static void main(String[] args) {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            String message = "宇宙无敌爱学习";
            String  exchangeName = "direct_exchange";
            String routingKey1 = "class";
            String routingKey2 = "student";
            // 7: 发送消息给中间件rabbitmq-server
            // @params1: 交换机exchange
            // @params2: 队列名称/routingkey
            // @params3: 属性配置
            // @params4: 发送消息的内容
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
            System.out.println("消息发送成功!");
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

消费者代码

public class Consumer {
    private static Runnable runnable = () -> {
        // 1: 创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2: 设置连接属性
        connectionFactory.setHost("ip地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        //获取队列的名称
        final String queueName = Thread.currentThread().getName();
        Connection connection = null;
        Channel channel = null;
        try {
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 5: 申明队列queue存储消息
            /*
             *  如果队列不存在,则会创建
             *  Rabbitmq不允许创建两个相同的队列名称,否则会报错。
             *
             *  @params1: queue 队列的名称
             *  @params2: durable 队列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
             *  @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
             *  @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
             * */
            // 这里如果queue已经被创建过一次了,可以不需要定义
            //channel.queueDeclare("queue1", false, false, false, null);
            // 6: 定义接受消息的回调
            Channel finalChannel = channel;
            finalChannel.basicConsume(queueName, true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println(queueName + ":收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {
                }
            });
            System.out.println(queueName + ":开始接受消息");
            System.in.read();
        } catch (Exception ex) {
            ex.printStackTrace();
            System.out.println("发送消息出现异常...");
        } finally {
            // 7: 释放连接关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    };
    public static void main(String[] args) {
        // 启动三个线程去执行
        new Thread(runnable, "queue1").start();
        new Thread(runnable, "queue2").start();
        new Thread(runnable, "queue3").start();
    }
}

在生产者代码中,我们定义了两个route key,如下图:
在这里插入图片描述
在这两个路由key的驱使下,生产者的消息便只会被放到我们刚刚在direct_exchange交换机中具有这两个路由key的Queue中,我们来执行代码验证一下。

生产者执行结果
在这里插入图片描述
管理界面效果
在这里插入图片描述
可以看出,消息就只放进了queue2和queue3中,这是符合我们预期的。
消费者执行结果,如下:在这里插入图片描述
管理界面效果:
在这里插入图片描述
可以看出,消息也被成功取出去。

以上便是Direct模式的全部内容,仅个人笔记使用
感谢阅读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/631119.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DragGAN部署全流程

写在前面 看了DragGAN 官方&#xff0c;并没有找到软件&#xff0c;或者程序&#xff0c;github上也没有程序&#xff0c;如果大佬们能找到&#xff0c;可以评论通知下。不过也有技术大佬已经提前开发出来了&#xff0c;我们抢先体验下。 这里本地部署了 DragGAN。经历了报错&…

【LeetCode】HOT 100(5)

题单介绍&#xff1a; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数据结构的新手和想要在短时间内高效提升的人&#xff0c;熟练掌握这 100 道题&#xff0c;你就已经具备了在代码世界通行的基本能力。 目录 题单介绍&#…

CTFShow-WEB入门篇--命令执行详细Wp

WEB入门篇--命令执行详细Wp 命令执行&#xff1a;Web29&#xff1a;Web30&#xff1a;Web31&#xff1a;web32&#xff1a;web33&#xff1a;web34&#xff1a;web35&#xff1a;web36&#xff1a;web37&#xff1a;web38&#xff1a; CTFShow 平台&#xff1a;https://ctf.sho…

【Kubernetes资源篇】Service四层代理入门实战详解

文章目录 一、Service四层代理概念、原理1、Service四层代理概念2、Service工作原理3、Service原理解读4、Service四种类型 二、Service四层代理三种类型案例1、创建ClusterIP类型Service2、创建NodePort类型Service3、创建ExternalName类型Service 三、拓展1、Service域名解析…

Nvidia AGX Orin MAX9296 GMSL 载板设计要点

因为项目的需求&#xff0c;我们设计了Nvidia AGX Orin MAX9296 GMSL 载板(绿板&#xff09;&#xff0c;项目完成&#xff0c;总结以下。需要参考原理图的&#xff0c;可以微我&#xff0c;索取。共同交流。 Jetson AGX Orin建立在NVIDIA Ampere架构之上&#xff0c;全新Jets…

AUTOSAR-BSW EEPROM模块解读

参考文件 AUTOSAR_SWS_EEPROMDriver&#xff08;4.3.0&#xff09; AUTOSAR_SWS_BSWGeneral&#xff08;4.3.0&#xff09; EEPROM Module 文件结构 如上图所示 EEPROM Module应该主要包含Eep.c,Eep.h,Eep_Cfg.c,Eep_MemMmap.h,Eep_Lcfg.c和Eep_PBcfg.c文件&#xff0c;如果使…

图像分类模型嵌入flask中开发PythonWeb项目

图像分类模型嵌入flask中开发PythonWeb项目 图像分类是一种常见的计算机视觉任务&#xff0c;它的目的是将输入的图像分配到预定义的类别中&#xff0c;如猫、狗、花等。图像分类模型是一种基于深度学习的模型&#xff0c;它可以利用大量的图像数据来学习图像的特征和类别之间…

华为OD机试之找终点

找终点 题目描述 给定一个正整数数组&#xff0c;设为nums&#xff0c;最大为100个成员&#xff0c;求从第一个成员开始&#xff0c;正好走到数组最后一个成员&#xff0c;所使用的最少步骤数。 要求&#xff1a; 第一步必须从第一元素开始&#xff0c;且1<第一步的步长<…

100天精通Golang(基础入门篇)——第3天:Go语言的执行原理及常用命令、编码规范和常用工具

&#x1f337; 博主 libin9iOak带您 Go to Golang Language.✨ &#x1f984; 个人主页——libin9iOak的博客&#x1f390; &#x1f433; 《面试题大全》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &#x1f30a; 《I…

java进阶—线程间通信(通俗易懂等待唤醒机制)

今天我们来看一看&#xff0c;线程之间的通信&#xff0c;也就是我们所说的等待唤醒机制 先来看三个关键方法: wait(); 当线程执行这个方法&#xff0c;它就会进入阻塞状态&#xff0c;并且释放同步监视器 notify(); 英文翻译 唤醒&#xff0c;就是说会唤醒wait的线程&…

【 图像分割 2022 ECCV】CP2

【 图像分割 2022 ECCV】CP2 论文题目&#xff1a;CP2: Copy-Paste Contrastive Pretraining for Semantic Segmentation 中文题目&#xff1a;CP2:语义分割的复制粘贴对比预训练 论文链接&#xff1a;https://arxiv.org/abs/2203.11709 论文代码&#xff1a;https://github.co…

Python3+Selenium框架搭建

Webdriver概述 Webdriver (Selenium2&#xff09;是一种用于Web应用程序的自动测试工具&#xff0c; Thoughtworks公司一个强大的基于浏览器的开源自动化测试工具&#xff0c;通常用来编写web应用的自动化测试。 Selenium 是一个用于Web应用程序测试的工具。 Selenium测试直…

异常数据检测 | Python实现支持向量机(SVM)的异常数据检测

文章目录 文章概述模型描述源码分享学习小结参考资料文章概述 SVM通常应用于监督式学习,但OneClassSVM算法可用于将异常检测这样的无监督式学习,它学习一个用于异常检测的决策函数其主要功能将新数据分类为与训练集相似的正常值或不相似的异常值。 模型描述 OneClassSVM的思…

Flink 1.17.0集群搭建

集群角色分配 HostnameIPRolehadoop01192.168.126.132 JobManager TaskManager hadoop02192.168.126.133TaskManagerhadoop03192.168.126.134TaskManager 下载flink安装包 https://archive.apache.org/dist/flink/flink-1.17.0/ 上传至hadoop01并解压&#xff1a; 修改conf/…

微软MFC技术简明介绍

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天来看一下微软MFC技术简明介绍 Visual C 与 MFC 微软公司于1992年上半年推出了C/C 7.0 产品时初次向世人介绍了MFC 1.0&#xff0c;这个产品包含了20,000行C原始代码&#xff0c;60个以上的Windows相关类…

【Leetcode】贪心问题合集 | 摆动序列、K次取反最大和、加油站、分发糖果、柠檬水找零、根据身高重建队列、单调递增的数字

贪心问题感觉还是挺不好想的&#xff0c;因为每一题有每一题的策略&#xff0c;感觉只能尽量做过的记住了。 376 摆动序列 注意&#xff1a;是序列&#xff0c;而不是数组。 求最大摆动序列的长度&#xff0c;即求谷 / 峰的个数。 若走势不为一条直线。 起始count 2&…

LED显示屏静电防护指南

LED显示屏是一种电子设备&#xff0c;对静电敏感。静电放电可能会对LED显示屏的电子元件造成损坏&#xff0c;因此需要采取静电防护措施。以下是LED显示屏静电防护的一些建议和指南&#xff1a; 环境控制&#xff1a;在LED显示屏周围创建适宜的环境条件。控制湿度和温度&#x…

Yolov5(tag v7.0)网络结构解读,以yolov5s为例

最近yolov5用的多&#xff0c;发现确实好用&#xff0c;于是较深入学了一下。下面按照训练的流程梳理一下网络的结构&#xff0c;同时也是自己记一下便于后面查阅。 同时&#xff0c;我也查了一些关于yolov5网络结构介绍的资料&#xff0c;发现大多是v5.0&#xff0c;少数v6.0的…

Linux驱动IO篇——阻塞/非阻塞IO

文章目录 非阻塞IO阻塞IO等待队列等待队列变体 非阻塞IO 在应用程序中&#xff0c;使用open函数打开一个/dev目录下的一个设备文件时&#xff0c;默认是以阻塞的方式打开。 所谓阻塞&#xff0c;就是当我们请求的资源不可用时&#xff08;资源被占用&#xff0c;没有数据到达等…

让车载系统与外部系统无缝对接——掌握SOA跨系统通信技术

车载SOA架构原理 车载 SOA&#xff08;Service-Oriented Architecture&#xff0c;面向服务的架构&#xff09;是一种基于服务的体系结构&#xff0c;旨在提高车载电子系统的可维护性、可扩展性和互操作性。它将车载电子系统划分为独立的、可复用的服务单元&#xff0c;这些服…