【RabbitMQ五】——RabbitMQ路由模式(Routing)

news2025/1/9 16:44:59

RabbitMQ路由模式

  • 前言
  • RabbitMQ模式的基本概念
    • 为什么要使用Rabbitmq 路由模式
    • RabbitMQ路由模式组成元素
  • 路由模式完整代码
    • Pom文件引入RabbtiMQ依赖
    • RabbitMQ工具类
    • 生产者
    • 消费者1
    • 消费者2
    • 运行结果截图

前言

通过本篇博客能够简单使用RabbitMQ的路由模式。
本篇博客主要是博主通过官网以及学习他人的博客总结出的RabbitMQ发布订阅模式。其中如果有误欢迎大家及时指正。

RabbitMQ模式的基本概念

路由模式是根据Routing Key有条件的将消息筛选后发送给消费者,消费者只接受筛选之后的消息
路由模式的核心是:
配置一个类型为direct的交换机,并且需要指定不同的路由键(routing key),把对应的消息从交换机路由到不同的消息队列进行存储,再由对应的消费者进行消费

为什么要使用Rabbitmq 路由模式

由于发布订阅模式是无条件将所有消息分发给所有消费者,路由模式可以根据条件(Routing Key)将消息筛选之后发送给消费者。

应用场景:
例如:有一个股票分析机构,每天都会有一些独家的股票分析报告。对于其他一些应用平台,想要每天都到这家股票分析机构提供的百度的独家股票分析报告,对于另外一些应用平台想要收到谷歌的独家股票分析报告,就可以使用路由模式。

RabbitMQ路由模式组成元素

在这里插入图片描述
P:生产者,向交换机发送消息的是否需要指定routing key
X:交换机,接收生产者发送的消息,需要指定交换机的类型为direct,并且将消息发送给与routing key匹配的队列
C1:消费者1,它所在队列指定了需要routing key为error的信息
C2:消费者2,其所在队列指定了需要routing key 为 info、error、warning 的消息

路由模式完整代码

**业务场景:**生产者为日志分发平台,分发info、warning、error级别的日志,消费者1只接受日志级别为error的日志,消费者2接收全部日志。

Pom文件引入RabbtiMQ依赖

<dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>

RabbitMQ工具类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : RabbitMQUtils
 * @description : [rabbitmq工具类]
 * @createTime : [2023/1/17 8:49]
 * @updateUser : [WangWei]
 * @updateTime : [2023/1/17 8:49]
 * @updateRemark : [描述说明本次修改内容]
 */
public class RabbitMQUtils {

    /*
     * @version V1.0
     * Title: getConnection
     * @author Wangwei
     * @description 创建rabbitmq连接
     * @createTime  2023/1/17 8:52
     * @param []
     * @return com.rabbitmq.client.Connection
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("ip");
        factory.setPort(5672);
        factory.setVirtualHost("虚拟主机");
        factory.setUsername("用户名");
        factory.setPassword("密码");
        //创建连接
        Connection connection=factory.newConnection();
        return  connection;
    }

    /*
     * @version V1.0
     * Title: getChannel
     * @author Wangwei
     * @description 创建信道
     * @createTime  2023/1/17 8:55
     * @param []
     * @return com.rabbitmq.client.Channel
     */
    public static Channel getChannel() throws IOException, TimeoutException {
        Connection connection=getConnection();
        Channel channel=connection.createChannel();
        return channel;
    }
}


生产者

import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : Producer
 * @description : [生产者]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class Producer {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立连接
        RabbitMQUtils.getConnection();
        //声明通道
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");

        //声明routingKey
        String severityInfo="info";
        String severityError="error";
        String severityWarning="warning";
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式info:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityInfo,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式error:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityError,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
        //循环发送2条消息
        for (int i = 0; i <2 ; i++) {
            String msg="路由模式warning:"+i;
            /*推送消息
             *交换机命名,不填写使用默认的交换机
             * routingKey -路由键-
             * props:消息的其他属性-路由头等正文
             * msg消息正文
             */
            channel.basicPublish(EXCHANGE_NAME,severityWarning,null,msg.getBytes(StandardCharsets.UTF_8));
            System.out.println(msg);
        }
    }
}

消费者1

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerOne
 * @description : [消费者1]
 * @createTime : [2023/2/1 9:39]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:39]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerOne {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (error)
        String severityError="error";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );

        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

消费者2

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : [WangWei]
 * @version : [v1.0]
 * @className : ConsumerTwo
 * @description : [消费者2]
 * @createTime : [2023/2/1 9:38]
 * @updateUser : [WangWei]
 * @updateTime : [2023/2/1 9:38]
 * @updateRemark : [描述说明本次修改内容]
 */
