【架构系列】RabbitMQ应用场景及在实际项目中如何搭建可靠的RabbitMQ架构体系

news2024/11/18 0:25:33

作者:后端小肥肠

创作不易,未经允许禁止转载。

1. 前言

RabbitMQ,作为一款高性能、可靠的消息队列软件,已经成为许多企业和开发团队的首选之一。它的灵活性和可扩展性使得它适用于各种应用场景,从简单的任务队列到复杂的分布式系统。本文将深入探讨RabbitMQ的应用场景以及如何在实际项目中构建可靠的RabbitMQ架构体系。

2. RabbitMQ应用场景

2.1 异步处理

在现代应用中,异步消息处理是提升用户体验和系统效率的关键。RabbitMQ可以有效地用于多种异步处理任务,例如:

  • 用户注册后的邮件发送:用户注册后,通过RabbitMQ发送一个消息到队列中,由后台服务监听并处理发送邮件的任务,从而不会延迟用户的注册过程。
  • 订单处理:在电商平台中,订单处理包括库存管理、支付确认等多个步骤,RabbitMQ可以用来在这些服务间异步传递订单信息,确保处理流程的连续性和效率。

2.2 应用解耦

RabbitMQ支持多种通信模式,如点对点、发布/订阅等,这些模式帮助系统各部分保持低耦合度,便于独立扩展和维护。例如:

  • 微服务架构中的服务通信:在微服务架构中,RabbitMQ允许各个微服务之间通过消息进行交互,而不是直接调用对方的API,这种方式减少了服务间的直接依赖。

2.3 流量削峰

在流量高峰期,如促销或大型活动期间,系统可能会遭遇巨大的访问压力。RabbitMQ可以用来缓冲入站消息,如订单或请求,从而保护后端服务不被过载:

  • 秒杀活动中的订单处理:在秒杀活动中,大量的购买请求可以先进入RabbitMQ队列,系统根据处理能力逐步从队列中取出并处理这些请求,有效避免了系统崩溃。

2.4 通信与集成

RabbitMQ提供了一个灵活的消息传递系统,可以集成复杂的企业系统。它支持多种协议和广泛的开发语言库,适用于:

  • 跨平台通信:在不同操作系统和不同编程语言编写的应用之间,RabbitMQ可以作为消息传递中间件,实现这些系统的有效通信。

2.5 日志处理和应用监控

RabbitMQ也常用于系统日志处理和监控。它可以聚合各服务产生的日志信息,并传输到日志分析系统:

  • 集中式日志管理:通过RabbitMQ,各个系统和应用的日志可以被统一收集至一个中央处理位置,便于进行日志分析、监控和报警。

2.6 数据同步

RabbitMQ 在数据同步中扮演着重要的角色,特别是在分布式系统中,它能够确保数据在多个系统或组件之间保持一致性和最新状态。这对于维护数据的完整性和及时性至关重要。例如:

  • 数据库同步:在多地数据中心运营的情况下,RabbitMQ 可以用来同步不同地点的数据库。通过消息队列,当一个数据中心的数据库更新时,相应的变更可以通过 RabbitMQ 发送到其他数据中心,从而保证所有地点的数据一致。

  • 实时数据复制:在金融服务或电子商务平台,实时数据复制是保证高可用性和灾难恢复的关键。使用 RabbitMQ,可以实现高效的数据复制策略,如将交易数据从主系统复制到备份系统或分析数据库。

  • 缓存刷新:在使用缓存提高应用性能的情况下,RabbitMQ 可以用来在数据更新时自动通知系统刷新缓存。这样,用户总是能够获取到最新的数据,而不是过时的缓存数据。

通过这些应用场景,可以看出RabbitMQ在现代软件架构中扮演的多样化角色,不仅增强了系统的可靠性和伸缩性,还提高了开发和运维的效率。

3. 在项目中如何搭建稳定RabbitMQ架构体系

3.1. RabbitMQ安装

网上RabbitMQ安装教程很多,本文只简述基于docker安装的核心步骤:

1. 环境准备,准备Cenos虚拟机,我的是7.x版本:

2. 拉取或解压RabbitMQ镜像:

