RabbitMQ应用问题 - 消息顺序性保证、消息积压问题

news2025/1/16 14:50:50

文章目录

  • MQ 消息顺序性保证
    • 概述
    • 原因分析
    • 解决方案
    • 基于 spring-cloud-stream 实现`分区消费`
  • 消息挤压问题
    • 概述
    • 原因分析
    • 解决方案

MQ 消息顺序性保证


概述

a)消息顺序性:消费者消费的消息的顺序 和 生产者发送消息的顺序是一致的.

例如 生产者 发送消息顺序是 msg1、msg2、msg3,那么消费者也需要按照 msg1、msg2、msg3 的顺序进行消费.

b)顺序不一致可能会导致哪些问题?

例如用户系统中,用户需要对昵称进行了两次修改,此时生产者发送两条消息:

  1. 消息1:修改 用户318 的昵称为 “白天”.
  2. 消息2:修改 用户318 的昵称为 “黑夜”.
    那么,按正常的逻辑来讲,用户318 的名称最后因该为 “黑夜”,但如果 消息1 是最后一个被消费者消费的消息,那么 用户318 的名称就变成了 “白天”.

原因分析

Note:以下场景成立的前提是,只能有一个生产者!因为生产者发送消息给 mq,中间都需要经过网络传输,而网络的不确定性是非常大的,因此无法保证多个生产者的消息谁先到 mq.

a)多个消费者:
多个消息会被不同的消费者并行处理,也就意味着有的消费者消费的快,有的消费者消费的慢,从而导致消息处理的顺序性无法保证.

b)网络波动:
网络波动可能会导致消费者消费完消息返回的 ack 丢失,从而使得 mq 以为消息发给消费者的中途丢失了,进而使得消息重新入队,这就意味着 如果队列中此时还有其他消息,那么这个重新入队的消息就会排在队列尾部,而头部的消息会被优先消费,导致顺序性问题.

实际上,也就意味着,只要触发了消息重新入队的操作,就会导致顺序性问题.

c)消息路由问题:
在复杂的路由场景中(例如大量应用 Topic 交换机),消息可能会根据 routingKey 被分发到不同的队列,使得无法保证全局的顺序性.

d)死信队列:
消息因为一些原因(例如被消费者返回 nack + requeue=false),然后放入死信队列,那么死信队列无论是网络传输,还是处理死信队列的消费者和普通队列的消费者并行处理,都会导致顺序不一致的情况.

解决方案

顺序性的保证分为 局部顺序性保证全局顺序性保证.
例如如下,假设消息入队的顺序为 msg1、msg2、msg3、msg4、msg5…
在这里插入图片描述
在这里插入图片描述
消息顺序性保证的常见策略:

Note:以下顺序性保证策略往往不是单独使用进行保证的,而是多种组合使用.

a)单队列,单消费者(全局顺序性)
最简单的方式就是使用单个队列,并由单个消费者进行消息. 对于消息在队列先进先出,这是 RabbitMQ 给我们保证的.

b)业务逻辑控制(全局顺序性)
例如给每个消息引入一个序号(类似 TCP 确认应答),序号 3 消费之前,要保证序号 2 被成功消费…

c)手动消息确认机制(局部顺序性)
消费者在处理完消息后,显式的发送确认,这样 RabbitMQ 才会移除并继续处理下一个消息.

Ps:在 RabbitMQ 中,当消费者接收到一条消息时,这条消息并不会立即从队列中删除。相反,消息会保持在队列中,直到 RabbitMQ 收到消费者发回的确认.

d)分区消费(局部顺序性)
单个消费者的吞吐量太低了,当需要多个消费者来提高处理速度时,可以使用分区消费. 也就是把一个队列分割成多个分区(例如根据订单系统,将 订单id 进行 hash 或者其他算法 -> 保证同样的订单 id,经过这个算法后,得到的队列名称是一致的(如果 同样的订单 id 一会跑到队列1,一会跑到队列2,就会导致多个消费并行消费,最终消费顺序不一致)),最后每个分区由一个消费者处理,保证每个分区内消息的顺序性.

