rabbitMQ对消息不可达处理-备份交换机/备份队列

news2025/1/10 1:56:27

在这里插入图片描述

生产者发送消息,在消息不可达指定队列时,可以借助扇出类型交换机(之前写过消息回退的处理方案,扇出交换机处理的方案优先级高于消息回退)处理不可达消息,然后放置一个备份队列,供消费者处理不可达消息,同时也加一个报警队列,对于不能走正常流程的消息进行消费者告警。

先用方法配置类把各个组件声明:

在这里插入图片描述

package com.esint.configs;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class BackupConfig {

    /**
     * 定义组件常量名字
     */
    //交换机- 确认交换机额
    public static final String EXCHANGE_SURE = "sure.ex";
    //交换机- 备份交换机额
    public static final String EXCHANGE_BACK = "backup.ex";

    //队列- 正常确认队列
    public static final String QUEUE_SURE = "sure.queue";
    //队列-备份队列
    public static final String QUEUE_BACKUP = "backup.queue";
    //队列-警告队列
    public static final String QUEUE_WARN = "warn.queue";

    //routing-key
    public static final String ROUTING_KEY_SURE = "key1";

    /**
     * 声明组件
     */
    //确认交换机
    @Bean("sureExchange")
    public DirectExchange sureExchange(){
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("alternate-exchange",EXCHANGE_BACK);
        return ExchangeBuilder.directExchange(EXCHANGE_SURE).durable(true).withArguments(arguments).build();

    }

    //备份交换机
    @Bean("backExchange")
    public FanoutExchange backExchange(){
        return new FanoutExchange(EXCHANGE_BACK);
    }

    //确认队列
    @Bean("sureQueue")
    public Queue sureQueue(){
        return QueueBuilder.durable(QUEUE_SURE).build();
    }

    //备份队列
    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(QUEUE_BACKUP).build();
    }

    //警告队列
    @Bean("warnQueue")
    public Queue warnQueue(){
        return QueueBuilder.durable(QUEUE_WARN).build();
    }

    /**
     * 绑定组件  确认队列 绑定 确认交换机 with key1
     */
    @Bean
    public Binding sureQueueBindingSureExchange(@Qualifier("sureQueue") Queue sureQueue,
                                             @Qualifier("sureExchange")DirectExchange sureExchange){
        return BindingBuilder.bind(sureQueue).to(sureExchange).with(ROUTING_KEY_SURE);

    }

    /**
     * 绑定组件 备份队列 绑定 备份交换机
     */
    @Bean
    public Binding backupQueueBindingBackupExchange(@Qualifier("backupQueue") Queue backupQueue,
                                                    @Qualifier("backExchange")FanoutExchange backExchange){

        return  BindingBuilder.bind(backupQueue).to(backExchange);
    }

    /**
     * 绑定组件 警告队列 绑定 备份交换机
     */
    @Bean
    public Binding warnQueueBindingBackupExchange(@Qualifier("warnQueue") Queue warnQueue,
                                                  @Qualifier("backExchange")FanoutExchange backExchange){

        return  BindingBuilder.bind(warnQueue).to(backExchange);

    }
}

生产者: 我们做出两个方法,一个可正常进行流程,一个routingKey异常无法路由到指定队列
package com.esint.controller;

import com.esint.configs.BackupConfig;
import com.esint.constants.ResponseCode;
import com.esint.entity.ResponseEntity;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.apache.kafka.clients.producer.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.concurrent.ExecutionException;


@Api(value = "rabbitMQ-备份队列测试")
@RestController
@RequestMapping("/rabbit")
public class BackUpExchangeController {


    @Autowired
    private RabbitTemplate rabbitTemplate;

    @ApiOperation(value = "routingKey正常测试",httpMethod = "GET",tags = {"去正常流程"})
    @ApiImplicitParams({
            @ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")
    })
    @ResponseBody
    @RequestMapping(value = "/test1", method = RequestMethod.GET)
    public ResponseEntity test1(String str ) {

        rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE,str);
        return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyOk:"+str);

    }

    @ApiOperation(value = "routingKey非正常测试",httpMethod = "GET",tags = {"去备份-警告"})
    @ApiImplicitParams({
            @ApiImplicitParam(name="str",value="消息体",required = false,dataType = "String")
    })
    @ResponseBody
    @RequestMapping(value = "/test2", method = RequestMethod.GET)
    public ResponseEntity test2(String str ) {

        rabbitTemplate.convertAndSend(BackupConfig.EXCHANGE_SURE, BackupConfig.ROUTING_KEY_SURE+"wrong",str);

        return new ResponseEntity(ResponseCode.SUCCESS).addData("routingKeyWrong:"+str);

    }

}