3. 运行docker容器:

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 -v /home/docker/rabbitmq/rabbitmq:/var/lib/rabbitmq -v /home/docker/rabbitmq/rabbitmq_conf:/etc/rabbitmq   -e RABBITMQ_DEFAULT_VHOST=km_vhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin rabbitmq:latest

4. 进入容器 :

 docker exec -it 容器id /bin/bash

5. 运行rabbitmq-plugins enable rabbitmq_management(解决无法访问网页端15672端口问题),即可完成RabbitMQ安装。

3.2. 总体技术流程

本文以异步处理应用场景为例,展示如何构建稳定可靠的RabbitMQ架构体系:

上述流程为异步消息通信的技术流程,在异步消息通信中当消息投递后就立刻返回了结果,我们无法获取消息消费的具体过程,这就导致了虽然我们可以即刻获取程序返回状态,但是程序执行细节或是否失败无法通过程序响应返回的方式获取。

基于以上RabbitMQ异步通信的优缺点,我们要搭建一个可靠的RabbitMQ架构需要从以下几个方面入手:

生产者稳定架构:

1. 消息投递回调监听。创建消息投递回调监听函数,监听生产者投递的消息是否投递成功。

2. 消息确认表创建。创建消息确认表message_confirmation,记录消息投递状态,其中字段status反应了是否投递成功(0为为投递成功,1为投递成功)。

CREATE TABLE "public"."message_confirmation" (
  "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "status" int4,
  "create_time" timestamp(6),
  "update_time" timestamp(6),
  "message" varchar(255) COLLATE "pg_catalog"."default",
  CONSTRAINT "message_confirmation_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_confirmation" 
  OWNER TO "postgres";

3. 创建定时任务监听消息投递确认表。每隔一段时间遍历消息确认表,筛选出status为0的消息数据,进行重复投递动作。

消费者稳定架构

1. 死信队列运用。由于网络或外部因素导致消息消费失败,可将消息投递至死信队列进行二次消费。

2. 日志表记录。如死信队列也消费失败,可将消息写入日志表(message_error)后进行手动消费,由技术人员获取日志表中消费失败记录,排查消费失败原因。

CREATE TABLE "public"."message_error" (
  "id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "message_id" varchar(100) COLLATE "pg_catalog"."default" NOT NULL,
  "error_log" text COLLATE "pg_catalog"."default",
  "create_time" timestamp(6),
  "update_time" timestamp(6),
  CONSTRAINT "message_error_pkey" PRIMARY KEY ("id")
)
;

ALTER TABLE "public"."message_error" 
  OWNER TO "postgres";

3.3. 实战讲解

3.3.1. 环境配置
3.3.1.1. 所需版本工具
3.3.1.2. pom依赖
<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-validation</artifactId>
        </dependency>
         <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
       <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
</dependencies>
3.3.2. 生产者核心代码讲解

3.3.2.1. yml配置
server:
  port: 8873
spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_producer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    publisher-confirm-type: correlated
    listener:
      simple:
        acknowledge-mode: manual
3.3.2.2. 编写回调函数
 @PostConstruct
    public void regCallback() {
        // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("cause:"+cause);
                // 如果ack为true代表消息已经收到
                String messageId = correlationData.getId();

                if (!ack) {
                    // 这里可能要进行其他的方式进行存储
                    log.error("MQ队列应答失败,messageId是:" + messageId);
                    return;
                }

                try {
                    MessageConfirmation messageConfirmation = messageConfirmationMapper.selectById(messageId);
                    messageConfirmation.setStatus(1);
                    int count=messageConfirmationMapper.updateById(messageConfirmation);
                    if (count == 1) {
                        log.info("本地消息状态修改成功,消息成功投递到消息队列中...");
                    }
                } catch (Exception ex) {
                    log.error("本地消息状态修改失败,出现异常:" + ex.getMessage());
                }
            }
        });
    }

上述回调函数主要用于监听生产者发送的消息是否发送成功,并将消息发送状态更新至消息确认表中。

