RabbitMq-接收消息+redis消费者重复接收

news2024/12/28 5:40:22

在接触RammitMQ时,好多文章都说在配置中设置属性 

# rabbitmq 配置
rabbitmq:
  host: xxx.xxx.xxx.xxx
  port: xxxx
  username: xxx
  password: xxxxxx
  ## 生产端配置
  # 开启发布确认,就是confirm模式. 消费端ack应答后,才将消息从队列中删除
  #确认消息已发送到队列(Queue)
  publisher-returns: true
  #确认消息已发送到交换机(Exchange)
  publisher-confirm-type: correlated
  listener: #消费者 端配置
    retry:
      enabled: true # 是否支持重试
      default-requeue-rejected: false
      max-attempts: 5 #最大重试次数
      initial-interval: 3000 # 重试时间间隔
    direct:
      acknowledge-mode: manual
    simple:
      acknowledge-mode: manual

消息接收消息失败时,可以重复调用5次;按照此操作,发现没有重复调用。

----------------------------------正确思路---------------------------------------------------------------------------------

设置完配置文件属性后,在代码中利用redis与channel.basicNack联合使用,将错误记录保存至数据库,方便查找原因;

---------------------------------------代码

package com.charg.listener;

import com.charg.common.constant.CacheConstants;
import com.charg.common.constant.Constants;
import com.charg.common.utils.JsonUtils;
import com.charg.common.utils.redis.RedisUtils;
import com.charg.constant.RabbitConstants;
import com.charg.product.domain.bo.ProductDeviceBo;
import com.charg.product.domain.bo.RabMsgLogBo;
import com.charg.product.service.IProductDeviceService;
import com.charg.product.service.IRabMsgLogService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.Duration;

/**
 * rabbitmq 监听


 */
@Slf4j
@Component
public class RabbitQueueListener {

    /**
     * 最大重试次数
     */
    private static int maxReconsumeCount = 3;
    @Autowired
    private StringRedisTemplate redisTemplate;
    /**
     * 监听  队列的处理器
     *
     * @param message
     */

