RabbitMQ工作模式(5) - 主题模式

news2025/1/15 16:47:19

 概念

主题模式(Topic Exchange)是 RabbitMQ 中一种灵活且强大的消息传递模式,它允许生产者根据消息的特定属性将消息发送到一个交换机,并且消费者可以根据自己的需求来接收感兴趣的消息。主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配,支持通配符 *#,从而实现了更灵活的消息路由和分发。

工作流程

  1. 生产者发送消息: 生产者将消息发送到一个主题交换机,并指定一个特定的路由键。

  2. 交换机根据路由键路由消息: 主题交换机根据消息的路由键和绑定队列的路由键进行模糊匹配。路由键可以包含多个单词,以 . 分隔,例如 stock.usd.nyseweather.usa.ca.sunny 等。

  3. 消息发送到匹配的队列: 如果消息的路由键与绑定队列的路由键完全匹配,则将消息发送到对应的队列中。如果路由键中包含通配符 *#,则可以匹配多个单词或多个级别的单词,从而实现更灵活的匹配规则。

  4. 消费者接收消息: 消费者可以根据自己的需求来选择监听匹配的队列,从而接收感兴趣的消息。消费者可以使用通配符 * 匹配一个单词,或使用 # 匹配零个或多个单词。

发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开

优点

  • 灵活性:生产者可以根据消息的特定属性来发送消息,消费者可以根据自己的需求来接收感兴趣的消息。
  • 精确匹配:支持精确的路由键匹配和模糊匹配,可以根据实际需求定义复杂的路由规则。
  • 扩展性:可以根据需要动态地添加和修改绑定规则,而不需要停止消息传递服务。

主题模式适用于需要根据消息的特定属性进行灵活路由和分发的场景,例如事件处理、消息过滤、数据分析等。

 Springboot集成

示例: 系统应用程序测试的时候,会有不同的BUG,测试人员会将不同的BUG按照规范打上标签(相当于routingKey),然后发送到mq中,然后通过主题模式分发;

标签内容:bug归属.模块.等级 例如: back.order.severity

分发规则如下:

第一个消费者是前端的开发人员:处理所有严重的前端BUG:front.#

第二个消费者是后端负责订单模块开发人员:处理所有的后端order模块BUG:back.order.*

另外还有多个消费者处理不同的BUG,这里只用两个做示例

1.创建队列和交换机并绑定

 在TopicConfig中配置

package com.model.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: Haiven
 * @Time: 2024/4/22 11:55
 * @Description: TODO
 */
@Configuration
public class TopicConfig {

    /**
     * 主题模式交换机
     * @return exchange
     */
    @Bean(name = "topicExchange")
    public Exchange getTopicExchange(){
        return ExchangeBuilder
                .topicExchange("exchange_topic")
                .build();
    }

    /**
     * 主题队列 01
     * @return queue
     */
    @Bean(name = "topicQueue01")
    public Queue getTopicQueue01(){
        return QueueBuilder
                .durable("queue_topic_01")
                .build();
    }

    /**
     * 主题队列 02
     * @return queue
     */
    @Bean(name = "topicQueue02")
    public Queue getTopicQueue02(){
        return QueueBuilder
                .durable("queue_topic_02")
                .build();
    }

    /**
     * 绑定队列 01
     * @return binding
     */
    @Bean
    public Binding getTopicBinding01(){
        return BindingBuilder
                .bind(getTopicQueue01())
                .to(getTopicExchange())
                //路由键 队列1接收debug级别的消息
                .with("front.#")
                .noargs();
    }

    /**
     * 绑定队列 02
     * @return binding
     */
    @Bean
    public Binding getTopicBinding02(){
        return BindingBuilder
                .bind(getTopicQueue02())
                .to(getTopicExchange())
                // 路由键 队列2接收info级别的消息
                .with("back.order.*")
                .noargs();
    }
}

 主题模式的交换机类型为TopicExchange

2.创建消费者

TopicConsumer

package com.model.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author: Haiven
 * @Time: 2024/4/22 10:08
 * @Description: TODO
 */
@Component
public class TopicConsumer {

    @RabbitListener(queues = {"queue_topic_01"})
    public void topicConsumer01(String msg){
        System.out.println("消费者 -01- 接收消息:" + msg);
    }

    @RabbitListener(queues = {"queue_topic_02"})
    public void topicConsumer02(String msg){
        System.out.println("消费者 -02- 接收消息:" + msg);
    }
}

3.创建生产者并发送消息

package com.model.controller;

import com.code.domain.Response;
import com.model.service.RabbitService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;


/**
 * @Author: Haiven
 * @Time: 2024/4/19 9:46
 * @Description: TODO
 */