public class ConsumerTwo {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        RabbitMQUtils.getConnection();
        Channel channel = RabbitMQUtils.getChannel();
        //创建fanout类型交换机并命名为logs
        channel.exchangeDeclare(EXCHANGE_NAME,"direct");
        //创建了一个非持久的、排他的、自动删除的队列,并生成了一个名称
        String queueName = channel.queueDeclare().getQueue();
        //声明routingKey (info,error,warning)
        String severityInfo="info";
        String severityError="error";
        String severityWarning="warning";
        //交换机与队列进行绑定-如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失
        //queueName绑定了direct_logs交换机并且绑定了3个routingKey
        channel.queueBind(queueName, EXCHANGE_NAME,severityInfo );
        channel.queueBind(queueName, EXCHANGE_NAME,severityError );
        channel.queueBind(queueName, EXCHANGE_NAME,severityWarning );

        //因为Rabbitmq服务器将异步地向我们推送消息,所以我们以对象的形式提供了一个回调,该回调将缓冲消息,直到我们准备好使用它们。
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }

}

运行结果截图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337758.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ROS运行机C++程序,移动

流程&#xff1a; 1.创建工作空间 mkdir catkin_ws cd catkin_ws mkdir src cd src catkin_init_workspace 2编译工作空间 cd ~/catkin_ws/ catkin_make catkin_make install 首先对ROS进行创建一个元功能包 3.设置环境变量 source devel/setup.bash source devel/setup.b…

Java基础知识

1. Java 基本功 1.1. Java 入门&#xff08;基础概念与常识&#xff09; 1.1.1. Java 语言有哪些特点?1.1.2. 关于 JVM JDK 和 JRE 最详细通俗的解答 1.1.2.1. JVM1.1.2.2. JDK 和 JRE 1.1.3. Oracle JDK 和 OpenJDK 的对比1.1.4. Java 和 C的区别?1.1.5. import java 和 jav…

每日学术速递2.11

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.IR、cs.MM 1.A Comprehensive Survey on Multimodal Recommender Systems: Taxonomy, Evaluation, and Future Directions 标题&#xff1a;关于多模态推荐系统的综合调查&#xff1a;分…

电子学会2020年6月青少年软件编程(图形化)等级考试试卷(三级)答案解析

目录 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 二、判断题&#xff08;共10题&#xff0c;每题2分&#xff0c;共20分&#xff09; 三、编程题&#xff08;共4题&#xff0c;共30分&#xff09; 青少年软件编程&#xff08;Scratch&…

leaflet 加载geojson文件并显示图形(示例代码051)