3.3.2.3. 编写定时任务监听消息确认表
@Configuration
@EnableScheduling
@Slf4j
public class confirmMessageTaskService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    MessageConfirmationMapper messageConfirmationMapper;

    @Scheduled(cron = "0 */1 * * * ?")
    public void sendMessage(){
        // 把消息为0的状态消息重新查询出来,投递到MQ中。
        LambdaQueryWrapper<MessageConfirmation> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(MessageConfirmation::getStatus, 0);
        List<MessageConfirmation> noConfirmMessages = messageConfirmationMapper.selectList(queryWrapper)
                .stream()
                .collect(Collectors.toList());
        noConfirmMessages.forEach((noConfirmMessage)->{
            rabbitTemplate.convertAndSend("xz_push_exchange","", JsonUtil.obj2String(noConfirmMessage),
                    new CorrelationData(noConfirmMessage.getId()));
        });
    }
}

 上述定时任务为每分钟遍历消息确认表,将status=0的消息筛选出来进行消息投递。

3.3.2.4. 消息投递
    public void sendMessage(MessageConfirmation messageConfirmation) {
        messageConfirmationMapper.insert(messageConfirmation);
        rabbitTemplate.convertAndSend("xfc_fanout_exchange","", JsonUtil.obj2String(messageConfirmation),
                new CorrelationData(messageConfirmation.getId()));
    }

3.4. 消费者核心代码讲解

3.4.1. yml配置
server:
  port: 8872
spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/xfc_mq_consumer
    username: postgres
    password: postgres
    driver-class-name: org.postgresql.Driver
  rabbitmq:
    port: 5672
    host: 192.168.10.11
    username: admin
    password: admin
    virtual-host: my_vhost
    listener:
      simple:
        acknowledge-mode: manual
mybatis-plus:
  typeAliasesPackage: com.xfc.consumer.entities
  mapper-locations: classpath:mapper/*.xml
3.4.2. RabbitMQ配置类
@Configuration
public class RabbitMQConfig {
    /**
     * 死信队列
     * @return
     */
    @Bean
    public FanoutExchange deadExchange() {
        return new FanoutExchange("dead_xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue deadXfcQueue() {
        return new Queue("dead.xfc.queue", true);
    }
    @Bean
    public Binding bindDeadXfc() {
        return BindingBuilder.bind(deadXfcQueue()).to(deadExchange());
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("xfc_fanout_exchange", true, false);
    }

    @Bean
    public Queue xfcQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dead_xfc_fanout_exchange");
        return new Queue("xfc.queue", true, false, false, args);
    }

    @Bean
    public Binding bindXfc() {
        return BindingBuilder.bind(xfcQueue()).to(fanoutExchange());
    }
}

上述代码为RabbitMQ配置类,用于在项目初始化时生成相应的交换机和队列。 

3.4.3. 队列消费
@Service
@Slf4j
public class XfcMqConsumer {
    @RabbitListener(queues = {"xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        MessageConfirmation messageConfirmation=null;
        try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
            
        } catch (Exception e) {
            e.printStackTrace();
            log.error("消息投放到死信队列"+e.getMessage(),e);
            channel.basicNack(tag,false,false);// 死信队列
        }
    }
}
3.4.4. 死信队列消费
@Service
@Slf4j
public class DeadMqConsumer {
    @Autowired
    MessageErrorMapper messageErrorMapper;
    @RabbitListener(queues = {"dead.xfc.queue"})
    public void messageconsumer(String message, Channel channel,
                                CorrelationData correlationData,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        MessageConfirmation messageConfirmation=null;
        try {
            log.info("收到MQ的消息是: " + message );
            messageConfirmation= JsonUtil.string2Obj(message, MessageConfirmation.class);
            /**
             * 编写业务逻辑
             */
        } catch (Exception e) {
            e.printStackTrace();
            /**
             * 写入message_error
             */
            messageErrorMapper.insert(new MessageError(messageConfirmation.getId(),e.getMessage(),new Date()));
            channel.basicNack(tag,false,false);// 死信队列
        }
    }
}

3.5 效果测试

以上代码编写完成后需要进行架构效果测试,其步骤如下:

1. 消息投递测试

上图调用了消息投递接口。

在消息确认表中,新增了一条消息且status=1,代表该条消息已投递成功。

2. 消费者正常消费测试

3. 消费异常测试

上图可看出消息消费异常投入到了死信队列。

在死信队列中依然消费失败。

消费失败后成功写入了日志表。

4. 结语

