RabbitMQ工作模式(4) - 路由模式

news2025/1/16 6:04:18

概念

路由模式(Routing)是 RabbitMQ 中的一种消息传递模式,也称为直连模式。它允许生产者将消息发送到一个交换机,并指定一个或多个路由键(routing key),交换机根据路由键将消息路由到与之匹配的队列中。这样消费者只需关注感兴趣的消息,而不需要接收所有的消息。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个交换机,并指定一个或多个路由键。

  2. 交换机根据路由键路由消息: 交换机根据消息的路由键将消息发送到与之匹配的队列中。匹配规则可以由交换机的类型和绑定规则决定。

  3. 消费者监听队列: 消费者可以选择监听特定的队列,或者多个队列,以接收他们感兴趣的消息。

  4. 消息处理: 消费者从队列中接收消息,并进行相应的处理。只有与队列绑定的交换机根据消息的路由键将消息发送到该队列中。

特点

  • 灵活路由:生产者可以根据需要指定不同的路由键来发送消息,交换机根据路由键将消息路由到不同的队列。
  • 定向传递:消息只会被发送到与之匹配的队列中,消费者只需关注他们感兴趣的消息,而不需要接收所有的消息。
  • 路由规则:可以根据实际需求定义不同的路由规则,例如根据消息的类型、内容、优先级等进行路由。

路由模式适用于需要根据不同的消息属性将消息路由到不同队列的场景,例如消息分类、事件处理、分布式系统等。

Springboot集成

1.创建队列和交换机并绑定

在RoutingConfig文件中配置

package com.model.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: Haiven
 * @Time: 2024/4/22 10:09
 * @Description: TODO
 */
@Configuration
public class RoutingConfig {

    /**
     * 路由模式交换机
     * @return exchange
     */
    @Bean(name = "routingExchange")
    public Exchange getRoutingExchange(){
        return ExchangeBuilder
                .directExchange("exchange_routing")
                .build();
    }

    /**
     * 路由队列 01
     * @return queue
     */
    @Bean(name = "routingQueue01")
    public Queue getRoutingQueue01(){
        return QueueBuilder
                .durable("queue_routing_01")
                .build();
    }

    /**
     * 路由队列 02
     * @return queue
     */
    @Bean(name = "routingQueue02")
    public Queue getRoutingQueue02(){
        return QueueBuilder
                .durable("queue_routing_02")
                .build();
    }

    /**
     * 绑定队列 01
     * @return binding
     */
    @Bean
    public Binding getRoutingBinding01(){
        return BindingBuilder
                .bind(getRoutingQueue01())
                .to(getRoutingExchange())
                //路由键 队列1接收debug级别的消息
                .with("debug")
                .noargs();
    }

    /**
     * 绑定队列 02
     * @return binding
     */
    @Bean
    public Binding getRoutingBinding02(){
        return BindingBuilder
                .bind(getRoutingQueue02())
                .to(getRoutingExchange())
                // 路由键 队列2接收info级别的消息
                .with("info")
                .noargs();
    }

    /**
     * 绑定队列 01
     * @return binding
     */
    @Bean
    public Binding getRoutingBinding03(){
        return BindingBuilder
                .bind(getRoutingQueue01())
                .to(getRoutingExchange())
                //路由键 队列1接收debug级别的消息
                .with("err")
                .noargs();
    }
}

 !!!这里创建的交换机类型为:DirectExchange,如果交换机内容错误,会导致消息错误的分发

  1. Direct Exchange(直连交换机): 直连交换机将消息的路由键与绑定队列的路由键进行精确匹配,只有当消息的路由键与绑定队列的路由键完全相同时,才会将消息路由到对应的队列。

  2. Fanout Exchange(扇出交换机): 扇出交换机将消息广播到所有与之绑定的队列,无视消息的路由键。这种模式适用于需要将消息广播给多个消费者的场景。

  3. Topic Exchange(主题交换机): 主题交换机根据消息的路由键与绑定队列的路由键进行模糊匹配,支持通配符 *#* 表示匹配一个单词,# 表示匹配零个或多个单词。这种模式适用于需要根据消息的特定属性进行路由的场景。

  4. Headers Exchange(头交换机): 头交换机根据消息的头部属性进行匹配,而不是路由键。在绑定队列时,可以指定匹配的头部属性和值,只有当消息的头部属性和值与绑定规则完全匹配时,才会将消息发送到对应的队列。

  5. Default Exchange(默认交换机): 默认交换机是 RabbitMQ 的默认交换机,它将消息的路由键与队列的名称进行匹配,如果消息的路由键与队列的名称完全匹配,则将消息路由到该队列中。默认交换机通常以空字符串表示,不需要显示声明,可以直接使用。

2.创建消费者

RoutingConsumer

package com.model.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author: Haiven
 * @Time: 2024/4/22 10:08
 * @Description: TODO
 */
@Component
public class RoutingConsumer {

    @RabbitListener(queues = {"queue_routing_01"})
    public void routingConsumer01(String msg){
        System.out.println("消费者 -01- 接收消息:" + msg);
    }