第051个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+leaflet中加载geojson文件,将图形显示在地图上。 直接复制下面的 vue+openlayers源代码,操作2分钟即可运行实现效果; 注意如果OpenStreetMap无法加载,请加载其他来练习 文章目录 示例效果配置方式示例源代码(…

【MyBatis】自定义映射resultMap

8.1、resultMap处理字段和属性的映射关系 若字段名和实体类中的属性名不一致&#xff0c;则可以通过resultMap设置自定义映射 <!--resultMap&#xff1a;设置自定义映射属性&#xff1a;id&#xff1a;表示自定义映射的唯一标识type&#xff1a;查询的数据要映射的实体类的…

基于PLUS+InVEST模型 生态系统服务多情景模拟预测

目录 第一章 理论基础与软件介绍 第二章 数据获取与制备 第三章 土地利用格局模拟 第四章 生态系统服务评估 第五章 时空变化及驱动机制分析 第六章 论文撰写技巧及案例分析 工业革命以来&#xff0c;社会生产力迅速提高&#xff0c;人类活动频繁&#xff0c;此外人口与日…

Rabbitmq业务难点

Rabbitmq业务难点1.消息生产者发送的消息无法路由到任何一个队列怎么处理?2.聊聊Rabbitmq的七种工作模式3.Rabbitmq的消息确认机制4.Rabbitmq的消息持久化5.发布确认模式如何确保生产者能够成功将消息投递到消息队列6. Rabbitmq基于队列设置消息过期时间和单独针对消息设置过期…

ByteHouse:基于ClickHouse的实时数仓能力升级解读

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 ByteHouse是火山引擎上的一款云原生数据仓库&#xff0c;为用户带来极速分析体验&#xff0c;能够支撑实时数据分析和海量数据离线分析。便捷的弹性扩缩容能力&…

数据结构 第八章 查找(静态查找表)

集合 1、集合中的数据元素除了属于同一集合外,没有任何的逻辑关系 2、在集合中,每个数据元素都有一个区别于其他元素的唯一标识(键值或者关键字值) 3、集合的运算&#xff1a; 1 查找某一元素是否存在(内部查找、外部查找) 2 将集合中的元素按照它的唯一标识进行排序4、集合的…

shell编程之awk

文章目录九、shell编程之awk9.1 什么是awk9.2 awk的工作流程9.3 awk程序执行方式9.4 awk基本语法9.4.1 awk的输出9.4.2 awk的变量9.4.3 awk操作符9.4.4 awk的模式9.4.5 awk控制语句9.4.6 awk使用数组9.4.7 awk内置函数9.5 awk 案例9.5.2 网站日志分析九、shell编程之awk 9.1 什…

Linux:软链接和硬链接的理解

Linux通过命令行创建快捷方式使用的命令是ln&#xff0c;这里就涉及到了软链接和硬链接&#xff0c;确实有些不好理解&#xff0c;如果你也一样&#xff0c;那么可以继续看下去了 目录ln命令语法实操创建软链接&#xff1a;ln -s [源文件或目录][目标文件或目录]创建硬链接&…

使用Consul建立docker集群

概述什么是consulConsul是HashiCorp公司推出的开源工具&#xff0c;Consul由Go语言开发&#xff0c;部署起来非常容易&#xff0c;只需要极少的可执行程序和配置文件&#xff0c;具有绿色、轻量级的特点。Consul是分布式的、高可用的、可横向扩展的用于实现分布式系统的服务发现…

深度学习|论文中常用的注意力模块合集(下)

注意力机制可以增加少量参数的情况下来提升计算精度和模型性能&#xff0c;在论文中常用的注意力模块合集(上)中介绍了三种注意力机制&#xff0c;它们分别是CA、CBAM和SE&#xff0c;均在目标检测和语义分割领域内能够提升模型的性能&#xff0c;废话不多说&#xff0c;直接开…

java分治算法

分治算法介绍 分治法是一种很重要的算法。字面上的解释是“分而治之”&#xff0c;就是把一个复杂的问题分成两个或更多的相同或 相似的子问题&#xff0c;再把子问题分成更小的子问题……直到最后子问题可以简单的直接求解&#xff0c;原问题的解即子问题 的解的合并。这个技…

【机器学习】Linear and Nonlinear Regression 线性/非线性回归讲解

文章目录一、回归问题概述二、误差项定义三、独立同分布的假设四、似然函数的作用五、参数求解六、梯度下降算法七、参数更新方法八、优化参数设置一、回归问题概述 回归&#xff1a;根据工资和年龄&#xff0c;预测额度为多少 其中&#xff0c;工资和年龄被称为特征&#xff0…

flea-msg使用之JMS初识

JMS初识 1. JMS 基本概念 1.1 什么是 JMS &#xff1f; Java 消息服务【Java Message Service】&#xff0c;又简称 JMS&#xff0c;它是 Java 平台上有关面向消息中间件(MOM)的技术规范。 1.2 JMS 规范 JMS 中定义了 Java 中访问消息中间件的接口&#xff0c;并没有给予实…

分类预测 | MATLAB实现SSA-CNN麻雀算法优化卷积神经网络多特征分类预测

分类预测 | MATLAB实现SSA-CNN麻雀算法优化卷积神经网络多特征分类预测 目录分类预测 | MATLAB实现SSA-CNN麻雀算法优化卷积神经网络多特征分类预测分类效果基本介绍模型描述程序设计参考文献分类效果 基本介绍 1.Matlab实现SSA-CNN麻雀算法优化卷积神经网络多特征分类预测&…

Python操作的5个坏习惯,你中了几个呢?

很多文章都有介绍怎么写好 Python&#xff0c;我今天呢相反&#xff0c;说说写代码时的几个坏习惯。有的习惯会让 Bug 变得隐蔽难以追踪&#xff0c;当然&#xff0c;也有的并没有错误&#xff0c;只是个人觉得不够完美。 注意&#xff1a;示例代码在 Python 3.6 环境下编写 …

数据与C(布尔类型和虚数和实数)

一._Bool类型&#xff08;%d占位符&#xff09; C99标准添加了_Bool类型&#xff0c;用于表示布尔值&#xff0c;既逻辑值true&#xff08;1&#xff09;和false&#xff08;0&#xff09;。原则上_Bool在原则上仅占用1位存储空间&#xff0c;因为对0和1而言&#xff0c;1位的…