如何保证消息不丢之MQ重试机制消息队列

news2024/12/26 0:45:15

1. 简介

死信队列,简称:DLXDead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另外一个交换机,这个交换机就是DLX

在这里插入图片描述

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

那么什么情况下会成为Dead message?

队列的长度达到阈值。
消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false。
原队列存在消息过期设置,消息到达超时时间未被消费。

流程讲解,如图所示(以第三种情况为例):

Producer发送一条消息到Exchange并路由到设有过期时间(假设30分钟)的Queue中。
当消息的存活时间超过了30分钟后,Queue会将消息转发给DLX。
DLX接收到Dead message后,将Dead message路由到与其绑定的Queue中。
此时消费者监听此死信队列并消费此消息。

死信队列有什么用呢?

  1. 取消订单(比如下单30分钟后未付款,则取消订单,回滚库存),或者新用户注册,隔段时间进行短信问候等。
  2. 将消费者拒绝的消息发送到死信队列,然后将消息进行持久化,后续可以做业务分析或者处理。

2. TTL

因为要实现延迟消息,我们先得知道如何设置过期时间。这里指演示

TTL :Time To Live(存活时间/过期时间),当消息到达存活时间后,还没有被消费,会被自动清除。

RabbitMQ可以对消息设置过期时间,也可以对整个队列(Queue)设置过期时间。

设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
设置消息过期时间使用参数:expiration。单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。 例:现在有两条消息,第一条消息过期时间为30s,而第二条消息过期时间为15s,当过了15秒后,第二条消息不会立即过期,而是要等第一条消息被消费后,第二条消息被消费时,才会判断是否过期,所以当所有消息的过期时间一致时(比如30m后过期),最好给队列设置过期时间,而不是消息。但是有的情况确实每个消息的过期时间不一致,比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知,这就有问题了,不可能设置那么多的队列。
如果两者都进行了设置,以时间短的为准。

3. 利用死信队列的机制, 带有重试队列机制的消费队列的流程图:

在这里插入图片描述

动态绑定队列和重试队列:

/**
 * 遍历所有的 枚举队列 手动注册队列等相关bean到spring容器中
 */
@Component
public class QueueAutoRegisterAware implements BeanFactoryAware {

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) beanFactory;
        for (QueueDeclareEnum value : QueueDeclareEnum.values()) {
            // 正常消息 队列 交换器 路由键绑定
            Queue queue = new Queue(value.getQueueName());
            DirectExchange directExchange = new DirectExchange(value.getExchangeName());
            Binding binding = BindingBuilder.bind(queue).to(directExchange).with(value.getRoutingKey());
            // 注册bean
            defaultListableBeanFactory.registerSingleton(value.getQueueName(), queue);
            defaultListableBeanFactory.registerSingleton(value.getExchangeName(), directExchange);
            defaultListableBeanFactory.registerSingleton(value.getRoutingKey(), binding);


            // TODO 重试队列 交换器 路由键(暂时没有处理, 后续需要用到mq机制则可以进行加入AOP切面重试机制)
            if (StringUtil.isNotBlank(value.getRetryQueueName()) && StringUtil.isNotBlank(value.getRetryRoutingKey())) {
                Map<String, Object> dlqParamMap = new HashMap<>(2);
                dlqParamMap.put("x-dead-letter-exchange", value.getExchangeName());
                dlqParamMap.put("x-dead-letter-routing-key", value.getRoutingKey());
                Queue retryQueue = new Queue(value.getRetryQueueName(), true, false, false, dlqParamMap);
                Binding retryBindIng = BindingBuilder.bind(retryQueue).to(directExchange).with(value.getRetryRoutingKey());

                defaultListableBeanFactory.registerSingleton(value.getRetryQueueName(), retryQueue);
                defaultListableBeanFactory.registerSingleton(value.getRetryRoutingKey(), retryBindIng);
            }
        }
    }
}

Aop异常切面,拦截异常把设置消息的重试次数和ttl过期时间,发送重试队列中;

/**
 * RabbitMQ监听器 切面
 * 异常消息发送到重试队列
 */
@Aspect
@Component
@Slf4j
@AllArgsConstructor
public class RabbitListenerAspect {


	private final RabbitTemplate rabbitTemplate;

	private final IFailureService failureService;

	private final TmsProperties tmsProperties;

	private final IOrderRequestService orderRequestService;


	@Pointcut("@annotation(org.springframework.amqp.rabbit.annotation.RabbitListener)")
	public void pointCut() {
	}