@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Resource
    private RabbitService rabbitService;

    @GetMapping("/simple")
    public Response<Void> simple(String msg){
        boolean res = rabbitService.simple(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/work")
    public Response<Void> work(String msg){
        boolean res = rabbitService.work(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/sub")
    public Response<Void> sub(String msg){
        boolean res = rabbitService.sub(msg);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/routing")
    public Response<Void> routing(String msg, String type){
        boolean res = rabbitService.routing(msg, type);
        return res ? Response.success() : Response.fail();
    }

    @GetMapping("/topic")
    public Response<Void> topic(String msg, String type){
        boolean res = rabbitService.topic(msg, type);
        return res ? Response.success() : Response.fail();
    }
}
package com.model.service.impl;

import com.model.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @Author: Haiven
 * @Time: 2024/4/19 10:51
 * @Description: TODO
 */
@Service
@Slf4j
public class RabbitServiceImpl implements RabbitService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Value("${rabbitmq.simple.queue}")
    private String simpleQueue;

    @Value("${rabbitmq.work.queue}")
    private String workQueue;

    @Override
    public boolean simple(String msg) {
        try {
            rabbitTemplate.convertAndSend(simpleQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean work(String msg) {
        try {
            rabbitTemplate.convertAndSend(workQueue, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean sub(String msg) {
        try {
            //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
            rabbitTemplate.convertAndSend("exchange_sub","", msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean routing(String msg, String type) {
        System.out.println("理由模式发送消息:msg="+msg+",type="+type+"");
        try {
            //路由模式就不能直接发送消息到队列了, 而是发送到交换机,由交换机进行广播, routingKey为路由Key 订阅模式给""
            rabbitTemplate.convertAndSend("exchange_routing",type, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }

    @Override
    public boolean topic(String msg, String type) {
        System.out.println("主题模式发送消息:msg="+msg+",type="+type+"");
        try {
            //主题模式会根据 type的通配符进行分发
            rabbitTemplate.convertAndSend("exchange_topic",type, msg);
            return true;
        }catch (Exception e){
            e.printStackTrace();
            return false;
        }
    }
}

4.发送消息

接口调用发送消息, type字段为消息的级别

 后台接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1628712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

梦境绘师:揭秘生成对抗网络(GAN)的魔法

梦境绘师&#xff1a;揭秘生成对抗网络&#xff08;GAN&#xff09;的魔法 1 引言 在今日的深度学习领域&#xff0c;生成对抗网络&#xff08;GAN&#xff09;已成为一项无人能外的技术&#xff0c;以其独特的数据生成能力俘获了无数研究者和工程师的心。这项技术不仅在理论上…

Golang基础2-Array、Slice、Map

Array 数组 var a [5]int b:[5]int{} c:[...]int{}这样格式定义var a[5]int 和var a[10]int是不同类型从0开始下标&#xff0c;到len(a)-1遍历方式&#xff1a; for i : 0; i < len(a); i { }for index, v : range a { } 注意越界问题&#xff0c;panic值类型&#xff0c;…

知识图谱嵌入领域的重要研究:编辑基于语言模型的知识图谱嵌入

今天&#xff0c;向大家介绍一篇在知识图谱嵌入领域具有重要意义的研究论文——Editing Language Model-based Knowledge Graph Embeddings。这项工作由浙江大学和腾讯公司的研究人员联合完成&#xff0c;为我们在动态更新知识图谱嵌入方面提供了新的视角和方法。 研究背景 在…

go设计模式之工厂方法模式

工厂方法模式 什么是工厂方法模式 工厂方法模式是一种创建型设计模式&#xff0c;它定义了一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。工厂方法使一个类的实例化推迟到其子类。 这个接口就是工厂接口&#xff0c;子类就是具体工厂类&#xff0c;而需要创…

盛水最多的容器 ---- 双指针

题目链接 题目: 分析: 最大容积 即使就是最大面积, 长为下标之差, 宽为两下标对应值的最小值解法一: 暴力枚举: 将每两个数之间的面积都求出来, 找最大值, 时间复杂度较高解法二: 假设我们的数组是[6, 2, 5, 4], 我们先假设最左边和最右边, 即6 和 4 之间是最大面积长a*宽b此…

便携式应急指挥箱规格参数

概况: 微缩型的无线视频音频传输的机动挥所。体积小、重量轻、公配电方便、携带便携、功能齐全。可进行单兵作战&#xff0c;通过此无线音频视频传输的指挥箱能完成现场图像、语音、数据的采集等功能&#xff0c;可以通过5G/4G/WIFI等多种无线网络完成传输的需求&#xff0c;或…

和鲸科技出席第五届空间数据智能学术会议,执行总裁殷自强受邀发表主题报告

4月26日&#xff0c;由 ACM SIGSPATIAL 中国分会、ACM SIGMOD 中国分会主办的第五届空间数据智能学术会议&#xff08;SpatialDI 2024&#xff0c;下简称“会议”&#xff09;在南京盛大开幕。本次会议特邀李清泉院士、周成虎院士、丛高教授、谢炯博士、张雪英教授等国内外知名…

流量网关与服务网关的区别:(面试题,掌握)

流量网关&#xff1a;&#xff08;如Nignx&#xff0c;OpenResty&#xff0c;Kong&#xff09;是指提供全局性的、与后端业务应用无关的策略&#xff0c;例如 HTTPS证书认证、Web防火墙、全局流量监控&#xff0c;黑白名单等。 服务网关&#xff1a;&#xff08;如Spring Clou…

通义灵码-IDEA的使用教程

通义灵码-IDEA的使用教程 1、通义灵码是什么&#xff1f; 通义灵码&#xff0c;是阿里云出品的一款基于通义大模型的智能编码辅助工具&#xff0c;提供行级/函数级实时续写、自然语言生成代码、单元测试生成、代码注释生成、代码解释、研发智能问答、异常报错排查等能力&#…

受尽折磨的ai剪辑视频心酸之路

因为公司需要剪辑好多视频~我每天不断手动剪啊剪啊手都剪麻 有天老板跟我说了句人家好多ai剪辑你能不能搞到一个&#xff0c;多少钱你在说。 我心想这不是我的强项么&#xff1f;白嫖界的天花板&#xff0c;我就拦下了这个活~于是我上班不是在找软件就是在逛论坛路上&#xff0…

【漏洞复现】润乾报表InputServlet13文件读取漏洞

漏洞描述&#xff1a; 润乾报表是一款功能全面且性能卓越的报表产品。它专注于企业级BI产品的研发和推广&#xff0c;通过提供丰富的报表功能和高效的开发工具&#xff0c;帮助用户提升图表的开发效率&#xff0c;节省成本。 润乾报表InputServlet13接口存在文件读取漏洞&…

数据结构 - 链表详解一 - 链表的介绍

一. 为什么要学习链表 我们已经学习了顺序表了&#xff0c;在学习的时候发现顺序表的功能很多&#xff0c;所以我们为什么还要学习链表呢&#xff0c;学习链表有什么用吗&#xff1f; 下面我将通过几个方面去研究一下 1. 动态数据操作 顺序表&#xff08;如数组&#xff09;通…

回溯-单词搜索

给定一个 m x n 二维字符网格 board 和一个字符串单词 word 。如果 word 存在于网格中&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 单词必须按照字母顺序&#xff0c;通过相邻的单元格内的字母构成&#xff0c;其中“相邻”单元格是那些水平相邻或垂直相…

力扣刷题总结——栈和队列

刷完栈和队列&#xff0c;对STL的内容有了更加深刻的认识。 STL中栈往往不被归类为容器&#xff0c;而被归类为container adapter&#xff08;容器适配器&#xff09; 栈的内部结构&#xff0c;栈的底层实现可以是 vector&#xff0c;deque&#xff0c;list 都是可以的&#xf…

ANSYS WB DesignModeler 概述

Workbench 在进行有限元分析之前&#xff0c;一般需要创建或导入模型。创建模型时一般会用到 DesignModeler 组件&#xff0c;在该组件中可以进行2D和3D模型的创建。 本章主要讲述 DesignModeler 的基础操作,包括启动、图形界面、图形选择和右键快捷菜单。 1.启动 DesignModel…

【数字电路与系统】【北京航空航天大学】实验:时序逻辑设计——三色灯开关(二)、需求分析和系统设计

本次实验&#xff08;一&#xff09;见博客&#xff1a;【数字电路与系统】【北京航空航天大学】实验&#xff1a;时序逻辑设计——三色灯开关&#xff08;一&#xff09;、实验指导书 说明&#xff1a;本次实验的代码使用verilog编写&#xff0c;文章中为阅读方便&#xff0c…

OGG extract进程占据大量虚拟内存导致服务器内存异常增长分析

现象 oracle服务器一节点内存&#xff0c;一个月来持续升高&#xff0c;近一月上涨10%左右。 问题分析 OS内存使用情况 使用内存最大的10个进程如下&#xff0c;PID为279417占用最大的内存。 查询279417&#xff0c;发现是ogg相关进程。 发现ogg的extract进程占用了大量的虚拟内…

Lagent AgentLego 智能体应用搭建-笔记六

本次课程由Lagent&AgentLego 核心贡献者樊奇老师讲解【Lagent & AgentLego 智能体应用搭建】课程 课程视频&#xff1a;https://www.bilibili.com/video/BV1Xt4217728/ 课程文档&#xff1a;https://github.com/InternLM/Tutorial/tree/camp2/agent 大语言模型的局限…

E4980A是德科技E4980A精密LCR表

181/2461/8938产品概述&#xff1a; Keysight E4980A 精密 LCR 表为各种元件测量提供了精度、速度和多功能性的最佳组合。E4980A 在低阻抗和高阻抗范围内提供快速测量速度和出色的性能&#xff0c;是元件和材料的一般研发和制造测试的终极工具。LAN、USB 和 GPIB PC 连接可提高…

Openharmony - 设备异常关机Power Down问题分析

By: fulinux E-mail: fulinux@sina.com Blog: https://blog.csdn.net/fulinus 喜欢的盆友欢迎点赞和订阅! 你的喜欢就是我写作的动力! 目录 1.问题描述1.1出现power down的原因1.1.1硬件故障或信号1.1.2软件错误或系统崩溃2.抓日志信息2.1.抓日志方法2.2.问题初步分析3.问题排…