将请求参数数据推送至RabbitMQ队列中并且捕捉消息没有到达交换机的异常

news2024/11/22 22:08:15

1:自定义mq信息类(我的交换这些信息都从nacos上直接取的,怎么从nacos取配置信息看上篇文章):

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class MqInfoEntity implements Serializable {

    private static final long serialVersionUID = 1L;


    /**
     * 交换机类型(0:DIRECT直连交换机; 1:TOPIC主题交换机; 2:FANOUT扇形交换机; 3:HEADERS头交换机)
     */
    private Integer exchangeType;

    /**
     * 交换机名称
     */
    private String exchangeName;

    /**
     * 队列名称
     */
    private String queueName;

    /**
     * 绑定关系
     */
    private String routingKey;
}

2:自定义生产者类:

import com.alibaba.fastjson.JSONArray;
import com.fescotech.ordercommon.model.MqInfoEntity;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class SendRequestParamsProducer implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    //这两个变量是用来将交换机发送消息失败异常的原因在service里抛异常用
    public  boolean ack1 = true;
    public String reason;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        rabbitTemplate.setMandatory(true);
    }


    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String deliveryNumber = correlationData.getId();
        if (ack){
            ack1=true;
            //记录发送记录
            log.info("向交换机发送消息成功,派单编号:{}",deliveryNumber);
        } else {
            ack1=false;
            reason = cause;
            log.error("向交换机发送消息失败,派单编号:{},原因为:{},", deliveryNumber, cause);
        }
    }

    /**
     * 推送消息
     * @author fu
     * @time 2023/7/18 14:28
     * @param mqInfoEntity
     * @param data为要推送的数据
     * @param correlationDataId调用时赋值一个唯一值就行
     * @return void
     */
    public void sendRabbitMq(MqInfoEntity mqInfoEntity, JSONArray data ,String correlationDataId) {
       //  rabbitTemplate.convertAndSend(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));
        rabbitTemplate.convertSendAndReceive(mqInfoEntity.getExchangeName(), mqInfoEntity.getRoutingKey(), data, new CorrelationData(correlationDataId));
    }

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        String json = new String(message.getBody());
        log.info("消息本身:" + json);
        log.info("退回的replyCode是:" + replyCode);
        log.info("退回的replyText是:" + replyText);
        log.info("退回的exchange是:" + exchange);
        log.info("退回的routingKey是:" + routingKey);
        String errorDesc = "未匹配到队列,交换机:"+ exchange +",routingKey:"+ routingKey;
    }
}

3:逻辑推送数据至mq(前三步是我个人需求业务处理,你们可以从第四步看):

import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSONArray;
import com.fescotech.ft.common.model.vo.Result;
import com.fescotech.ft.common.util.FtResultUtil;
import com.fescotech.ordercommon.constants.ExchangeTypeEnum;
import com.fescotech.ordercommon.model.MqInfoEntity;
import com.fescotech.ordercommon.model.param.SendMsgParam;
import com.fescotech.orderservice.config.limit.CommonMqMsgConfig;
import com.fescotech.orderservice.mq.producer.SendRequestParamsProducer;
import com.fescotech.orderservice.service.CommonMqMsgService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;


@Service
@Slf4j
public class CommonMqMsgServiceImpl implements CommonMqMsgService {

    @Autowired
    private SendRequestParamsProducer sendRequestParamsProducer;
    @Autowired
    private CommonMqMsgConfig commonMqMsgConfig;

    @Override
    public Result sendMsg(SendMsgParam sendMsgParam) {
        //1.拼接faceCode_sysChannel
        String faceCodeSysChannel = sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel();
        //2.获取nacos配置文件
        Map<String,CommonMqMsgConfig.SystemMap> map = commonMqMsgConfig.getSystemMap();
        //3.校验faceCode_sysChannel在nacos上是否有配置
        if(!map.keySet().contains(faceCodeSysChannel)){
            return FtResultUtil.result(Result.FAIL,null,"未找到"+sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel()+"配置",null);
        }

        //4.将要推送的数据转至json串(我要推送json串,你们推啥数据用啥数据就行)
        JSONArray jsonArray = JSONArray.parseArray(sendMsgParam.getRequestParams()) ;
        log.info("发送数据:{}",jsonArray);
        //5.取mq队列数据(我这是从nacos上取交换机名称、队列名称、绑定关系值,你们有现成的直接在第6步赋值即可)
        CommonMqMsgConfig.SystemMap systemMap = map.get(faceCodeSysChannel);
        //6.构建发送信息
        MqInfoEntity mqInfoEntity = MqInfoEntity.builder().exchangeName(systemMap.getExchange_name())
                .queueName(systemMap.getQueue_name()).routingKey(systemMap.getRouting_key())
                .exchangeType(ExchangeTypeEnum.TOPIC.getCode())
                .build();
        log.info("推送MQ,{},data:{}", JSONUtil.toJsonStr(mqInfoEntity),jsonArray);
        //7.推送mq
        sendRequestParamsProducer.sendRabbitMq(mqInfoEntity,jsonArray,sendMsgParam.getFaceCode()+"_"+sendMsgParam.getSysChannel());
        //8.捕捉交换机未收到消息异常原因
        boolean flag = sendRequestParamsProducer.ack1;
        if(flag){
            return FtResultUtil.result(Result.SUCCESS, "推送成功", null, null);
        }else {
            log.error("向交换机发送消息失败,编号:{},原因为:{},", faceCodeSysChannel, sendRequestParamsProducer.reason);
            return FtResultUtil.result(Result.FAIL,null,"推送失败,失败原因:"+sendRequestParamsProducer.reason,null);
        }

    }
}

