21.发布确认模式-高级

news2025/2/22 4:17:53

问题

生产环境中由于一些不明原因,导致rabbitmq重启,在重启的期间生产者消息投递失败,导致消息丢失,需要手动处理恢复。那么如何才能进行rabbitmq的消息可靠性投递?特别是在极端的情况,rabbitmq集群不可用的时候,无法投递的消息该如何处理?

例如这样的异常信息:

方案

生产者将发送的消息发给rabbitmq的同时,将消息备份到缓存中。如果rabbitmq宕机了。会有一个定时任务会对未成功发送的消息进行重新投递。如果交换机成功收到消息会从缓存中清除已收到的消息。

分析

造成消息丢失会有两种情况,一种是交换机故障,另一个中是队列故障。

交换机确认消息是否收到的解决办法

 启用发布确认的配置

spring:
  rabbitmq:
    host: 192.168.171.128
    username: admin
    password: 123
    port: 5672
    publisher-confirm-type: correlated

 默认是none值,是不开启的,禁用发布确认模式。

correlated,发布消息到交换机后会触发回调方法。

simple, 单个确认消息。

代码

配置类

package com.xkj.org.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String EXCHANGE_NAME = "confirm.exchange";
    //队列
    public static final String QUEUE_NAME = "confirm.queue";
    //Routing Key
    public static final String ROUTING_KEY = "key1";

    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                          @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);

    }

}

回调接口

package com.xkj.org.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * 回调接口
 */
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback {

    @Autowired // 2.第二步将rabbitTemplate实例依赖注入进来
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() { //3.第三步执行此方法
        //将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 交换机确认回调方法
     * @param correlationData
     * @param ack
     * @param cause
     * 1.发消息 交换机收到 调用
     *  1.1 correlationData 回调消息的id及相关信息
     *  1.2 交换机收到消息 ack = true
     *  1.3 cause null
     * 2.发消息 交换机接收失败 回调
     *  2.1 correlationData 回调消息的id及相关信息
     *  2.2 交换机收到消息 ack = false
     *  2.3 cause 失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId(): "";
        if(ack) {
            log.info("交换机已经接收到id为:{}的消息", id);
        }else {
            log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);
        }
    }
}

消费者

package com.xkj.org.listener;

import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class ConfirmQueueConsumer {

    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), "UTF-8");
        log.info("收到队列的消息:{}",  msg);
    }
}

生产者

@ApiOperation("测试发布确认发消息")
    @GetMapping("/sendMessage/{msg}")
    public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {
        CorrelationData correlation = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlation);
        log.info("发送消息内容{}", message);
    }

结果

小技巧:如果要是测试交换机接收失败的回调,可以通过修改生产者发消息的交换机的名字为一个不存在的名字即可。

@ApiOperation("测试发布确认发消息")
    @GetMapping("/sendMessage/{msg}")
    public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {
        CorrelationData correlation = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange"+"123", "key1", message, correlation);
        log.info("发送消息内容{}", message);
    }

 问题:上面只能保证交换机收到消息的确认回调,不能保证队列收到消息的确认回调?

队列确认消息是否收到的解决办法

比如routingKey错了,或者队列出了问题,队列也将无法收到消息。

在仅开启生产者确认机制情况下,接换机接收到消息后,会直接给消息生产者发送确认消息。如果发现该消息不可路由,那么消息会直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

解决办法

通过设置mandatory参数可以在当消息传递过程中不可达目的时将消息返回给生产者。

添加配置

spring:
  rabbitmq:
    host: 192.168.171.128
    username: admin
    password: 123
    port: 5672
    publisher-confirm-type: correlated
    publisher-returns: true

publiser-returns发布退回消息。

说明:这里为了测试故意把routingkey写错

代码

生产者

估计把routingKey改成错误的 key1123

@ApiOperation("测试发布确认发消息")
    @GetMapping("/sendMessage/{msg}")
    public void sendMessage(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message) {
        CorrelationData correlation = new CorrelationData("1");

        rabbitTemplate.convertAndSend("confirm.exchange", "key1"+"123", message, correlation);
        log.info("发送消息内容{}", message);
    }

消费者

package com.xkj.org.listener;

import com.rabbitmq.client.Channel;
import com.xkj.org.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class ConfirmQueueConsumer {

    @RabbitListener(queues = ConfirmConfig.QUEUE_NAME)
    public void receiveMsg(Message message, Channel channel) throws Exception {
        String msg = new String(message.getBody(), "UTF-8");
        log.info("收到队列的消息:{}",  msg);
    }
}