Ps:RabbitMQ 本身没有实现分区消费

基于 spring-cloud-stream 实现分区消费

Note: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_partitions.html

RabbitMQ 并没有实现分区消费,因此这里可以引入一些其他的机制来实现.

a)引入依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.3.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2023.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

b)配置文件如下:

spring:
  rabbitmq:
    host: env-base
    port: 5672
    username: root
    password: 1111
  cloud:
    stream:
      bindings: # bindings 表示消息通道绑定配置
        generate-out-0: # generate-out-0 是一个输出通信的名称,表示这是生成消息的第一个通道(还可能由类似 generate-out-1 的其他通道)
          destination: partitioned.destination # 消息发送的名称为 "partitioned.destination" 的目的地(目的地在这里就是 mq 消息队列).
          producer: # 生产者配置
            # partitioned: true
            partition-key-expression: headers['partitionKey'] # 表示消息应该发送到哪个分区(这个跟代码里配置的 header 有关)
            partition-count: 2 # 表示有两个分区(两个队列). 生产者会根据 "partition-key-expression" 计算的结果,将消息分配到这两个分区之一
            required-groups: # 配置消费组
              - myGroup

c)代码如下:

import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;

import java.util.Random;
import java.util.function.Supplier;

@SpringBootApplication
public class SpringCloudStreamMqApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "abc2", "abc3",
            "abc4",
    };

    public static void main(String[] args) {
        new SpringApplicationBuilder(SpringCloudStreamMqApplication.class)
                .web(WebApplicationType.NONE) //不运行其他 web 组件
                .run(args);
    }

    /**
     * 分区消息:
     * 方法返回一个函数,这个函数每次调用都会从 data 中随机选择一个字符串,
     * 生成一个带有分区键(partitionKey)的消息,并将这个消息返回.
     */
    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}

d)效果演示:
在 mq 管理平台可以看到多出来了一个交换机 和 两个队列(分区)
在这里插入图片描述
在这里插入图片描述
在 partitioned.destination.myGroup-0 中获取消息,可以看到都是 “abc2” 和 “abc4”
在这里插入图片描述
在 partitioned.destination.myGroup-1 中获取消息,可以看到都是 “abc1” 和 “abc3”
在这里插入图片描述

消息挤压问题


概述

消息挤压:在消息队列中,待处理的消息数量超过了消费者的处理能力,导致消息在队列中不断堆积的现象.

原因分析

a)消息生产过快
在流量较大的情况下,生产者发送消息速率大于消费者消费消息速率.

b)消费者处理能力不足

  • 消费端业务复杂,耗时长.
  • 系统资源限制,例如 CPU、内存、磁盘I/O 限制消费者处理速度.
  • 消费者在处理消息时出现异常,导致消息无法被正确处理和确认.
  • 服务器端配置过低

c)网络问题
由于网络抖动,消费者没有及时反馈 ack/nack,导致消息不断重发.

d)消费者代码逻辑异常,引发重试
消费者配置了手动 ack + requeue= true,导致一旦由于消费者代码逻辑引发异常,就会造成消息不断重新入队,不断重试,进而导致消息积压.

解决方案

Note:实际工作中,更多的是处理消费者的效率

a)提高消费者效率:

  • 提高消费者的数量,比如新增机器.
  • 如果消费端业务分散耗时,可以考虑使用 CompletableFuture 实现多线程异步编排.
  • 设置 prefetchCount,当一个消费者阻塞时,消息转发到其他没有阻塞的消费者.
  • 消息引发异常时,考虑配置重试机制,或者转入死信队列.

b)限制生产者速率:

  • 使用限流工具,限制消息发送速率的上限.
  • 设置消息过期时间. 如果消息过期没有消费,可以配置死信队列,不仅避免消息丢失,还减少了主队列的压力.

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2034628.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

centos7 xtrabackup mysql(8)压缩 增量备份(3)

centos7 xtrabackup mysql&#xff08;8&#xff09;压缩 增量备份&#xff08;3&#xff09; 添加数据1 添加数据测试一下 测试主从是否可以 主机端 mysql -u root -p 1234aA~1 show databases ; use company_pro; show tables ; insert into employee(name) value (‘2024…

