消息队列中间件,RabbitMQ的使用,死信队列,延迟队列,利用枚举实现队列,交换机,RountKey的声明

news2025/3/14 13:56:14

目录

0.交换机种类和区别

1.声明队列和交换机以及RountKey

2.初始化循环绑定

3.声明交换机

4.监听队列

4.1 监听普通队列

4.2监听死信队列

 5.削峰填谷的实现


0.交换机种类和区别

  1. Direct Exchange(直连交换机)

    • 直连交换机将消息发送到与消息的路由键完全匹配的队列。它是最简单的交换机类型之一。
    • 当一个队列使用某个直连交换机绑定时,它需要指定一个绑定键(binding key),当消息的路由键与该绑定键完全匹配时,消息会被发送到该队列。
  2. Fanout Exchange(扇出交换机)

    • 扇出交换机会将消息发送到与其绑定的所有队列,忽略消息的路由键。
    • 当一个队列使用扇出交换机绑定时,它会接收到交换机发送的所有消息,无论消息的路由键是什么。
  3. Topic Exchange(主题交换机)

    • 主题交换机根据消息的路由键和绑定键之间的模式匹配来路由消息。
    • 绑定键可以使用通配符进行匹配,支持 '*' 匹配一个单词,'#' 匹配零个或多个单词,从而允许更灵活的路由规则。
  4. Headers Exchange(标头交换机)

    • 标头交换机根据消息的标头(headers)中的键值对来路由消息,而不是使用路由键。
    • 在将队列绑定到标头交换机时,可以指定一组标头键值对,只有当消息的标头中包含与绑定相匹配的所有键值对时,消息才会被路由到该队列。

如果满足key的前提下,绑定同一个交换机的队列都会分配到相同数量的信息

比如此时交换机有20条信息,a,b队列都会分配到20条信息

默认情况下,会轮询分配给消费者,也可以设置最多获取多少条未被消费的信息,根据消费者的消费能力来设置

1.声明队列和交换机以及RountKey

package com.example.config;


import lombok.Getter;

@Getter
public enum RabbitmqBind {


