仿大众点评——秒杀系统部分03——RabbitMq措施

news2024/9/19 10:49:04

RabbitMq保证消息不丢失

RabbitMQ如何保证消息的可靠性:
1.从生产者到消息队列,congfirm模式(与事务相比confirm模式最大的优势是异步)通过消息确认机制来保证,通过给每个指派唯一标志,完成消费后返回ack确认, 2 消息队列 消息队列的持久化,包括交换机持久化,消息队列持久化以及消息持久化 3.消费者 消费完返回完后手动ack确认,开启RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。 若长时间没有ack确认会像下一个消费者发送任务,同时需要做等幂性处理。

RabbitMQ消息丢失的情况

RabbitMQ 消息丢失的源头主要有以下三个:

  • 生产者丢失消息
  • RabbitMQ 丢失消息
  • 消费者丢失消息

生产者丢失消息解决方案

事务消息机制:在生产者发送消息之前,通过 channel.txSelect 开启一个事务,接着发送消息,如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚 channel.txRollback 然后重新发送;假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit;但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
// 开启事务
channel.txSelect
try {
      // 这里发送消息
} catch (Exception e) {
      channel.txRollback
// 这里再次重发这条消息
}
// 提交事务
channel.txCommit
confirm 机制:就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

一般采用异步confirm,所以发送消息之前,需要把消息存起来。所以我们需要为消息分配全局唯一的Id,与消息内容一一对应。

生产者也需要监听Broker发送的通知,根据ack / nack 进行确认。

而具体实现时,需要注意:

  • 限制重发次数:超限应标志为任务异常,通过人工处理,避免一直重发浪费资源
  • 定时扫描未确认消息,因为broker的响应可能会丢失
rabbitmq:
    # 开启发送确认
    publisher-confirm-type: correlated
@Service
@Slf4j
public class MqSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    //发送秒杀信息
    public void sendSeckillMessage(String msg){
        log.info("发送消息:" + msg);
        rabbitTemplate.convertAndSend("seckillExchange", "seckill.message", msg);
    }
}
//连接工厂
    @Resource
    private CachingConnectionFactory cachingConnectionFactory;