推送成功:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/777728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows10 + Mingw + Paho Mqtt C/C++编译使用

文章目录 1、前言2、按照Mingw环境3、编译paho c3.1、ssl验证大坑3.2、解决方法3.3、mingw32-make过程出现报错3.4、继续出错3.5、编译成功 4、编译paho c5、Qt使用paho mqtt库5.1、编码 1、前言 起初使用的是Cmake Visual Studio 2019进行编译&#xff0c;使用的时候出现ssl错…

《Java核心技术大会2023》——AIC松鼠活动第一期

共同深入探讨 Java 生态&#xff01;直播预约&#xff1a;视频号“IT阅读排行榜” 大会简介 人工智能在22年、23年的再次爆发让Python成为编程语言里最大的赢家&#xff1b;云原生的持续普及令Go、Rust等新生的语言有了进一步叫板传统技术体系的资本与底气。我们必须承认在近…

【搜索引擎Solr】配置 Solr 以获得最佳性能

Apache Solr 是广泛使用的搜索引擎。有几个著名的平台使用 Solr&#xff1b;Netflix 和 Instagram 是其中的一些名称。我们在 tajawal 的应用程序中一直使用 Solr 和 ElasticSearch。在这篇文章中&#xff0c;我将为您提供一些关于如何编写优化的 Schema 文件的技巧。我们不会讨…

基于linux下的高并发服务器开发(第二章)- 2.14 管道的读写特点和管道设置为非阻塞

管道的读写特点&#xff1a; 使用管道时&#xff0c;需要注意以下几种特殊的情况&#xff08;假设都是阻塞I/O操作&#xff09; 1.所有的指向管道写端的文件描述符都关闭了&#xff08;管道写端引用计数为0&#xff09;&#xff0c;有进程从管道的读端读数据&#xff0c;那么管…

SpringBoot系列--【K8s中的SpringBoot如何给应用配置健康检查?】

K8s中的SpringBoot如何给应用配置健康检查&#xff1f; 1.健康检查的必要性 作为业务监控的首要目标&#xff0c;服务的存活性&#xff0c;也就是它的健康状况&#xff0c;成为了重中之重&#xff0c;容器云平台可以根据健康检查策略来对服务实例进行自动重启或从负载均衡中摘除…

IP-GUARD如何在客户端上进行审批管理?

如何在客户端上进行审批管理&#xff1f; 4 实现步骤如下&#xff1a; 1、先在控制台-加密-加密授权设置-常规中&#xff0c;勾选允许登录审批管理平台开启客户端登录审批管理平台的功能。 2、然后客户端电脑&#xff0c;右键加密托盘登录审批账号后即可正常审批。 如何实现特…

MySQL 8.1.0正式发布!

早在五年前&#xff0c;MySQL 8.0 就发布了第一个 GA 版本&#xff0c;此后一直在这个版本进行更新&#xff0c;而没有升级大版本。最近 MySQL 官方终于发布了 MySQL 8.1.0 和 MySQL 8.0.34&#xff0c;分别代表了创新版和长期支持版。 新版本中与 SQL 相关的改进包括保存执行计…

用C++在Windows桌面上打个叉❌

我们的目标是&#xff0c;只写一二十行代码&#xff0c;用 Windows自带的原生接口&#xff0c;强行在桌面上打个大红❌&#xff0c;如图&#xff1a; 写了大半年C&#xff0c;天天和“黑乎乎” 的小窗口你侬我侬&#xff1f;赶紧来打开一扇“Windows”&#xff0c;从窗口跳进全…

数据库系统课程笔记