    DATA_CLEAN_PROCESS_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD,
            false,
            false,
            null,
            null
    ),

    DATA_CLEAN_PROCESS(
            RabbitMqExchangeEnum.E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_PROCESS_DEAD),

    SMS_CLEAN_DEAD(
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD,
            true,
            false,
            null,
            null
    ),

    SMS_CLEAN(
            RabbitMqExchangeEnum.E_TOPIC_RCP,
            RabbitMqQueueConstants.Q_API_TO_DCN_SMS,
            RabbitmqRoutingKey.K_API_TO_DCN_SMS,
            true,
            true,
            RabbitMqExchangeEnum.DEAD_E_DIRECT_RCP,
            RabbitmqRoutingKey.K_DATA_CLEAN_FINISH_DEAD
    ),


    ;

    /**
     * 交换机
     */
    private RabbitMqExchangeEnum exchange;

    /**
     * 队列名称
     */
    private String queueName;

    /**
     * 路由Key
     */
    private RabbitmqRoutingKey routingKey;

    /**
     * 绑定标识
     * 是否启用
     */
    private Boolean isBind;

    /**
     * 是否绑定死信
     */
    private Boolean isDeathBelief;

    /**
     * 绑定的死信交换机
     */
    private RabbitMqExchangeEnum boundDeadExchange;

    /**
     * 死信key
     */
    private RabbitmqRoutingKey deadRoutingKey;


    RabbitmqBind(RabbitMqExchangeEnum exchange, String queueName, RabbitmqRoutingKey routingKey, Boolean isBind,
                 Boolean isDeathBelief, RabbitMqExchangeEnum boundDeadExchange, RabbitmqRoutingKey deadRoutingKey
    ) {
        this.exchange = exchange;
        this.queueName = queueName;
        this.routingKey = routingKey;
        this.isBind = isBind;
        this.isDeathBelief = isDeathBelief;
        this.boundDeadExchange = boundDeadExchange;
        this.deadRoutingKey = deadRoutingKey;
    }

    /**
     * 交换机
     */
    @Getter
    public enum RabbitMqExchangeEnum {

        /**
         * 交换机定义,类型 - 名称
         */
        E_DIRECT_RCP("direct", "E_DIRECT_RCP"),
        DEAD_E_DIRECT_RCP("direct", "DEAD_E_DIRECT_RCP"),

        E_TOPIC_RCP("topic", "E_TOPIC_RCP"),

        E_TOPIC_PAY("topic", "E_TOPIC_PAY");

        private String exchangeType;

        private String exchangeName;

        RabbitMqExchangeEnum(String exchangeType, String exchangeName) {
            this.exchangeType = exchangeType;
            this.exchangeName = exchangeName;
        }
    }

    /**
     * 队列名定义
     */
    public interface RabbitMqQueueConstants {

        /**
         * 接收清洗数据
         */
        String Q_DATA_CLEAN_PROCESS = "RMPS_TO_RCP_DATA_CLEAN_PROCESS";

        /**
         * 清洗结束通知
         */
        String Q_API_TO_DCN_SMS = "Q_API_TO_DCN_SMS";

        /**
         * 死信队列
         */
        String Q_DATA_CLEAN_PROCESS_DEAD = "Q_DATA_CLEAN_PROCESS_DEAD";

        /**
         * 清洗结束通知死信队列
         */
        String Q_API_TO_DCN_SMS_DEAD = "Q_API_TO_DCN_SMS_DEAD";
    }

    /**
     * routingKey
     */
    @Getter
    public enum RabbitmqRoutingKey {

        /**
         * 路由
         */
        K_DATA_CLEAN_PROCESS("K_DATA_CLEAN_PROCESS"),
        K_API_TO_DCN_SMS("K_API_TO_DCN_SMS"),

        // 路由绑定死信路由
        DEAD("DEAD"),

        //死信路由
        K_DATA_CLEAN_PROCESS_DEAD("K_DATA_CLEAN_PROCESS_DEAD"),
        K_DATA_CLEAN_FINISH_DEAD("K_DATA_CLEAN_FINISH_DEAD"),
        ;

        private String keyName;

        RabbitmqRoutingKey(String keyName) {
            this.keyName = keyName;
        }
    }

}

2.初始化循环绑定

package com.example.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Arrays;

@Configuration
@ConditionalOnClass(EnableRabbit.class)
public class MqConfig {
    @Resource
    protected RabbitTemplate rabbitTemplate;
    @Resource
    ConnectionFactory connectionFactory;
//
//    @Lazy
//    @Autowired
//    protected RabbitAdmin rabbitAdmin;
//
//
//    public static final int DEFAULT_CONCURRENT = 10;
//
//    @Bean("customContainerFactory")
//    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
//                                                                 ConnectionFactory connectionFactory) {
//        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//        factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
//        factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
//        configurer.configure(factory, connectionFactory);
//        return factory;
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
//        return new RabbitTransactionManager(connectionFactory);
//    }
//
//    @Bean
//    @ConditionalOnMissingBean
//    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
//        return new RabbitAdmin(connectionFactory);
//    }