三个消费者分别监听正常队列 备份队列 警告队列

确认队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import com.esint.configs.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer01 {

    @RabbitListener(queues = BackupConfig.QUEUE_SURE)
    public void reveiver(Message message){
        log.info("正常消费者C1:" +  new String(message.getBody()),"UTF-8");
    }

}

备份队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer02 {

    @RabbitListener(queues = BackupConfig.QUEUE_BACKUP)
    public void reveiver(Message message){
        log.info("备份消费者C2:" +  new String(message.getBody()),"UTF-8");
    }
}

警告队列消费者:

package com.esint.consumer;

import com.esint.configs.BackupConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class BackUpConsumer03 {

    @RabbitListener(queues = BackupConfig.QUEUE_WARN)
    public void reveiver(Message message){
        log.info("警告消费者C3:" +  new String(message.getBody()),"UTF-8");
    }

}

测试:

1.正常流程测试
在这里插入图片描述
在这里插入图片描述

 com.esint.consumer.BackUpConsumer01      : 正常消费者C1:你好啊 正常队列

2.路由不达消息测试

在这里插入图片描述
在这里插入图片描述

com.esint.consumer.BackUpConsumer03      : 警告消费者C3:这个消息不可达 routing-key不对 它去哪里了?
com.esint.consumer.BackUpConsumer02      : 备份消费者C2:这个消息不可达 routing-key不对 它去哪里了?

测试达到预期结果!

在队列消息不可达时,备份交换机处理优先级高于消息回退处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1258462.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pytorch中的gather的理解和用法

Pytorch中的gather的理解和用法 这个Gather的用法花费了点时间&#xff0c;我相信很多人一开始不太懂。 跟着我简单理解。 首先样例是&#xff1a; tensor([[ 3, 4, 5],[ 6, 7, 8],[ 9, 10, 11]])然后index: [[2, 1, 0]]然后执行的代码&#xff1a; tensor_0.gather(0…

使用 SIEM 管理安全事件

每家公司都必须处理检测、管理和解决安全事件&#xff0c;未能制定事件响应计划可能会对任何组织产生重大的影响&#xff0c;无论是在财务损失还是声誉损害方面。本文探讨了事件响应的重要性、检测和管理事件的关键要素&#xff0c;以及帮助组织处理安全事件的最佳实践。 安全…

FLASK博客系列6——数据库之谜

我们上一篇已经实现了简易博客界面&#xff0c;你还记得我们的博客数据是自己手动写的吗&#xff1f;但实际应用中&#xff0c;我们是不可能这样做的。大部分程序都需要保存数据&#xff0c;所以不可避免要使用数据库。我们这里为了简单方便快捷&#xff0c;使用了超级经典的SQ…

如何使用内网穿透将Tomcat网页发布到公共互联网上【内网穿透】

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 前言 Tomcat作为一个轻量级的服务器&#xff0c;不仅名字很有趣&#xff0…

Redis面试题:分布式锁相关问题

目录 面试官&#xff1a;Redis的内存用完了会发生什么&#xff1f; 面试官&#xff1a;Redis分布式锁如何实现 ? 面试官&#xff1a;好的&#xff0c;那你如何控制Redis实现分布式锁有效时长呢&#xff1f; 面试官&#xff1a;好的&#xff0c;redisson实现的分布式锁是可重…

现货黄金走势图下载与保存

MetaTrader 4 (MT4) 是一款在全球范围内广受欢迎的现货黄金交易软件&#xff0c;简单性和灵活性是其深受市场欢迎的原因。它的显示界面的主要部分由品种的走势图表组成&#xff0c;投资者可以在其中查看实时的行情走势。屏幕左上角是市场观察窗口&#xff0c;当中列出了平台所有…

群晖NAS:docker(Container Manager)、npm安装Verdaccio并常见命令集合

群晖NAS&#xff1a;docker&#xff08;Container Manager&#xff09;、npm安装Verdaccio并常见命令集合 自建 npm 资源库&#xff0c;使用Verdaccio。如果觉得麻烦&#xff0c;直接可以在外网注册 https://www.npmjs.com/ 网站。大同小异&#xff0c;自己搭建搭建方便局域网…

如何进行有效的移动应用测试?

1、识别关键功能: 对于移动应用测试&#xff0c;首先要了解应用的需求和功能规格&#xff0c;确定哪些功能是最关键的。 关键功能通常是用户最常用的功能&#xff0c;对应用的成功和用户体验至关重要。 2、设定测试目标和用例: 针对每个关键功能&#xff0c;设置具体的测试目…

基于springboot+maven的个人理财管理系统