本文讲解了RabbitMQ应用场景以及在异步处理场景中如何搭建稳定的RabbitMQ架构体系,逐步详细的给出了生产者及消费者端代码并在文章最后对架构效果进行了测试,感兴趣的同学可根据代码进行实操,有疑问和其他见解也可在评论区留言,我看到都会回复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1642727.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

eNSP-动态路由(ospf协议)

一、拓扑结构搭建 二、主机配置 pc1 pc2 三、路由器配置 1.AR2配置 <Huawei>sys #进入系统视图 [Huawei]int g0/0/0 #进入接口 [Huawei-GigabitEthernet0/0/0]ip address 192.168.0.2 24 #设置ip地址 [Huawei-GigabitEthernet0/0/0]q #返回上一级 [Huawei]int g0/0/1 …

asp.net结课作业中遇到的问题解决2

目录 1、如何实现评论交流的界面 2、如果想要将文字添加到数据库中&#xff0c;而不是乱码&#xff0c;该怎么修改 3、如果想要添加的数据已经存在于数据库&#xff0c;就不允许添加了&#xff0c;该如何实现 4、想要实现某个模块下有好几个小的功能该如何实现 5、想要实现…

Unity 性能优化之数据面板(Statistics)(一)

提示&#xff1a;仅供参考&#xff0c;有误之处&#xff0c;麻烦大佬指出&#xff0c;不胜感激&#xff01; 文章目录 前言一、unity 统计数据面板&#xff08;Statistics&#xff09;1.Audio属性2.Graphics属性 二、什么是Draw Call&#xff1f;三、Unity3D stats也可以通过代…

大型语言模型的新挑战:AMR语义表示的神秘力量

DeepVisionary 每日深度学习前沿科技推送&顶会论文&数学建模与科技信息前沿资讯分享&#xff0c;与你一起了解前沿科技知识&#xff01; 引言&#xff1a;AMR在大型语言模型中的作用 在自然语言处理&#xff08;NLP&#xff09;的领域中&#xff0c;抽象意义表示&…

【Android学习】自定义文本框和输入监听

实现功能 以上代码可实现功能&#xff1a; 1 自定义文本框样式 2. 文本框触发形式转变 3. 文本框输入长度监听&#xff0c;达到最大长度关闭软键盘 4. password框触发检测phone框内容 1. drawable自定义形状 我创建了editor_focus.xml 和 editor_unfocus.xml&#xff0c;两者仅…

性能优化(一):ArrayList还是LinkedList?

引言 集合作为一种存储数据的容器&#xff0c;是我们日常开发中使用最频繁的对象类型之一。JDK为开发者提供了一系列的集合类型&#xff0c;这些集合类型使用不同的数据结构来实现。因此&#xff0c;不同的集合类型&#xff0c;使用场景也不同。 很多同学在面试的时候&#x…

3.2Java全栈开发前端+后端(全栈工程师进阶之路)-前端框架VUE3框架-企业级应用- Vuex

Vuex简介 Vuex概述 Vuex是一个专门为Vue.js应用程序开发的状态管理模式, 它采用集中式存储管理所有组件的公共状态, 并以相应的规 则保证状态以一种可预测的方式发生变化. 试想这样的场景, 比如一个Vue的根实例下面有一个根组件名为App.vue, 它下面有两个子组件A.vue和B.vu…

巧记英语单词