初步认识数据库系统 schema 英 /ˈskiːmə/ 美 /ˈskiːmə/ n.(计划或理论的)提要&#xff0c;纲要 关系模型之基本概念 关系和表的差别 关系的特性 什么是sql 创建数据库&#xff08;编写脚本&#xff09; 创建表格语法&#xff08;编写脚本&#xff09; 修改表的结构语法 …

微服务Day4——Docker

一、什么是Docker 微服务虽然具备各种各样的优势&#xff0c;但服务的拆分通用给部署带来了很大的麻烦。 分布式系统中&#xff0c;依赖的组件非常多&#xff0c;不同组件之间部署时往往会产生一些冲突。在数百上千台服务中重复部署&#xff0c;环境不一定一致&#xff0c;会…

如何生成一个随机数?

文章目录 虚假的随机数真正的随机数生成规定位数的随机数 虚假的随机数 说到如何生成一个随机数&#xff0c;可能当你百度后会看到这样一段代码。 srand((unsigned int)time(NULL)); int ret rand();那么一个随机数到底是如何生成的呢&#xff1f;我相信善于探索的你一定想知…

(黑客)自学笔记

特别声明&#xff1a; 此教程为纯技术分享&#xff01;本教程的目的决不是为那些怀有不良动机的人提供及技术支持&#xff01;也不承担因为技术被滥用所产生的连带责任&#xff01;本教程的目的在于最大限度地唤醒大家对网络安全的重视&#xff0c;并采取相应的安全措施&#x…

Docker镜像分层

文章目录 docker镜像分层镜像层构成镜像FS 构成基础镜像层扩展镜像层容器层 镜像摘要分发散列值 多架构镜像工作原理 docker镜像分层 Docker 镜像由一些松耦合&#xff08;关系不怎么紧密&#xff09;的只读镜像层组成&#xff0c;Docker Daemon 负责堆叠这些镜像层&#xff0c…

vim安装及使用

天行健&#xff0c;君子以自强不息&#xff1b;地势坤&#xff0c;君子以厚德载物。 每个人都有惰性&#xff0c;但不断学习是好好生活的根本&#xff0c;共勉&#xff01; 文章均为学习整理笔记&#xff0c;分享记录为主&#xff0c;如有错误请指正&#xff0c;共同学习进步。…

得物 Android 包体积资源优化实践

包体积优化中&#xff0c;资源优化一般都是首要且容易有成效的优化方向。资源优化是通过优化APK中的资源项来优化包体积&#xff0c;本文我们会介绍得物App在资源优化上做的一些实践。 1. 插件优化 插件优化资源在得物App最新版本上收益12MB。插件优化的日志在包体积平台有具…

Kotlin基础(六):枚举类和扩展

前言 本文主要讲解kotlin枚举类和扩展 Kotlin文章列表 Kotlin文章列表: 点击此处跳转查看 目录 1.1 枚举类 1.1.1 枚举类的基本用法 Kotlin中的枚举类&#xff08;enum class&#xff09;用于定义一组具有预定义值的常量。它们在许多情况下都很有用&#xff0c;例如表示一组…

【外设篇】I2C工作原理

目录 一、I2C 简介 二、I2C 主设备与从设备的关系 三、I2C 数据传输过程 3.1 总线空闲状态 3.2 开始位和停止位的产生 3.3 主设备处于等待状态 3.4 ACK 应答位的产生 3.5 有效的数据传输 3.6 数据的传输 总结 一、I2C 简介 I2C&#xff08;内置集成电路&#…

浏览器 html通知权限已经开了,但是还不提醒

如果您已经在Chrome浏览器中开启了HTML5通知&#xff0c;但是仍然不收到提醒&#xff0c;可能有几种可能的原因。下面是一些建议的解决方法&#xff1a; 检查浏览器设置: 确保HTML5通知在Chrome浏览器中正确启用。您可以按照以下步骤检查设置&#xff1a; 在Chrome中输入 chrom…

【Nacos源码系列】Nacos服务发现的原理

文章目录 服务发现是什么客户端服务发现服务端发现总结 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。点击跳转到网站。 上篇文章介绍了 Nacos服务注册的原理 &#xff0c;本篇文章将从客户端和服务端的…

微服务保护——Sentinel【实战篇二】

一、线程隔离 &#x1f349; 线程隔离有两种方式实现&#xff1a; 线程池隔离信号量隔离&#xff08;Sentinel默认采用&#xff09; 线程隔离&#xff08;舱壁模式&#xff09;&#x1f95d; 在添加限流规则时&#xff0c;可以选择两种阈值类型&#xff1a; QPS&#xff1a;…