C++实现单例模式/工厂模式

单例模式 单例模式即一个类只创建一个实例&#xff0c;提供一个全局访问点。单例模式主要是为了控制资源访问&#xff0c;在一些功能如&#xff1a;数据库连接池&#xff0c;日志类实例&#xff0c;线程池等都可以采用单例模式。 // 实现一个单例 #include<iostream> #…

户外上网黑科技|续航能力大比拼,飞猫、闪鱼、格行、品胜,哪个好

在当今的移动互联网时代&#xff0c;随身WiFi已成为我们日常生活中不可或缺的一部分&#xff0c;特别是在租房、出差、旅行或户外活动时&#xff0c;其续航能力成为了用户选择的重要因素。本文将针对飞猫、闪鱼、格行、品胜这四款热门随身WiFi产品的续航能力进行详细比较&#…

C#高级:在SQLserver中使用视图、存储过程、索引和触发器

目录 一、视图 1.视图是什么&#xff0c;有什么作用&#xff1f; 2.视图和存储过程有什么区别&#xff1f; 3.建立一个视图&#xff0c;名为PersonBorrowView&#xff0c;SQL已给出&#xff1a; 4.如果往BorrowInfo加一条记录&#xff0c;我原本的SQL会增加一条记录&#…

JAVA毕业设计635—基于Java+ssm的仓库管理系统(源代码+数据库)

毕设所有选题&#xff1a; https://blog.csdn.net/2303_76227485/article/details/131104075 基于Javassm的仓库管理系统(源代码数据库)635 一、系统介绍 分为员工、管理员两种角色 1、员工&#xff1a; 登录、库存管理、出入库管理、密码修改 2、管理员&#xff1a; 库…

(自用)交互协议设计——protobuf序列化

protobuf是一种比json和xml等序列化工具更加轻量和高效的结构化数据存储格式&#xff0c;性能比json和xml真的强很多&#xff0c;毕竟google出品。 protobuf原理 protobuf如何使用 创建xxx.proto文件 开头写上 syntax"proto2"package tutorial; 表明使用的proto…

Linux:修改网卡名称(redhat-centos-redora)

解决问题: 我现在有块网卡名ens160&#xff0c;我想把他改为ens33&#xff08;仅是模拟&#xff0c;实际中你可以任意更改&#xff0c;不是局限在这两名称中&#xff0c;举一反三&#xff09; 我当前的操作系统为&#xff1a;centos9 解决办法&#xff1a; 1.修改grub配置 …

前端学习笔记-JS篇-02

运算符 赋值运算符 对变量进行赋值的运算符。 已经学过的赋值运算符:【将等号右边的值赋予给左边&#xff0c;要求左边必须是一个容器】 其他赋值运算符: - * / % 原始写法和简化写法【其实就是java基础】 一元运算符 众多的JavaScript 的运…

免费【2024】springboot 个人健康管理网站的设计与实现

博主介绍&#xff1a;✌CSDN新星计划导师、Java领域优质创作者、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和学生毕业项目实战,高校老师/讲师/同行前辈交流✌ 技术范围&#xff1a;SpringBoot、Vue、SSM、HTML、Jsp、PHP、Nodejs、Python、爬虫、数据可视化…

php 企业员工考勤系统—计算机毕业设计源码17108

摘要 由于数据库和数据仓库技术的快速发展&#xff0c;企业员工考勤系统建设越来越向模块化、智能化、自我服务和管理科学化的方向发展。员工管理系统对处理对象和服务对象&#xff0c;自身的系统结构&#xff0c;处理能力&#xff0c;都将适应技术发展的要求发生重大的变化。 …

Linux系统安全及应用(二):PAM安全认证

文章目录 4Linux中的PAM安全认证介绍su命令的安全隐患PAM认证原理和构成PAM安全认证流程PAM 配置文件结构说明PAM 控制标记的补充说明PAM 实例 4Linux中的PAM安全认证 介绍 PAM&#xff08;Pluggable Authentication Modules&#xff09;&#xff0c;可插拔式认证模块是一种高…