    @RabbitListener(queues = {"queue_routing_02"})
    public void routingConsumer02(String msg){
        System.out.println("消费者 -02- 接收消息:" + msg);
    }
}

3.创建生产者并发送消息

package com.model.controller;

import com.code.domain.Response;
import com.model.service.RabbitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


/**
 * @Author: Haiven
 * @Time: 2024/4/19 9:46
 * @Description: TODO
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Resource
    private RabbitService rabbitService;

    @GetMapping("/simple")
    public Response<Void> simple(String msg){
        boolean res = rabbitService.simple(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/work")
    public Response<Void> work(String msg){
        boolean res = rabbitService.work(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/sub")
    public Response<Void> sub(String msg){
        boolean res = rabbitService.sub(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/routing")
    public Response<Void> routing(String msg, String type){
        boolean res = rabbitService.routing(msg, type);
        return res ? Response.success() : Response.fail();
    }
}
package com.model.service.impl;

import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 10:51
 * @Description: TODO
 */
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.simple.queue}")
    private String simpleQueue;

    @Value("${rabbitmq.work.queue}")
    private String workQueue;

    @Override
    public boolean simple(String msg) {
        try {
            rabbitTemplate.convertAndSend(simpleQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean work(String msg) {
        try {
            rabbitTemplate.convertAndSend(workQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean sub(String msg) {
        try {
            //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
            rabbitTemplate.convertAndSend("exchange_sub","", msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean routing(String msg, String type) {
        System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");
        try {
            //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
            rabbitTemplate.convertAndSend("exchange_routing",type, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
}

4.发送消息

接口调用发送消息, type字段为消息的级别

 后台接收

 debug级别消息只被消费者1消费

 info级别的消息只被消费者2消费

5.额外说明

上述消费者1只消费了debug级别,如果还有err级别的消息,只需在将队列1绑定err级别的消息

  /**
     * 绑定队列 01
     * @return binding
     */
    @Bean
    public Binding getRoutingBinding03(){
        return BindingBuilder
                .bind(getRoutingQueue01())
                .to(getRoutingExchange())
                //路由键 队列1接收debug级别的消息
                .with("err")
                .noargs();
    }

发送消息并测试

 如果某种消息级别(warn)没有被绑定,这该级别的消息会被丢弃

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1628562.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

大学生考勤系统C语言--升级版

要求&#xff1a; 人狠话不多&#xff0c;直接上代码&#xff08;以下代码只展示部分&#xff0c;如需完整版代码&#xff0c;请私信联系我&#xff09;&#xff1a; #include <stdio.h> #include <stdlib.h> #include <string.h> #include <stdbool.h&g…

Linux--自主编写shell

目录 准备知识 shell原理 shell与用户互动的过程 实现shell 0.用到的头文件和宏定义 1.首先我们需要自己输出一个命令行 2.获取用户命令行字符 3.命令行字符串分割 4.执行命令 5.设置循环 6.检测内建命令 7.完善细节--获取工作目录而非路径 准备知识 Linux--环境…

掌静脉识别关键技术研究综述

掌静脉识别作为一种新兴的红外生物识别技术&#xff0c;因其高安全性、活体检测性等优势已成为当前生物特征识别领域中的研究热点之一。近年来&#xff0c;该领域的大量研究通过引入深度学习方法推动了掌静脉识别技术的发展。为了掌握掌静脉识别领域最新研究现状及发展方向&…

css中新型的边框设置属性border-block

border-block 是 CSS 中的一个属性&#xff0c;主要用于在样式表中一次性设置元素的逻辑块向边框的属性值。这个属性是简写属性&#xff0c;可以同时设置 border-block-width、border-block-style 和 border-block-color。其中&#xff0c;border-block-start 用于设置元素的开…

QT入门:计算圆面积的QT开始以及日历相关

QT入门&#xff1a;计算圆面积的QT开始以及日历相关 使用的工具为Qt creator 如图所示的为Qt的一个基本目录&#xff0c;首先打开mainwindow.ui进行设计&#xff0c;首先是讲解日历的&#xff0c;可以完全不用写代码&#xff0c;只在mainwindow.ui即可实现。 这是最后的一个成…

Ubuntu2004 CMake 使用基础

一、环境安装 win10安装wsl ubuntu2004 #windows c盘工程目录建立软链 ln -s /mnt/c /home/vrviu/ 安装cmake、c编译工具 apt install -y cmake g 二、CMakeLists.txt讲解 准备工作 首先&#xff0c;在/home/vrviu 目录建立一个 cmake 目录 以后我们所有的 cmake 练习都会放…

网络程序 -- TCP版服务器

一 多进程版TCP服务器 1.1 核心功能 对于之前编写的 字符串回响程序 来说&#xff0c;如果只有一个客户端进行连接并通信&#xff0c;是没有问题的&#xff0c;但如果有多个客户端发起连接请求&#xff0c;并尝试进行通信&#xff0c;服务器是无法应对的 原因在于 服务器是一个…

在MyBatis-Plus中实现多数据源切换

前言 在复杂的业务场景中&#xff0c;我们可能需要从不同的数据库获取数据。MyBatis-Plus提供了一种便捷的方式来实现这一需求。本文将介绍如何在MyBatis-Plus中配置和使用多数据源。 引入必要的依赖 为了支持多数据源&#xff0c;我们首先需要引入MyBatis-Plus及相关依赖。…

C语言项目实战——扫雷

目录 1.前言 2.完整流程 2.1规划书 2.2代码部分 2.2.1文件的结构设计 2.2.2变量的创建 2.2.3菜单的基本实现 2.2.4初始化期棋盘 2.2.5输出完整棋盘 2.2.6埋雷的实现 2.2.7查询周围雷的数量 2.2.8扫雷的实现 2.2.9完整代码 3.总结 1.前言 哈喽大家好吖&#xff0c;今…

Linux计划任务书以及定时任务的编写

一、程序可以通过两种方式执行&#xff1a; 手动执行利用调度任务&#xff0c;依据一定的条件自动执行 自动执行可通过一下两个命令来实现: &#xff08;1&#xff09;At &#xff08;单一工作调度&#xff09; &#xff08;2&#xff09;Cron &#xff08;循环工作调度&a…

Android进阶:2021大厂Android面试经验,含BATJM大厂_2021大厂android进阶面试指南目录

在面试的过程中我深深的感受到&#xff0c;对于一个优秀的安卓开发来说&#xff0c;首先摆在第一位的还是他/她作为一个软件工程师的基本素养。无论你是做前端还是后端&#xff0c;最后定义你的优秀程度的还是作为软件工程师的基本素养&#xff0c;学习能力和编程能力&#xff…

VC2022 + google test

还要从项目开始说。 刚来项目组&#xff0c;主体是医疗仪器中位机部分&#xff0c;基本的部署结构&#xff1a; 上位机UI(Ubuntu 18 java) 中位机(ARMUbuntu 18, C) 下位机&#xff08;N个下位机子系统&#xff0c;控制步进电机&#xff0c;各种传感器&#xff0c;反射计…

免费ChatGPT合集——亲测免费

1、YesChat 无需登录 网址&#xff1a;YesChat-ChatGPT4V Dalle3 Claude 3 All in One Freehttps://www.yeschat.ai/ 2. 讯飞星火 要登录 讯飞星火大模型-AI大语言模型-星火大模型-科大讯飞 3.通义千问 要登录 通义我是通义&#xff0c;一个专门响应人类指令的…

ios不兼容Svg Wave的动画的解决方法

近日也是用上了SvgWave&#xff0c;十分的好看 Svg Wave - A free & beautiful gradient SVG wave Generator. 大家感兴趣的也可以了解一下 【场景】 使用SvgWave的Animate&#xff0c;并生成svg代码使用&#xff0c;windows web端、朋友的安卓移动端都能够正常执行动画…

37.WEB渗透测试-信息收集-企业信息收集(4)

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a; 易锦网校会员专享课 上一个内容&#xff1a;36.WEB渗透测试-信息收集-企业信息收集&#xff08;3&#xff09;-CSDN博客 关于主域名收…

lt Redis变慢的原因及排查解决方法

前言 Redis 作为优秀的内存数据库&#xff0c;其拥有非常高的性能&#xff0c;单个实例的 OPS 能够达到 10W 左右(5-10W)。但也正因此如此&#xff0c;当我们在使用 Redis 时&#xff0c;如果发现操作延迟变大的情况&#xff0c;就会与我们的预期不符。 你也许或多或少地&…

python基础知识—while和for循环(三)

&#x1f3ac; 秋野酱&#xff1a;《个人主页》 &#x1f525; 个人专栏:《Java专栏》《Python专栏》 ⛺️心若有所向往,何惧道阻且长 文章目录 一&#xff1a;while循环1.1程序的三种执行流程1.2while循环1.3循环变量和死循环 二&#xff1a;for循环2.1for循环2.2range 一&…

【库函数】Linux下动态库.so和静态库.a的生成和使用

目录 &#x1f31e;1. Linux下静态库和动态库的基本概念 &#x1f31e;2. 动态库 &#x1f30a;2.1 动态库如何生成 &#x1f30d;2.1.1 文件详情 &#x1f30d;2.1.2 编译生成动态库 &#x1f30a;2.2 动态库如何使用 &#x1f30d;2.2.1 案例 &#x1f30d;2.2.2 动态…

【jQuery】看一眼就会用的jquery库之续章!

jQuery&#xff08;js框架&#xff09; 17、操作节点 创建节点&#xff1a; 创建节点只需要将元素放在jQuery的工厂函数中//创建一个button按钮let $btn$("<input typebutton>");//创建一个列表项let $li$("<li>选项</li>");添加节点…

【PostgreSQL】Postgres数据库安装、配置、使用DBLink详解

目录 一、技术背景1.1 背景1.2 什么是 DBLink 二、安装配置 DBLink2.1 安装 DBLink2.2 配置 DBLink1. 修改 postgresql.conf2. 修改 pg_hba.conf 三、DBLink 使用3.1 数据准备3.2 DBLink 使用1. 创建 DBLink 连接2. 使用 DBLink 进行查询3. 使用 DBLink 进行增删改4. 使用 DBLi…