页面 在输入框中填写英语单词的谐音 这样的话就进行了一次英语单词的记忆练习。 页面代码 <% layout(/layouts/default.html, {title: 英语单词管理, libs: [dataGrid]}){ %> <div class"main-content"><div class"box box-main">&l…

如何为 Nestjs 编写单元测试和 E2E 测试

前言 最近在给一个 nestjs 项目写单元测试&#xff08;Unit Testing&#xff09;和 e2e 测试&#xff08;End-to-End Testing&#xff0c;端到端测试&#xff0c;简称 e2e 测试&#xff09;&#xff0c;这是我第一次给后端项目写测试&#xff0c;发现和之前给前端项目写测试还…

练习题(2024/5/4)

1 二叉树的所有路径 给你一个二叉树的根节点 root &#xff0c;按 任意顺序 &#xff0c;返回所有从根节点到叶子节点的路径。 叶子节点 是指没有子节点的节点。 示例 1&#xff1a; 输入&#xff1a;root [1,2,3,null,5] 输出&#xff1a;["1->2->5","…

学习Rust的第26天:Rust中的cp

在本文中复刻了 cp 实用程序的功能&#xff0c;我想默认使其递归&#xff0c;因为每次我想复制时都输入 -R 文件夹都会觉得有点重复&#xff0c;本文代码将与前文代码保持相似&#xff0c;我们只会更改程序的核心功能和一些变量名称以匹配用例 Pseudo Code 伪代码 function cop…

STM32G474 CMAKE VSCODE 开发环境搭建

本篇博文尝试搭建 stm32g474 的开发环境 一. 工具安装 1. 关于 MinGW、OpenOCD、Zadig 这些工具的下载和安装见 JlinkOpenOCDSTM32 Vscode 下载和调试环境搭建_vscode openocd stm32 jlink-CSDN博客 2. 导出一个 STM32 的 CMAKE 工程&#xff0c;这里略过。 3. 安装 ninja …

C++:继承-继承权限

在C中&#xff0c;类的权限分为公有、私有和保护三种。这些权限控制了类的成员&#xff08;数据成员和成员函数&#xff09;对外部代码的可见性和访问性。 公有&#xff08;public&#xff09;权限&#xff1a; 在公有权限下声明的成员可以被类的外部代码直接访问&#xff1b;公…

小程序引入 Vant Weapp 极简教程

一切以 Vant Weapp 官方文档 为准 Vant Weapp 官方文档 - 快速入手 1. 安装nodejs 前往官网下载安装即可 nodejs官网 安装好后 在命令行&#xff08;winr&#xff0c;输入cmd&#xff09;输入 node -v若显示版本信息&#xff0c;即为安装成功 2. 在 小程序根目录 命令行/终端…

langchain+qwen1.5-7b-chat搭建本地RAG系统

已开源&#xff1a;https://github.com/stay-leave/enhance_llm 概念 检索增强生成&#xff08;Retrieval Augmented Generation, RAG&#xff09;是一种结合语言模型和信息检索的技术&#xff0c;用于生成更准确且与上下文相关的输出。 通用模型遇到的问题&#xff0c;也是…

头歌实践教学平台:三维图形观察OpenGL1.0

一.任务描述 根据提示&#xff0c;在右侧修改代码&#xff0c;并自己绘制出图形。平台会对你编写的代码进行测试。 1.本关任务 学习了解三维图形几何变换原理。 理解掌握OpenGL三维图形几何变换的方法。 理解掌握OpenGL程序的模型视图变换。 掌握OpenGL三维图形显示与观察的…

怎么用CAPL与Python交互

怎么用CAPL与其他应用程序交互 怎么用CAPL与Python交互 怎么用CAPL与Python交互 怎么用CAPL与其他应用程序交互前言1、CAPL怎么调Python&#xff1f;1.1CAPL调Python的命令1.2CAPL调用Python实例 2、怎么把python运行的结果返回给CAPL2.1通过环境变量 3、CAPL调Python的输入参…

OCC笔记:选择TopoDS_Shape顶点、边、面等等

1、通过AIS_InteractiveContext的函数访问当前选择的图形 hAISContext->InitSelected(); hAISContext->MoreSelected(); hAISContext->NextSelected()&#xff1b; hAISContext->SelectedShape()&#xff1b; 其中hAISContext->SelectedShape()通过StdSelect_…

C语言——rand函数

一、rand函数 这是一个在 C 标准库 <stdlib.h> 中定义的函数&#xff0c;用于生成伪随机数&#xff0c;默认情况下&#xff0c;它生成从 0 到 RAND_MAX 的伪随机数&#xff0c;其中 RAND_MAX 是一个常数&#xff0c;通常是 32767。 1、函数原型&#xff1a; 2、函数返回…

MongoDB的分片集群

MongoDB分片技术 介绍 ​ 分片&#xff08;sharding&#xff09;是MongoDB用来将大型集合分割到不同服务器上采用的方法。分片这种说法起源于关系型数据库。但是实际上非关系型数据库在分片方面相比于传统的关系型数据库更有优势。 ​ 与MySQL分库方案对比&#xff0c;MongoDB…