高翔【自动驾驶与机器人中的SLAM技术】学习笔记(七)卡尔曼滤波器三:卡尔曼滤波器公式推导【转载】

卡尔曼滤波器三&#xff1a;卡尔曼公式推导 转载来源&#xff1a;卡尔曼滤波&#xff1a;从入门到精通 简述 考虑一个SLAM 问题&#xff0c;它由一个运动方程&#xff1a; x t f ( x t − 1 , u t ) ω t (1) \mathbf{x}_{t}f(\mathbf{x}_{t-1},\mathbf{u}_{t}) \omega_…

尚品汇-ES(三十一)

目录&#xff1a; &#xff08;1&#xff09;封装搜索相关实体对象 &#xff08;2&#xff09;搜索接口封装 &#xff08;3&#xff09;在service-list-client模块添加远程接口 &#xff08;1&#xff09;封装搜索相关实体对象 搜索参数实体&#xff1a;SearchParam 搜索参…

haproxy高级功能配置

介绍HAProxy高级配置及实用案例 一.基于cookie会话保持 cookie value:为当前server指定cookie值&#xff0c;实现基于cookie的会话黏性&#xff0c;相对于基于 source 地址hash 调度算法对客户端的粒度更精准&#xff0c;但同时也加大了haproxy负载&#xff0c;目前此模式使用…

Service服务在Android中的使用

目录 一&#xff0c;Service简介 二&#xff0c;Service的两种启动方式 1&#xff0c;非绑定式启动Service 2&#xff0c;绑定式启动Service 三&#xff0c;Service的生命周期 1&#xff0c;非绑定式Service的生命周期 2&#xff0c;绑定式Service的生命周期 四&#xf…

BCArchive加密工具实测分享:为何我觉得它很实用?

前言 你是不是经常有这样的烦恼&#xff1a;重要的文件、私密的照片、敏感的资料&#xff0c;总是担心会不小心泄露出去&#xff1f;哎呀&#xff0c;别担心&#xff0c;别担心&#xff0c;我今天要介绍的这款软件&#xff0c;简直就是守护你数据安全的超级英雄&#xff01; 在…

CVE-2012-2122 mysql/mariaDB身份认证漏洞

简介&#xff1a; 当连接MariaDB/MySQL时&#xff0c;输入的密码会与期望的正确密码比较&#xff0c;不断的尝试登录连接&#xff0c;回导致MySQL认为两个密码是相同的。也就是说只要知道用户名&#xff0c;不断尝试就能够直接登入SQL数据库。 影响范围#所有的Mariadb和mysql版…

【吊打面试官系列-Elasticsearch面试题】Elasticsearch 在部署时,对 Linux 的设置有哪些优化方法?

大家好&#xff0c;我是锋哥。今天分享关于 【Elasticsearch 在部署时&#xff0c;对 Linux 的设置有哪些优化方法&#xff1f;】面试题&#xff0c;希望对大家有帮助&#xff1b; Elasticsearch 在部署时&#xff0c;对 Linux 的设置有哪些优化方法&#xff1f; 1、64 GB 内存…

【STM32】CubeMX + CLion + FreeRTOS移植过程问题记录

文章目录 一、portable 文件选择二、自定义文件添加三、ST-Link v2 烧录问题四、STM32F407工程中程序无法启动调度器 前言   本文依照稚晖君分享的配置CLion用于STM32开发【优雅の嵌入式开发】&#xff0c;尝试配置STM32CubeMX CLion开发环境&#xff0c;并在此基础上移植Fre…

利用Emgucv绘制条形码边框16(C#)

EmguCV环境配置&#xff1a; ​​​​​​Emgu CV4图像处理之环境搭建1(C#)_emgucv 4.7-CSDN博客 本文测试环境&#xff1a; win10 64位 vistual studio 2019 Emgu CV 4.6.0 环境配置准备&#xff1a; 1 新增控制台项目&#xff0c;.net framework为4.7.2 2 把win-x…