    @PostConstruct
    protected void init() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);


        rabbitTemplate.setChannelTransacted(true);
        //创建exchange
        Arrays.stream(RabbitmqBind.RabbitMqExchangeEnum.values())
                .forEach(rabbitMqExchangeEnum -> {
                            Exchange exchange = RabbitmqExchange
                                    .getInstanceByType(rabbitMqExchangeEnum.getExchangeType())
                                    .createExchange(rabbitMqExchangeEnum.getExchangeName());
                            rabbitAdmin.declareExchange(exchange);
                        }
                );

        //创建队列并绑定exchange
        Arrays.stream(RabbitmqBind.values()).forEach(RabbitmqBind -> {
            if (RabbitmqBind.getIsBind()) {
                if (RabbitmqBind.getIsDeathBelief()) {
                    //需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(QueueBuilder.durable(RabbitmqBind.getQueueName())
                            .ttl(60000).deadLetterExchange(RabbitmqBind.getBoundDeadExchange().getExchangeName())
                            .deadLetterRoutingKey(RabbitmqBind.getDeadRoutingKey().getKeyName()).build());
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                } else {
                    //不需要绑定死信交换机的队列
                    rabbitAdmin.declareQueue(new Queue(RabbitmqBind.getQueueName(),
                            true, false, false, null));
                    rabbitAdmin.declareBinding(new Binding(RabbitmqBind.getQueueName(),
                            Binding.DestinationType.QUEUE,
                            RabbitmqBind.getExchange().getExchangeName(),
                            RabbitmqBind.getRoutingKey().getKeyName(), null));
                }
            }
        });
    }

}

 绑定的形式由枚举类中定义

3.声明交换机

package com.example.config;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.TopicExchange;

import java.util.Arrays;


@Getter
@Slf4j
public enum RabbitmqExchange {

    DIRECT("direct"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new DirectExchange(exchangeName, true, false);
        }
    },

    TOPIC("topic"){
        @Override
        public Exchange createExchange(String exchangeName) {
            return new TopicExchange(exchangeName, true, false);
        }
    };

    public static RabbitmqExchange getInstanceByType(String type){

        return Arrays.stream(RabbitmqExchange.values()).filter(e -> e.getType().equals(type))
                .findAny()
                .orElseThrow(() ->
//                        new ProcessException("无效的exchange type")

                        new RuntimeException("无效的exchange type")
                );
    }

    private String type;


    RabbitmqExchange(String type) {
        this.type = type;
    }

    public abstract Exchange createExchange(String exchangeName);

}

4.监听队列

4.1 监听普通队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_DATA_CLEAN_PROCESS }, concurrency = "1-5")
//, containerFactory = "customContainerFactory"
public class MqListener {



    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }

    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("process message is blank , message:{}" , message);
            return;
        }
    }

}

 监听并处理任务

4.2监听死信队列

package com.example.listener;

import com.example.config.RabbitmqBind;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RabbitListener(queues = {
        RabbitmqBind.RabbitMqQueueConstants.Q_API_TO_DCN_SMS_DEAD }, concurrency = "1-5")
public class DeadListener {

    @RabbitHandler
    public void processMessage(String message) {
        log.info("DataClean recive message :{} ", message);
        process(message);
    }

    @RabbitHandler
    public void processMessage(byte[] message) {
        String msg = new String(message);
        log.info("DataClean recive message :{} ", msg);
        process(msg);
    }


    /**
     * 处理推送消息
     * @param message
     */
    private void process(String message) {
        log.info("Dead process message :{}" , message);
        if(StringUtils.isBlank(message)) {
            log.error("Dead process message is blank , message:{}" , message);
            return;
        }
    }

}

 5.削峰填谷的实现

把高峰期的消息填进低峰期

可以用拉取的方式来实现

或者用消费者的最大数量和最小数量来实现

channel.basicQos();//设置最大获取未确认消息的数量,实现权重

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1598624.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

青铜器RDM研发管理平台 upload 任意文件上传漏洞复现

0x01 产品简介 青铜器RDM研发管理平台是集成产品管理、研发部门管理、研发项目管理、研发多项目管理、研发资源管理、研发绩效管理、研发工程管理的集中平台。 0x02 漏洞概述 青铜器RDM研发管理平台 upload 接口存在任意文件上传漏洞,未经身份验证的远程攻击者可通过该漏洞…

FreeSWITCH 1.10.10 简单图形化界面17 - ubuntu22.04或者debian12 安装FreeSWITCH(IamFree)