	@Around("pointCut()")
	public Object around(ProceedingJoinPoint joinPoint) {
		log.debug("MQ切面接收到消息");
		AtomicReference<Message> messageReference = new AtomicReference<>();
		Arrays.stream(joinPoint.getArgs()).forEach(arg -> {
			if (arg instanceof Message) {
				messageReference.set((Message) arg);
			}
		});
		Message message = messageReference.get();
		if (message == null) {
			log.debug("mq消息内容为空,不执行相关操作");
			return null;
		}
		Object proceed;
		// 消费出现异常时 会根据配置发送到重试队列
		try {
			//绑定租户
			String tenantId = message.getMessageProperties().getHeader(ForecastConstant.TENANT_ID);
			if (tmsProperties.isTenantApp()) {
				TmsTenantUtil.bindId(tenantId);
			}
			proceed = joinPoint.proceed();
			if (tmsProperties.isTenantApp()) {
				TmsTenantUtil.unbind();
			}
		} catch (ForecastException e) {
			log.info("预报失败,直接回传失败信息", e);
			this.forecastMqExceptionDeal(message, e);
			return null;
		} catch (Throwable throwable) {
			log.error("消费失败,异常信息:{}", throwable);
			this.sendToRetryQueue(message, throwable);
			return null;
		}
		return proceed;
	}

	/**
	 * 发送到重试队列
	 *
	 * @param message 消息内容
	 */
	private void sendToRetryQueue(Message message, Throwable throwable) {
		sendToRetryQueue(message, throwable, true);
	}