//向spring容器中注入RabbitTemplate对象,并配置开启确认消息回调和消息失败回调
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
        /**
         * 消息确认回调,确认消息是否到达broker
         * correlationData:消息唯一标识
         * ack:确认结果,消息是否发送成功
         * cause:失败原因
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            String msgId = correlationData.getId();
            if (ack) {
                LOGGER.info("消息发送成功============>" + msgId);
                //监听消息是否发送成功,如果成功将状态改为1
//                mailLogService.update(new UpdateWrapper<MailLog>().set("status", 1).eq("msgId", msgId));
            } else {
                LOGGER.error("消息发送到queue失败============>" + msgId);
            }
        });
        return rabbitTemplate;
    }

RabbitMQ 丢失消息解决方案

消息持久化

RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,需要给exchange、queue和message都进行持久化。

设置持久化,两个步骤:必须都设置。

  • 创建 exchange、queue 时,设置为持久化 durable = true:(其实默认是持久化):会持久化 queue 的元数据,但不会持久化 queue 里的消息
  • 因此需要单独设置消息的持久化:发送消息时,将消息Properties的 deliveryMode=PERSISTENT:

无法保障100%不丢失,因为可能持久化完成前就宕机。

(若设置持久化,会在持久化完成后再发出ack)

@Bean
    public Queue queue(){
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:该队列是否只供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能-一个消费者消费
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        return new Queue(QUEUE);
    }
//绑定correlationId、以及消息持久化
        rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message, Correlation correlation) {
                MessageProperties messageProperties = message.getMessageProperties();

                if (correlation instanceof CorrelationData) {
                    String correlationId = ((CorrelationData) correlation).getId();
                    messageProperties.setCorrelationId(correlationId);
                }
                // 持久化处理
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                MessageProperties messageProperties = message.getMessageProperties();
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                return message;
            }
消息入库

消息入库,顾名思义就是将要发送的消息保存到数据库中。

  • 发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确认;收到确认后将status设为1,表示RabbitMQ已收到消息。
  • 生产端这边还需要开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以要设置时间)
  • 还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。
    在这里插入图片描述
消息延迟投递,做二次确认,回调检查

在这里插入图片描述

消费者消息丢失

如何保证消费成功(或限流处理):对应消费端丢数据,即消费完成前,宕机

ACK确认机制

RabbitMQ默认是自动ack的,缺点:

  • 消息失败,则重新入队,但入队还是在队列首部,很容易造成死循环(可通过 default-requeue-rejected覆盖)
  • 只要有消息,就会源源不断地发送给客户端,而不管客户端能否消费的完(即无法限流)。注意:不要搞混了none和auto,none才是发送了就不管,auto只是自动回发ack。

因此使用手动ack确认

  • application.yml配置
spring:
	rabbitmq:
	    listener:
	      simple:
	        # 消费者最小数量
	        concurrency: 10
	        # 消费者最大数量
	        max-concurrency: 10
	        # 限制消费者每次处理消息的数量(预抓取:等价于缓冲区大小,限流关键)
	        prefetch: 1
	        # manual:手动确认
	        acknowledge-mode: manual
@RabbitListener(queues = "seckillQueue")
    @RabbitHandler
    public void receiveDeadMsg(String msg, Channel channel, Message messages) throws Exception {
        try {
            log.info("接收消息:" + msg);
            VoucherOrder voucherOrder = JsonUtil.jsonStr2Object(msg, VoucherOrder.class);
            voucherOrderService.createVoucherOrder(voucherOrder);
            // 回发ack,第二个参数multiple,表示是否批量确认
            channel.basicAck(messages.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 回发Nack,且重回队列为false,防死循环
            channel.basicNack(messages.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/22318.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LeetCode每日一题】——136.只出现一次的数字

文章目录一【题目类别】二【题目难度】三【题目编号】四【题目描述】五【题目示例】六【解题思路】七【题目提示】八【时间频度】九【代码实现】十【提交结果】一【题目类别】 数组 二【题目难度】 简单 三【题目编号】 136.只出现一次的数字 四【题目描述】 给你一个 非…

[附源码]SSM计算机毕业设计风景区管理系统JAVA

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

再学DataX

一、DataX简介 DataX官网文档&#xff1a;https://github.com/alibaba/DataX/blob/master/introduction.md DataX 是一个异构数据源离线同步工具&#xff0c;致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同…

MyBatis基于XML的使用——动态sql

1、动态sql 动态 SQL 是 MyBatis 的强大特性之一。如果你使用过 JDBC 或其它 类似的框架&#xff0c;你应该能理解根据不同条件拼接 SQL 语句有多痛苦&#xff0c;例如拼接时要确保不能忘记添加必要的空格&#xff0c;还要注意去掉列表最后一个列名的逗号。 利用动态 SQL&#…

14服务-ClearDiagnosticlnformation

诊断协议那些事儿 诊断协议那些事儿专栏系列文章&#xff0c;本文介绍存储数据传输服务下的14服务ClearDiagnosticlnformation&#xff0c;客户端使用ClearDiagnosticlnformation服务清除一个或多个服务器存储器中的诊断信息。 关联文章&#xff1a;$19服务:DTCStatusMask和s…

CockroachDB-读和写

本文知识点来源于官网地址https://www.cockroachlabs.com/docs/stable/architecture/reads-and-writes-overview.html 查询执行 当CRDB执行查询时&#xff0c;集群将请求路由到包含相关数据的范围的Leaseholder。如果查询涉及多个范围&#xff0c;则请求将发送给多个Leasehol…

求实数的整数次幂(循环版)(高效)(位运算解题)

求实数的整数次幂(循环版)(高效) (10 分) 原理图&#xff1a; 请编写函数&#xff0c;用循环语句以最快的方法求任意实数的任意整数次幂。 函数原型 double Power(double x, int n); 说明&#xff1a;参数 x 为底数&#xff0c;n 为指数。若参数正确&#xff0c;则函数值为…

智能驾驶开启产业新赛道:资本扎堆布局车规级高精定位

2022年被称为高阶智能驾驶元年的背后&#xff0c;新的产业链正在悄然发展。 车规级高精定位便是其中之一。2022年10月&#xff0c;主业聚焦于动力总成测试的上海华依科技集团股份有限公司&#xff08;以下简称“华依科技”&#xff0c;688071.SH&#xff09;&#xff0c;发布公…

漫画风格迁移神器 AnimeGANv2:快速生成你的漫画形象

生成你的漫画形象&#xff01; 漫画风格迁移神器 AnimeGANv2 文章目录生成你的漫画形象&#xff01; 漫画风格迁移神器 AnimeGANv2快速在线生成你的漫画形象AnimeGAN 简要介绍与其他动漫风格迁移模型的效果对比AnimeGANv2 的优点AnimeGANv2 风格多样化AnimeGANv2 网络结构快速生…

基于stm32单片机的水位检测自动抽水系统

资料编号&#xff1a;106 下面是相关功能视频演示&#xff1a; 106-基于stm32单片机的水位检测自动抽水系统Proteus仿真&#xff08;源码仿真全套资料&#xff09;功能介绍&#xff1a; 使用滑动变阻器模拟水位监测器&#xff0c;通过改变电压值表示水位的变化。stm32通过ADC…

【前端】从 0 到 1 实现一个网站框架(一、注册 [1] )

Hi~你好呀&#xff0c;等你很久啦~ 我是 LStar&#xff0c;一枚来自北京的初二女生&#xff0c;2020 年年初加入 CSDN。 话不多说&#xff0c;直入主题~&#xff08;我现在看两年多前我 11 岁那会发的文章&#xff0c;越看越想笑。为了不让四年后 18 岁的我看着这篇文章露出 …

超详细的mysql多表操作教程

目录 外键约束 概念 特点 操作 多表联合查询 概念 操作 多表操作总结 外键约束 概念 特点 定义一个外键时&#xff0c;需要遵守下列规则&#xff1a; 主表必须已经存在于数据库中&#xff0c;或者是当前正在创建的表。 必须为主表定义主键。 主键不能包含空值&#xf…

967亿销售额!博世解码智能汽车新蓝图

随着新一轮科技革命和产业变革的深化&#xff0c;在低碳化、电动化和智能化的推动下&#xff0c;处于变革关键时期的新能源汽车产业&#xff0c;正逐步由“政策驱动”转向“市场驱动”&#xff0c;智能化、网联化成为新趋势。 据中国汽车工业协会统计&#xff0c;今年我国新能…

通过 Traefik Hub 暴露家里的网络服务

Traefik Hub 简介 &#x1f4da;️Reference: 你的云原生网络平台 -- 发布和加固你的容器从未如此简单。 Traefik Hub 为您在 Kubernetes 或其他容器平台上运行的服务提供一个网关。 Traefik Hub 定位&#xff1a; 云原生网络平台 它有 2 大核心功能&#xff0c;我这次体验感…

pytorch深度学习实战lesson23

第二十三课 AlexNet AlexNet是在2012年被发表的一个金典之作&#xff0c;并在当年取得了ImageNet最好成绩&#xff0c;也是在那年之后&#xff0c;更多的更深的神经网路被提出&#xff0c;比如优秀的vgg,GoogleLeNet. 其官方提供的数据模型&#xff0c;准确率达到57.1%,top 1-5…

认识计算机中的简单指令集

我们现在有了一个新的寄存器&#xff0c;叫做指令寄存器。它包含一个字节&#xff0c;不同的内容表示控制部分的不同操作模式。也被称为指令代码。指令寄存器是一个字节&#xff0c;因此可能有多达256条不同的指令。所有指令都涉及在总线上移动字节。指令将导致字节进出RAM&…

【JavaEE】PCB和进程调度的基本过程

文章目录什么是进程PCB的组成PID内存指针文件描述符表并行和并发进程调度相关属性进程的状态优先级上下文进程的记账信息什么是进程 进程是正在运行的程序的实例&#xff08;an instance of a computer program that is being executed&#xff09; 进程&#xff08;process&am…

《爱的四十条法则》

《爱的四十条法则》 [土]艾丽芙沙法克 作者用别样的手法间接向我们阐述了爱的四十条法则&#xff0c;每一条都会触及不同阶段的灵魂&#xff0c;我仅将文中感触较深的摘录如下&#xff1a; 1.尽管有人这样说&#xff0c;但是爱绝对不是来的快&#xff0c;去的也快的甜蜜感觉而…

长尾分布系列论文解析(二)Delving into Deep Imbalanced Regression

大纲引言回归问题中的长尾分布LDSFDS实验和结果总结引言 本文是长尾分布系列论文解析的第二篇&#xff0c;前情提要详见长尾分布系列论文解析&#xff08;一&#xff09;Decoupling Representation and Classifier for Long-Tailed Recognition&#xff0c;本篇要介绍的是回归任…

弹性力学之边界条件

作者&#xff1a;张伟伟&#xff0c;来源&#xff1a;力学酒吧 弹性力学基本方程包括平衡方程、几何方程和广义胡克定律&#xff0c;其中平衡方程和几何方程都属于微分方程。我们知道&#xff0c;在求解微分方程时&#xff0c;会出现积分常数&#xff0c;只有确定了积分常数&a…