    @RabbitListener(queues = "队列名称")
    @RabbitHandler
    public void onMessage(Message message, Channel channel) {
        //唯一标识
        String messageId = message.getMessageProperties().getMessageId();
        try {
            //判断messageId在redis中是否存在
            if (verificationMessageId(messageId)) {
                log.error("消息已重复处理,拒绝再次接收...");
                // 拒绝消息
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
            } else {//不存在 则处理消息
                // 接收消息
                if (StringUtils.isNotBlank(new String(message.getBody()))) {
                    //修改业务逻辑
                    if (!false) {
                        log.error("消息即将再次返回队列处理...逻辑错误");
                        // 处理最大回调次数
                        getMaximumNumber(message, channel);
                    } else {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                        //加入缓存
                        addMessageId(message);
                    }
                } else {
                    log.info("消息为空拒绝接收...");
                    // 拒绝消息
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            try {
                if (message.getMessageProperties().getRedelivered()) {
                    log.error("消息已重复处理,拒绝再次接收----...");
                    // 拒绝消息
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
                } else {
                    log.error("消息即将再次返回队列处理...");
                    // 处理最大回调次数
                    getMaximumNumber(message, channel);
                }
            } catch (Exception exception) {
                exception.printStackTrace();
            }
        }
    }

   

    /**
     * 记录消息最大次数
     *
     * @param message
     * @param channel
     * @throws IOException
     */
    private void getMaximumNumber(Message message, Channel channel) {
        try {
            int recounsumeCounts = RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId()) == null ? 0 : RedisUtils.getCacheObject("messageMaxKey"+message.getMessageProperties().getMessageId());

            if (maxReconsumeCount > recounsumeCounts) {
                log.info("maxMessageId(message.getMessageProperties().getMessageId())=" + recounsumeCounts);
        
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                // 记录重试次数
                maxMessageId(message.getMessageProperties().getMessageId());
            } else {
                log.info("次数达到三次了呢---------" + RedisUtils.getCacheObject(CacheConstants.MESSAGE_MAX_KEY + message.getMessageProperties().getMessageId()));
                // 将消息重新放回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                // 清除缓存
                RedisUtils.deleteObject("messageMaxKey" + message.getMessageProperties().getMessageId());
                //重试三次后,还是失败 需记录到数据库
                addRabMsgLog(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 设置消息的最大重试次数
     */
    public void maxMessageId(String messageId) {
        String messageMax ="messageMaxKey"+ messageId;
        // 存入缓存,用来记录该消息重试了几次
        if (RedisUtils.hasKey(messageMax)) {
            RedisUtils.incrAtomicValue(messageMax);
        } else {
            //错误的消息-插入数据库
            RedisUtils.setCacheObject(messageMax, 1, Duration.ofHours(Constants.MESSAGE_TIME));

        }
    }

    /**
     * 校验消息是否消费过该消息
     *
     * @param messageId 消息id
     * @return
     */
    public boolean verificationMessageId(String messageId) {
        // 消息是否存在key
    
        String verifyIsExistKey ="messageExistKey" + messageId;
        if ((RedisUtils.hasKey(verifyIsExistKey))) {
            return true;
        }
        return false;
    }

    /**
     * 保存消费过消息
     *
     * @param message 消息
     * @return
     */
    public void addMessageId(Message message) {
        // 存入缓存
        RedisUtils.setCacheObject("messageExistKey" + message.getMessageProperties().getMessageId(), message.getMessageProperties().getMessageId(), 1);
    }

    /**
     * 消息队列 失败日志 操作
     * 自己存数据库逻辑
     */
    public void addRabMsgLog(Message message) {
        log.info("====操作日志===");
        //将内容记录到数据库
    }

}
--------------------------------数据库表

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/463962.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法记录lday3 LinkedList 链表移除 + 链表构建 + 链表反转reverse

今日任务 ● 链表理论基础 ● 203.移除链表元素 ● 707.设计链表 ● 206.反转链表 链表理论基础 建议:了解一下链接基础,以及链表和数组的区别 文章链接:https://programmercarl.com/%E9%93%BE%E8%A1%A8%E7%90%86%E8%AE%BA%E5%9F%BA%E7%A…

【SpringBoot源码剥析】| 依赖管理

目录 一. 🦁 依赖管理Ⅰ. 部分dependency导入时为啥不需要指定版本?1.1 父依赖启动器的工作1.2 问题答案 Ⅱ. 项目运行依赖的JAR包是从何而来的?2.1 分析源码2.2 问题答案 二. 🦁 总结 一. 🦁 依赖管理 Ⅰ. 部分dependency导入时…

Linux——中断和时间管理(中)

目录 驱动中的中断处理 中断下半部 软中断 tasklet 工作队列 驱动中的中断处理 通过上一节的分析不难发现,要在驱动中支持中断,则需要构造一个 struct irqaction的结构对象,并根据IRQ 号加入到对应的链表中(因为 irq_des 已经在内核初始…

golang微服务项目通用流水线

golang微服务项目通用流水线 工作中随着业务越来越大,微服务的项目也越来越多,最开始的时候是一个服务一个流水线,然后还分了三个环境,也就是一个服务三个流水线,后面就越来越不利于管理维护了,因此&#…

马云的创业故事及他人生中的摆渡人-创建阿里巴巴(五)

著名的“18罗汉大会” 以及“马云成功背后的男人” 1999年大年初五,杭州湖畔花园小区,18个人坐满了一屋子, 这是阿里巴巴的第一次全员大会,马云激情澎湃地讲了2个小时,并且专门请了摄影师全程录像。 这就是传说中的…

SD卡无法识别怎么办?

SD卡是一种可移动存储设备,广泛应用于各种电子设备,如Android智能手机、平板电脑或相机等,您可以将SD卡连接到计算机以传输一些文件。但有些时候,当您打开文件资源管理器后,可能会发现您的SD卡不显示,无法使…

即时通讯IM源码应该如何做好安全防护?

即时通讯(Instant Messaging,IM)在现代社会中已经成为了人们日常生活中必不可少的沟通工具。无论是在家庭、教育、商业或政府等各行各业中,IM都扮演着重要的角色。然而,随着IM使用率的增加,相应的安全威胁也…

程序员基础的硬件知识(cpu、主板、显卡、内存条等)

一、综合简介 cpu:负责运算数据,就等于你的大脑运算速度。 显卡:本来没有显卡,后来因为大家对图片要求越来越高,视频要求越来越高,啥都让cpu算太累了,于是分出来一个,专门用来计算…

华为云服务EulerOS release 2.0 版本安装大象数据库

1连接华为服务器 下载并按照命令yum install -y postgresql-server 2 初始化 postgresql-setup initdb 3启动 systemctl start postgresql.service 启动服务 4开放端口(如果防火墙已经关闭则可以省略) iptables -I INPUT -p tcp --dport 5432 -j ACCEPT 5验证安装结果&…

Cortex-A7中断详解(二)

CP15协处理器 CP15协处理器一般用于存储系统管理,但是在中断中也会使用到,CP15协处理器一共有16个32位寄存器。CP15协处理器的访问通过以下指令完成: MRC:将CP15协处理器中的寄存器数据读到ARM寄存器中。MCR:将ARM寄…

坚持刷题2个月,终于去了梦寐以求的大厂....

写在前面 最近一个读者和我反馈,他坚持刷题2个月,终于去了他梦寐以求的大厂,薪资涨幅非常可观,期间面字节跳动还遇到了原题…并表示目前国内的大厂和一些独角兽,已经越来越效仿硅谷公司的做法,通过面试给定…

软件测试的测试用例

1.白盒和黑盒测试: 黑盒测试:把代码看成一个黑盒子,只关心输入和输出结果之间的关系 产品功能是否符合要求; 白盒测试:能够看到代码本身,针对代码本身进行测试,测试代码本身的逻辑是否符合规范。 2.测试用…

常用图标(icon)css下载

1、演示图例(icon1.css)[24*18] 2、演示图例(icon2.css)[24*24] 3、演示图例(icon3.css)[24*24] 4、演示图例(icon4.css)[24*18] 5、演示图例(icon5.css)[26*…

C/C++每日一练(20230426)

目录 1. 不喜欢带钱的小C 🌟🌟 2. 数组排序 ※ 3. 超级素数 ※ 🌟 每日一练刷题专栏 🌟 Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 专栏 1. 不喜欢带钱的小C 小C不喜欢带钱&#xff0c…

linux 安装zsh shell工具

安装zsh sudo apt install zsh chsh -s /bin/zsh sh -c "$(curl -fsSL https://raw.github.com/robbyrussell/oh-my-zsh/master/tools/install.sh)" 这一步需要网络状态比较好 ~$ git clone --depth 1 https://github.com/junegunn/fzf.git ~/.fzf ~$ ~/.fzf/i…

USB2.0、USB3.0和typec引脚定义

USB2.0 USB 2 (Type A) pinout PinNameDirectionColorDescription1Vred5 V power2D-←→whiteData -3D←→greenData 4GNDblackGround USB 2 (Type B) pinout PinNameDirectionColorDescription1Vred5 V power2D-←→whiteData -3D←→greenData 4GNDblackGround USB Mini/M…

输入 jupyter notebook 报错 ModuleNotFoundError: No module named ‘pysqlite2‘ 解决方案

今天在cmd命令行中输入jupyter notebook想要打开jupyter时,出现了以下问题:即找不到模块‘pysqlite2’。 找到出问题的文件“sessionmanager.py”,发现出问题的地方在于:尝试导入sqlite3没有导致失败 因此,以下是解决…

HCIA-RS实验-ENSP设备的基础配置

本文主要简单地介绍ENSP设备的基础配置,帮助读者快速上手使用ENSP。可以掌握一些基础的配置方案,更改名称,系统时间,系统地区、密码登录等信息 以下是该文章的拓扑图;现将这2台设备启动;后续双击即可进入命…

mac软件卸载不干净怎么回事 mac如何卸载软件干净

很多苹果用户会发现,mac卸载软件不干净。明明是早都卸载的软件还能看到那些软件的图标和残留文件夹。mac软件卸载不干净怎么回事?mac如何卸载软件干净?今天小编就来教大家如何将软件彻底卸载,保证电脑磁盘的干净。 一、mac软件卸…

“分布式基础概念”全面解析,让你秒懂分布式系统!【一】

前言 在项目中学习这些技术、加深了对其的使用和深层次的理解。以下总结来自谷粒商城项目案例资料 1、什么是微服务? 微服务架构风格,就像是把一个单独的应用程序开发为一套小服务,每个小服务运行在自己的进程中,并使用轻量级机…