	/**
	 * 发送到重试队列
	 *
	 * @param message 消息内容
	 */
	private void sendToRetryQueue(Message message, Throwable throwable, Boolean addRetryCount) {
		String consumerQueue = message.getMessageProperties().getConsumerQueue();
		// 重试次数 默认0
		Optional<QueueDeclareEnum> first = Arrays.stream(QueueDeclareEnum.values()).filter(value ->
			value.getQueueName().equalsIgnoreCase(consumerQueue)).findFirst();
		if (!first.isPresent()) {
			return;
		}
		QueueDeclareEnum queueDeclareEnum = first.get();
		String retryQueueName = queueDeclareEnum.getRetryQueueName();
		String retryRoutingKey = queueDeclareEnum.getRetryRoutingKey();
		if (StringUtil.isEmpty(retryQueueName) || StringUtil.isEmpty(retryRoutingKey)) {
			log.info("当前队列没有配置重试队列 不进行重试,队列名:{}", queueDeclareEnum.getQueueName());
			return;
		}
		Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().
			getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);
		//有效的操作 重试次数才+1
		if (addRetryCount) {
			retryCount++;
		}
		if (retryCount == 1) {
			try {
				failureService.deal(message, throwable);
			} catch (Exception e) {
				log.error("失败最大次数处理异常", e);
			}
		}
		if (retryCount > queueDeclareEnum.getMaxRetryCount()) {
			log.info("当前消息超过队列配置最大重试次数,不进行重试,队列名:{}", queueDeclareEnum.getQueueName());
			return;
		}
		//如果是手动重试的 则不进入重试队列
		if (message.getMessageProperties().getHeader(ForecastConstant.IS_HAND)) {
			return;
		}
		Message retryMessage = MessageBuilder.fromMessage(message).setHeader(ForecastConstant.RETRY_COUNT, retryCount).build();
		this.convertAndRetry(retryMessage, queueDeclareEnum);
	}

	/**
	 * 预报失败处理
	 */
	private void forecastMqExceptionDeal(Message message, ForecastException mqException) {
		switch (mqException.getErrorCode()) {
			case ERROR_CODE_1:
				OrderEntityBO orderEntityBO = (OrderEntityBO) mqException.getData();
				//1.回传失败
				failureService.deal(message, mqException);
				//2.塞到重试任务中去 第二天0时执行
				orderRequestService.saveEntity(new OrderTaskEntity(orderEntityBO.getOrder().getCode(),
					OrderRequestTypeEnum.RETRY_FORECAST, getOperateTime()));
				//3.记录到redis中
				redisDeal(orderEntityBO);
				break;
			case ERROR_CODE_2:
				//不增加重试次数
				sendToRetryQueue(message, mqException, false);
				break;
			case ERROR_CODE_3:
				Integer retryCount = (Integer) Optional.ofNullable(message.getMessageProperties().
					getHeader(ForecastConstant.RETRY_COUNT)).orElse(0);
				if (retryCount == 3) {
					WXCallUtil.call(mqException.getCallMessage());
				}
				this.sendToRetryQueue(message, mqException);
			default:
		}
	}

	private void redisDeal(OrderEntityBO orderEntityBO) {
		String channelCode = orderEntityBO.getOrder().getChannelCode();
		String country = orderEntityBO.getReceiver().getCountryCode();
		String ruleName = country + "_" + channelCode;
		List<String> list = TmsRedisUtil.get(ForecastRedisConstant.CHANNEL_LIMIT_DATA);

		if (ObjectUtil.isEmpty(list)) {
			list = new ArrayList<>();
		}
		if (list.contains(ruleName)) {
			return;
		}
		list.add(ruleName);
		TmsRedisUtil.set(ForecastRedisConstant.CHANNEL_LIMIT_DATA, list);
		TmsRedisUtil.expireAt(ForecastRedisConstant.CHANNEL_LIMIT_DATA, getSecondDayZero());
	}

	/**
	 * 获取第二天0时 date
	 *
	 * @return
	 */
	private Date getSecondDayZero() {
		Instant instant = new Date().toInstant();
		return DateUtil.beginOfDay(Date.from(instant.plus(Duration.ofDays(1))));
	}

	/**
	 * 避免服务器时间不同步 向后兼容5分钟
	 *
	 * @return
	 */
	private Date getOperateTime() {
		//如果当前时间小于0时5分 则下次重试时间设为当天零时
		Date date = DateUtil.beginOfDay(new Date());
		if (System.currentTimeMillis() - date.getTime() < 5 * 60 * 1000) {
			return date;
		}
		return getSecondDayZero();
	}


	/**
	 * 发送消息到重试队列
	 *
	 * @param message          重试消息
	 * @param queueDeclareEnum 队列枚举
	 */
	public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum) {
		String ttlTime = String.valueOf(queueDeclareEnum.getTtlTime() * (Integer) message.getMessageProperties().getHeader(ForecastConstant.RETRY_COUNT));
		message.getMessageProperties().setExpiration(ttlTime);
		rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);
	}

	/**
	 * 发送消息到重试队列
	 *
	 * @param message          重试消息
	 * @param queueDeclareEnum 队列枚举
	 */
	public void convertAndRetry(Message message, QueueDeclareEnum queueDeclareEnum, String ttlTime) {
		message.getMessageProperties().setExpiration(ttlTime);
		rabbitTemplate.convertAndSend(queueDeclareEnum.getExchangeName(), queueDeclareEnum.getRetryRoutingKey(), message);
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1502006.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity中PICO实现移动交互

文章目录 前言一、在允许行走的地面加上对应的组件1、Teleportation Anchor 移动锚点2、Teleportation Area 移动区域 二、在 玩家&#xff08;需要移动的对象&#xff09;上挂载对应组件1、Teleportation Provider 被移动对象2、在 Teleportation Anchor 或 Teleportation Are…

<商务世界>《第9课 产品地图》

1 产品地图 产品地图的核心是产品或用户的业务流程或地图导航&#xff0c;从用户和产品两条路线出发&#xff0c;搭建业务架构&#xff0c;并划分明确的功能模块&#xff0c;用图形化方式记录、整理、表现出产品的清晰特点。其中&#xff0c;包括用户在使用过程中做了什么、感…

景联文科技:专业提供高质量大语言模型训练数据

2024年&#xff0c;数字经济被再次写入政府工作报告中&#xff0c;报告指出要深化大数据、人工智能等研发应用&#xff0c;打造具有国际竞争力的数字产业集群。 大模型作为生成式人工智能的基础&#xff0c;日益成为国际科技竞争的焦点。人大代表杨剑宇指出&#xff0c;尽管我国…

SHARE 100M PRO:航测领域的多面手

在无人机航测领域&#xff0c;SHARE 100M PRO单镜头航测相机以其1.02亿像素的中画幅传感器和创新技术&#xff0c;正在重塑倾斜摄影的精度和效率。这款相机不仅在城市规划和土地管理中发挥着重要作用&#xff0c;还在环境监测、基础设施建设、农业管理等多个航测领域展现出其卓…

2024 RubyMine 激活,分享几个RubyMine 激活的方案

文章目录 RubyMine 公司简介我这边使用RubyMine 的理由RubyMine 2023.3 最新变化AI Assistant 正式版对 AI 生成名称建议的支持改进了 Ruby 上下文单元测试生成 RailsRails 应用程序和引擎的自定义路径Rails 路径的自动导入对存储在默认位置之外的模型、控制器和邮件器的代码洞…

Vue.js 进阶技巧:keep-alive 缓存组件解析

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

视频点播系统|基于SSM 框架+ Mysql+Java+B/S架构技术的视频点播系统设计与实现(可运行源码+数据库+设计文档+部署说明+视频演示)

目录 文末获取源码 系统功能实现 学生前台功能 学生登录、学生注册 个人中心 视频信息 我的收藏 系统公告 教师功能实现 管理员登录 管理员功能实现 视频分类管理 轮播图管理 数据库设计 系统的功能结构图 lumwen参考 概述 源码获取 文末获取源码 系统功能实…

C语言分析基础排序算法——选择排序

目录 选择排序 选择排序 堆排序 选择排序 选择排序 选择排序的基本思路是&#xff0c;定义两个区间指针begin和end&#xff0c;遍历数组中的每一个数据找出最大的数据的下标和最小的数据的下标&#xff0c;之后与begin和end指针分别交换小数据与begin的位置以及大数据和e…

华为北向网管NCE开发教程(1)闭坑选接口协议

华为北向网管NCE开发教程&#xff08;1&#xff09;闭坑选接口协议 华为北向网管NCE开发教程&#xff08;2&#xff09;REST接口开发 华为北向网管NCE开发教程&#xff08;3&#xff09;CORBA协议开发 本文一是记录自己开发华为北向网管遇到的坑&#xff0c;二是给需要的人&…

自研cloud框架专题–web模块(三)

项目特点一:框架集成 1.引入核心依赖2.配置相关功能 二:功能介绍 1.swagger支持并提供swagger快速配置2.knife增强swagger支持3.全局请求参数校验(Validation)支持4.字段脱敏支持5.默认jackson序列化6.xss,cors支持7.访问日志支持8.全局异常处理,统一返回结果9.系统关键及常用信…

AI新工具 百分50%算力确达到了GPT-4水平;将音乐轨道中的人声、鼓声、贝斯等音源分离出来等

1: Pi 百分50%算力确达到了GPT-4水平 Pi 刚刚得到了巨大的升级&#xff01;它现在由最新的 LLMInflection-2.5 提供支持&#xff0c;它在所有基准测试中都与 GPT-4 并驾齐驱&#xff0c;并且使用不到一半的计算来训练。 地址&#xff1a;https://pi.ai/ 2: Moseca 能将音乐…

Java项目:基于SSM框架实现的二手车交易平台【源码+开题报告+任务书+毕业论文+答辩ppt】

一、项目简介 本项目是一套基于SSM框架实现的二手车交易平台 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目附带全部源码可作为毕设使用。 项目都经过严格调试&#xff0c;eclipse或者idea 确保可以运行&#xff01; 该系统功能完善、界面美观、操作简单、功能齐…

【漏洞复现】锐捷 EWEB auth 远程命令执行漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

WPF开源的一款免费、开箱即用的翻译、OCR工具

前言 今天大姚给大家分享一款由WPF开源的、免费的&#xff08;MIT License&#xff09;、即开即用、即用即走的翻译、OCR工具&#xff1a;STranslate。 WPF介绍 WPF 是一个强大的桌面应用程序框架&#xff0c;用于构建具有丰富用户界面的 Windows 应用。它提供了灵活的布局、…

HTML5+CSS3+JS小实例:暗紫色Tabbar

实例:暗紫色Tabbar 技术栈:HTML+CSS+JS 效果: 源码: 【HTML】 <!DOCTYPE html> <html lang="zh-CN"><head><meta charset="UTF-8" /><meta name="viewport" content="width=device-width, initial-scal…

Java开发与配置用到的各类中间件官网

开发配置时用到了一些官网地址&#xff0c;记录一下。 activemq 官网&#xff1a;ActiveMQ elk 官网&#xff1a;Elasticsearch 平台 — 大规模查找实时答案 | Elastic nginx 官网&#xff1a;nginx maven 官网&#xff1a;Maven – Welcome to Apache Maven nexus 官网&a…

Linux环境下使用线程方式操作UART读写功能

目录 概述 1 Linux环境下UART设备 2 轮询方式操作UART功能实现 2.1 打开串口函数&#xff1a;usr_serial_open 2.2 关闭串口函数&#xff1a; usr_serial_close 2.3 发送数据函数&#xff1a; usr_serial_sendbytes 2.4 接收数据函数&#xff1a; thread_uart_readbytes …

Humanoid-Gym 开源人形机器人端到端强化学习训练框架!星动纪元联合清华大学、上海期智研究院发布!

系列文章目录 前言 Humanoid-Gym: Reinforcement Learning for Humanoid Robot with Zero-Shot Sim2Real Transfer GitHub Repository: GitHub - roboterax/humanoid-gym: Humanoid-Gym: Reinforcement Learning for Humanoid Robot with Zero-Shot Sim2Real Transfer 一、介…

挑战杯 基于深度学习的视频多目标跟踪实现

文章目录 1 前言2 先上成果3 多目标跟踪的两种方法3.1 方法13.2 方法2 4 Tracking By Detecting的跟踪过程4.1 存在的问题4.2 基于轨迹预测的跟踪方式 5 训练代码6 最后 1 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于深度学习的视频多目标跟踪实现 …

unity学习(51)——服务器三次注册限制以及数据库化角色信息6--完结

同一账号只写第一次&#xff0c;不同账号第一次爆炸 &#xff0c;就因为下面部分得到逻辑有问题 修改后的代码如下&#xff1a;1.成功完成角色注册信息的数据库化记录。2.每个账号上限3个角色。3.角色是可以重名的&#xff0c;但是角色的id不会重名。 internal class UserCach…