基于springbootmaven的个人理财管理系统,演示地址:个人理财系统登录界面 用户名:admin,密码&#xff1a;123456 共分为用户信息管理(用户信息&#xff0c;银行卡&#xff0c;个人征信)&#xff0c;理财产品管理(零钱理财&#xff0c;工资理财&#xff0c;期限理财&#xff0c…

如何在本地安装部署WinSCP,并实现公网远程本地服务器

可视化文件编辑与SSH传输神器WinSCP如何公网远程本地服务器 文章目录 可视化文件编辑与SSH传输神器WinSCP如何公网远程本地服务器1. 简介2. 软件下载安装&#xff1a;3. SSH链接服务器4. WinSCP使用公网TCP地址链接本地服务器5. WinSCP使用固定公网TCP地址访问服务器 1. 简介 …

有效实施的五条教学策略

作为老师&#xff0c;是否曾为如何提高教学质量而苦恼&#xff1f;也为如何引导学生而思考&#xff1f;如果你正面临这些困扰&#xff0c;那么这篇文章将对你有帮助。为你介绍五条教学策略&#xff0c;帮你实施教学&#xff0c;提高效果。 明确教学目标 你是否知道你的教学目标…

3D数字孪生场景编辑器

在线工具推荐&#xff1a; 3D数字孪生场景编辑器 - GLTF/GLB材质纹理编辑器 - 3D模型在线转换 - Three.js AI自动纹理开发包 - YOLO 虚幻合成数据生成器 - 三维模型预览图生成器 数字孪生的强大功能来自于将真实世界的资产与真实世界的数据联系起来&#xff0c;因此您可以…

代码块01-Java

代码块01 一、介绍二、语法三、好处举例 四、使用细节五、练习题1题2 一、介绍 代码块又称为初始化块&#xff0c;属于类中的成员[即是类的一部分]&#xff0c;类似于方法&#xff0c;将逻辑语句封装在方法体中&#xff0c;通过包围起来。 但和方法不同&#xff0c;没有方法名…

ZKP15.1 Secure ZK Circuits via Formal Methods

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 15: Secure ZK Circuits via Formal Methods (Guest Lecturer: Yu Feng (UCSB & Veridise)) Motivation Bugs in blockchain software are extremely dangers and costly.Smart Contract Bugs, Blockchain Protocol Bugs,…

OSError: symbolic link privilege not held报错解决

本人情况介绍 本人在复现某个代码的时候&#xff0c;需要安装开源代码已经封装好的setup.py代码。具体安装的库具体如下。 fairseqpython3.6.0pytorch1.6.0File2ROUGE 在安装fairseq的时候遇见了如下问题。 Installing build dependencies … done Getting requirements to …

使用 Redis Zset 有序集合实现排行榜功能(SpringBoot环境)

目录 一、前言二、Redis Zset 的基本操作三、通过Redis 命令模拟排行榜功能3.1、排行榜生成3.2、排行榜查询 四、SpringBoot 使用 Redis Zset 有序集合实现排行榜功能 一、前言 排行榜功能是非常常见的需求&#xff0c;例如商品售卖排行榜单、游戏中的积分排行榜、配送员完单排…

原创文章生成器-批量原文高质量伪原创

在信息爆炸的时代&#xff0c;创作者们面临的挑战愈发严峻。写一篇原创文章&#xff0c;不仅需要脑洞大开&#xff0c;还得担心自己的文字是否能够迎合读者口味。原创文章生成器只需输入标题或关键词&#xff0c;即可轻松生成原创文章。而与此同时&#xff0c;147SEO改写软件也…

FLASK博客系列8——我也有后台管理

上次我们学习了如何往数据库里插入数据&#xff0c;显示我们自己的文章。 有些朋友可能会问&#xff0c;django有后台管理&#xff0c;插入不用这么麻烦&#xff0c;那flask有类似的吗&#xff1f;当然有&#xff0c;而且还挺多的。今天我们就用一个最常用的包来完成 flask-adm…

数据库系统原理与实践 笔记 #10

文章目录 数据库系统原理与实践 笔记 #10存储管理与索引(续)数据字典存储系统元数据的关系表示 数据缓冲区存储访问缓冲区管理器缓冲区替换策略 顺序索引基本概念索引技术评价指标顺序索引稠密索引稀疏索引索引多级索引辅助索引主索引与辅助索引多码索引 B树索引B树索引文件B树…

机器学习的复习笔记2-回归

一、什么是回归 机器学习中的回归是一种预测性分析任务&#xff0c;旨在找出因变量&#xff08;目标变量&#xff09;和自变量&#xff08;预测变量&#xff09;之间的关系。与分类问题不同&#xff0c;回归问题关注的是预测连续型或数值型数据&#xff0c;如温度、年龄、薪水…