FreeSWITCH 1.10.10 简单图形化界面17 - ubuntu22.04或者debian12 安装FreeSWITCH 界面预览00、先看使用手册0、安装操作系统1、下载脚本2、开始安装3、登录网页 FreeSWITCH界面安装参考:https://blog.csdn.net/jia198810/article/details/132479324 界面预览 htt…

【Web】VS Code 插件

专栏文章索引:Web 有问题可私聊:QQ:3375119339 目录 一、安装步骤 二、插件 1.Chinese (Simplified) (简体中文) 2.open in browser 3.vscode-icons 4.Live Server 5.Live Server Preview 6.翻译(英汉词典) 一、安装步骤 点击 “扩…

Servlet的文件上传下载

Servlet的文件上传|下载 二、文件上传实现 2.1实现思路 需要使用到Commons-FileUpload组件需要将jsp页面form表单的enctype属性值设置为“multipart/form-data”&#xff0c;Servlet中使用IO流实现文件的上传 2.2、实现过程 2.2.1新建web项目导入jar包 <dependency>…

NLP的奥秘:用 Python 揭秘人类语言与人工智能的桥梁【6000 字长文含代码示例】

目录 NLP 的核心任务 NLP 的发展历史 NLP 的技术与方法 传统的 NLP 技术与方法 规则基础方法&#xff1a;语言学的智慧 统计学习方法&#xff1a;数据的力量 深度学习方法&#xff1a;人工智能的新浪潮 NLP 的应用领域 Python在 NLP 中的应用 1、NLTK (Natural Langu…

数字孪生与企业

数字孪生技术&#xff0c;简而言之&#xff0c;就是创造一个物理实体的数字双胞胎&#xff0c;在虚拟世界中精确模拟现实世界的行为、过程和系统。这种技术的核心在于&#xff0c;它允许我们在数字环境中实时地监控、分析和优化其物理对应物的性能和效率。数字孪生的应用场景极…

node.js服务器静态资源处理

前言&#xff1a;node.js服务器动态资源处理见 http://t.csdnimg.cn/9D8WN 一、什么是node.js服务器静态资源&#xff1f; 静态资源服务器指的是不会被服务器的动态运行所改变或者生成的文件. 它最初在服务器运行之前是什么样子, 到服务器结束运行时, 它还是那个样子. 比如平…

基于springboot+vue+Mysql的校园新闻网站

开发语言&#xff1a;Java框架&#xff1a;springbootJDK版本&#xff1a;JDK1.8服务器&#xff1a;tomcat7数据库&#xff1a;mysql 5.7&#xff08;一定要5.7版本&#xff09;数据库工具&#xff1a;Navicat11开发软件&#xff1a;eclipse/myeclipse/ideaMaven包&#xff1a;…

HarmonyOS实战开发-如何实现电话服务中发送短信的功能。

介绍 本示例使用ohos.telephony.sms 接口展示了电话服务中发送短信的功能。 效果预览 使用说明&#xff1a; 首页点击创建联系人&#xff0c;弹框输入联系人姓名和电话&#xff0c;点击确定按钮后&#xff0c;联系人列表中添加该联系人;点击管理&#xff0c;该按钮变成取消&…

[数据结构]——二叉树——堆的实现