配置

package com.xkj.org.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String EXCHANGE_NAME = "confirm.exchange";
    //队列
    public static final String QUEUE_NAME = "confirm.queue";
    //Routing Key
    public static final String ROUTING_KEY = "key1";

    @Bean
    public DirectExchange confirmExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
    }

    @Bean
    public Binding bindingQueueToExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange,
                                          @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(ROUTING_KEY);

    }

}

回调接口

package com.xkj.org.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.io.UnsupportedEncodingException;

/**
 * 回调接口
 */
@Slf4j
@Component // 1.第一步实例化MyCallback这个bean
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    @Autowired // 2.第二步将rabbitTemplate实例依赖注入进来
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() { //3.第三步执行此方法
        //将本类对象(ConfirmCallback的实现类对象)注入到RabbitTemplate中
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    /**
     * 交换机确认回调方法
     * @param correlationData
     * @param ack
     * @param cause
     * 1.发消息 交换机收到 调用
     *  1.1 correlationData 回调消息的id及相关信息
     *  1.2 交换机收到消息 ack = true
     *  1.3 cause null
     * 2.发消息 交换机接收失败 回调
     *  2.1 correlationData 回调消息的id及相关信息
     *  2.2 交换机收到消息 ack = false
     *  2.3 cause 失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId(): "";
        if(ack) {
            log.info("交换机已经接收到id为:{}的消息", id);
        }else {
            log.info("交换机还未收到id为:{}的消息,原因:{}", id, cause);
        }
    }

    /**
     * 可以在当消息传递过程中不可达目的时将消息返回给生产者
     * 注意此方法是消息传递失败才会调用,成功就不会执行
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        try {
            log.error("消息:{},被交换机:{},给退回了,原因:{},RoutingKey={}",
                    new String(message.getBody(), "UTF-8"),
                    exchange,
                    replyText,
                    routingKey
                    );
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

测试结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1954740.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从json到protobuf,接口效率的提升

在express开发的前后端调用中,express作为服务端是不二之选,它有一些很好用的body解析器来解析传入数据;而作为请求发起方,axios是非常方便的,这是一个很好的选择,它可以传输多种类型的数据给接收方。 通常…

ios生成打包证书和描述文件(保姆级)

苹果开发者地址:Apple Developer (简体中文) 1.申请苹果App ID(App的唯一标识) 选择App IDs 选择App 输入APP ID的描述和Bundle ID Explicit:唯一的ID,用于唯一标识一个应用程序,一般选Explicit WildCard:通配符ID&am…

【初阶数据结构篇】顺序表和链表算法题

文章目录 顺序表算法题移除元素删除有序数组中的重复项合并两个有序数组 链表算法题移除链表元素反转链表链表的中间结点合并两个有序链表链表分割链表的回文结构 顺序表算法题 不熟悉顺序表的可以先了解一下 顺序表实现方法 移除元素 给你一个数组 nums 和一个值 val&#x…

谷歌DeepMind的AlphaProof和AlphaGeometry 2:AI系统在国际数学奥林匹克竞赛中取得突破

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗?订阅我们的简报,深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同,从行业内部的深度分析和实用指南中受益。不要错过这个机会,成为AI领…

管不住人,你就当不好官:一流高手的3大管人秘籍,价值千金

管不住人,你就当不好官:一流高手的3大管人秘籍,价值千金 秘籍一:睁眼法 古语有云:“水至清则无鱼,人至察则无友。” 驾驭下属,学会睁一只眼闭一只眼,不要一竿子打死,…

如何在GPU服务器上安装Stable Diffusion webUI

一、前提条件 1、硬件条件 GPU:12G,建议16G以上,还是尽量勾搭,好像现在最大32G,目前个人性价比24G有时长出售。 内存:16G以上,建议32G,也是越大越好。 硬盘:最好使用…

谈谈面向对象

引言 无论你是刚入门的程序小白,还是混迹社会多年的程序大佬,谈起面向对象,想必多多少少都能侃上两句。面向对象作为程序界“家喻户晓”的一种编程思想,亦或是一种程序设计方法,重要性已是不言而喻。毫不夸张的说&…

打卡第22天------回溯算法

开始学习了,希望我可以尽快成功上岸! 一、回溯理论基础 什么是回溯法?回溯法也可以叫做回溯搜索法,它是一种搜索的方式。 回溯是递归的副产品,只要有递归就会有回溯。 回溯法的效率回溯法的本质是穷举,穷举所有可能,然后找出我们想要的答案。如果想让回溯法高效一些,可…

160. 相交链表(返回相交起点)

思路: 前提: PA headA,PB headB (B链表头节点) 过程: 1.PA与PB同时向后遍历 2.若PA遍历完,PA headB PB遍历完,PB headA 3.直到PA与PB指向相同节点(实际遍历过两次中的较短的链表即可&am…

LLM与搜索推荐

重磅推荐专栏: 《大模型AIGC》 《课程大纲》 《知识星球》 本专栏致力于探索和讨论当今最前沿的技术趋势和应用领域,包括但不限于ChatGPT和Stable Diffusion等。我们将深入研究大型模型的开发和应用,以及与之相关的人工智能生成内容(AIGC)技术。通过深入的技术解析和实践经…

24款奔驰E260后排电动座椅升级,舒适度全面提升

以下是关于 24 款奔驰 E 升级原厂后排电动座椅功能的案例讲解: 升级原厂后排电动座椅通常需要直接替换整个后排座椅,包括扶手等部件。 后排电动座椅的好处是可以通过电机调节靠背角度和座椅前后移动。例如,乘客可以通过车门上的座椅调节按钮…

聚焦民生服务 助力企业发展 区块链应用加速落地

聚焦民生服务,助力企业发展,区块链应用正在加速落地。这一趋势体现了区块链技术在多个领域的广泛应用和深远影响。以下是对这一主题的详细分析: 一、区块链在民生服务中的应用 政务服务 数据共享与打通:区块链技术利用其分布式账…

征服 Docker 镜像访问限制:KubeSphere v3.4.1 成功部署全攻略

近期,KubeSphere 社区的讨论中频繁出现关于 Docker 官方镜像仓库访问受限的问题。 本文旨在为您提供一个详细的指南, 展示在 Docker 官方镜像访问受限的情况下,如何通过 KubeKey v3.1.2 一次性成功部署 KubeSphere v3.4.1 以及 Kubernetes …

Java二叉树三序遍历的非递归实现

目录 零、本文中模拟实现的二叉树源码 一、前序遍历的非递归实现 1.代码示例: 2.与递归算法的比对演示: 二、中序遍历的非递归实现 1.代码示例: 2.与递归算法的比对演示: 三、后序遍历的非递归实现 1.代码示例: 2.与递…

VScode连接算力云服务器

打开VScode,找到插件市场,搜索Remote - SSH 下载插件Remote - SSH之后会出现下面这个,直接点击。 将下面这个恒源云租服务器的登陆指令 复制到下面之中,enter确认。 然后点第一个 然后点这个 复制粘贴这个云服务器的密码,(它不会显示,但你已经粘贴了)

Paddlenlp测试

1、环境安装 使用华为云euleros操作系统,python版本3.9.5,CPU无GPU服务器: (1)pip3 install setuptools_scm -i http://mirrors.aliyun.com/pypi/simple/ --trusted-host mirrors.aliyun.com (2&#xf…

js 替换json中的转义字符 \

例如有以下字符串 "\"{\\\"account\\\":\\\"66\\\",\\\"name\\\":\\\"66\\\"}\"" 想得到如下字符串 {"account":"66","name":"66"} 执行替换字符串 "\"{…

【科研绘图】记录一次论文结果复现

复现原论文中的图片是科研的基本功之一,它不仅验证了研究结果的可靠性,确保了科学工作的准确性和可重复性,还深刻地评估了方法的有效性,体现了对原始研究的尊重和对科学过程的严谨态度。这个过程不仅提高了研究的透明度&#xff0…

科普文:docker基础概念、软件安装和常用命令

docker基本概念 一 容器的概念 1. 什么是容器:容器是在隔离的环境里面运行的一个进程,这个隔离的环境有自己的系统目录文件,有自己的ip地址,主机名等。也可以说:容器是一种轻量级虚拟化的技术。 2. 容器相对于kvm虚…

计算机组成原理--慕课网学习笔记

本文记录了学习慕课网课程【新版】计算机基础,计算机组成原理操作系统网络时的计算机组成原理篇的笔记,方便查阅复习使用 一、概述篇 1.1 计算机的发展历史 1)计算机发展的四个阶段 ①第一个阶段:电子管计算机 背景&#xff1a…