1. 堆的概念及结构 如果有一个关键码的集合K { &#xff0c; &#xff0c; &#xff0c;…&#xff0c; }&#xff0c;把它的所有元素按完全二叉树的顺序存储方式存储 在一个一维数组中&#xff0c;并满足&#xff1a; < 且 < ( > 且 > ) i 0&#xff0c;1&…

如何使用OSI七层模型的思路进行Linux网络问题排障?

在运维工作中&#xff0c;我们可能经常遇到诸如服务器无法远程连接、网站无法访问等各种网络问题。此时你是否想过&#xff0c;我们常背的OSI七层模型&#xff0c;能在处理这样的实际问题中发挥什么样的作用呢&#xff1f; 基于OSI架构的方法论&#xff0c;我们可以使用自下而…

OpenHarmony轻量系统开发【5】驱动之GPIO点灯

5.1点灯例程源码 先看最简单得LED灯闪烁操作 源码结构如下&#xff1a; 第一个BUILD.gn文件内容&#xff1a; static_library("led_demo") {sources ["led_demo.c"]include_dirs ["//utils/native/lite/include","//kernel/liteos_m/c…

FL Studio v21.2.3.4004 中文永久版网盘下载(含Key.reg注册表补丁)

软件介绍 FL Studio21水果编曲软件汉化版是一款专业的音乐制作软件&#xff0c;被广泛地应用于电子音乐、hip-hop、流行乐等多种音乐类型的制作。该软件提供了丰富的音频编曲工具和音乐效果器&#xff0c;让用户可以轻松地创作出高品质的音乐作品。同时&#xff0c;这也是一款…

配置优先级标记和队列调度示例

配置优先级标记和队列调度示例 组网图形 图1 优先级标记和队列调度示例组网图 优先级标记和队列调度简介配置注意事项组网需求配置思路操作步骤配置文件 优先级标记和队列调度简介 报文进入设备之后&#xff0c;设备会根据相应的规则分配或修改报文各种优先级的值&#xff…

Spring 事务失效总结

前言 在使用spring过程中事务是被经常用的&#xff0c;如果不小心或者认识不做&#xff0c;事务可能会失效。下面列举几条 业务代码没有被Spring 容器管理 看下面图片类没有Componet 或者Service 注解。 方法不是public的 Transactional 注解只能用户public上&#xff0c…

51单片机入门_江协科技_29~30_OB记录的自学笔记_DS18B20温度传感器

29. DS18B20温度传感器 29.1. DS18B20介绍 •DS18B20是一种常见的数字温度传感器&#xff0c;其控制命令和数据都是以数字信号的方式输入输出&#xff0c;相比较于模拟温度传感器&#xff0c;具有功能强大、硬件简单、易扩展、抗干扰性强等特点 •测温范围&#xff1a;-55C 到 …

k8s的service为什么不能ping通?——所有的service都不能ping通吗

点击阅读原文 前提&#xff1a;kube-proxy使用iptables模式 Q service能不能ping通&#xff1f; A: 不能&#xff0c;因为k8s的service禁止了icmp协议 B: 不能&#xff0c;因为clusterIP是一个虚拟IP&#xff0c;只是用于配置netfilter规则&#xff0c;不会实际绑定设备&…

腾讯EdgeOne产品测评体验—Web服务全能一体化服务,主打一步到位

前言 现在网络Web攻击真的防不胜防啊&#xff0c;相信有很多独狼开发者自己建站&#xff0c;租个云服务器&#xff0c;一部署自己的服务&#xff0c;每隔一段时间内测和网站总有一个要崩。自己感觉难受不说&#xff0c;网站稍微有点要出头的时候&#xff0c;数不清的访问攻击就…

汽车车灯用肖特基二极管,选什么型号好?

肖特基二极管种类繁多&#xff0c;有低压降肖特基二极管、通用型肖特基二极管、快速恢复型肖特基二极管、高功率肖特基二极管、汽车级肖特基二极管等等&#xff0c;其中低压降肖特基二极管和汽车级肖特基二极管是二极管厂家东沃电子的核心优势产品。关于东沃电子推出的低压降肖…

FFmpeg: 自实现ijkplayer播放器--05ijkplayer–连接UI界面和ffplay.c

文章目录 ijkplayer时序图消息循环--回调函数实现播放器播放时状态转换播放停止ijkmediaPlay成员变量成员函数ijkplayer时序图 stream_open: frame_queue_init packet_queue_init init_clock 创建read_thread线程 创建video_refresh_thread线程 消息循环–回调函数实现 ui 和…