SpringBoot —— 整合RabbitMQ常见问题及解决方案

news2025/1/11 5:24:21

前言

企业中最常用的消息中间件既不是RocketMQ,也不是Kafka,而是RabbitMQ。

RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。

至于Kafka,主要还是用在大数据和日志采集方面,除了一些公司有特定的需求会使用外,对消息收发准确率要求较高的公司依然是以RabbitMQ作为企业级消息队列的首选


一、使用步骤

1.引入依赖

<!--AMQP依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
</dependency>

2.环境配置

这里需要创建2个springboot项目,一个 provider (生产者),一个consumer(消费者)

生产者application.yml
生产者配置文件
消费者application.yml
消费者配置文件

3.生产者处理消息队列

创建消息队列

package com.local.springboot.springbootcommon.config.amqp;

import com.local.springboot.springbootcommon.constant.RabbitMQConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;


@Configuration
public class RabbitMQConfig {

    /**
     * 创建队列说明
     * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
     * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
     * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
     * 一般设置一下队列的持久化就好,其余两个就是默认false
     * @return
     */
    @Bean
    public Queue goodsEventQueue() {
        return new Queue(RabbitMQConstant.QUEUE_GOODS_EVENT, true, false, false, null);
    }

    /**
     * 创建交换机
     */
    @Bean
    public DirectExchange goodsEventExchange() {
        return new DirectExchange(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, true, false);
    }

    /**
     * 将队列绑定到交换机上
     */
    @Bean
    public Binding goodsQueueToGoodsExchange() {
        return BindingBuilder
                .bind(goodsEventQueue())
                .to(goodsEventExchange())
                .with(RabbitMQConstant.ROUTING_KEY_GOODS_EVENT);
    }
}

启动生产者服务,浏览器打开http://127.0.0.1:15672/,可以看见消息队列创建
在这里插入图片描述
发送消息
在业务需要的地方,发生消息至消息队列

@Resource
private RabbitTemplate rabbitTemplate;

@Override
public ApiResponse saveItem(ItemEntity itemEntity) {
    if (itemEntity != null) {
        String id = itemEntity.getSkuId();
        if (StringUtils.isNotBlank(id)) {
            ItemEntity entity = getById(id);
            if (entity != null) {
                BeanUtils.copyProperties(itemEntity, entity);
                updateById(entity);
            }
        } else {
            EntityUtil.initEntity(itemEntity);
            itemEntity.setSkuId(IdWorker.get32UUID());
            save(itemEntity);
        }
    }
    // 同步商品信息
    rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, RabbitMQConstant.ROUTING_KEY_GOODS_EVENT, itemEntity);
    return ApiResponse.ok();
}

4.消费者监听队列

package com.local.springboot.springbootservice.listener;

import com.local.springboot.springbootdao.entity.ItemEntity;
import com.local.springboot.springbootservice.service.search.ElasticSearchService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;

@Component
@Slf4j
public class GoodsEventQueueListener {

    @Resource
    private ElasticSearchService elasticSearchService;

    @RabbitListener(queues = "goodsEventQueue")
    public void onGoodsEvent(ItemEntity itemEntity, Channel channel
            , @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        // 同步商品至es
        try {
            log.info("同步商品事件队列接收参数:{}", itemEntity);
            // 业务处理
            elasticSearchService.addGoods(itemEntity);
        } catch (Exception e) {
            log.error("同步商品事件异常:{}", e.getMessage());
            e.printStackTrace();
        } finally {
            // 手动签收消息
            channel.basicAck(tag, false);
        }
    }
}

5.运行结果

上述业务是在添加商品时,向消息队列发送消息,消费者接收消息之后对商品进行相应的处理,实现业务上的解耦。

同时运行两个服务,生产者调用添加商品接口

查看日志,消费者接收到消息之后做相应处理
在这里插入图片描述
至此,SpringBoot 简单整合RabbitMQ成功结束。

二、问题及解决

1.消息丢失

消息丢失可能的原因

①消息发出后,中途网络故障,服务器没收到
②消息发出后,服务器收到了,还没持久化,服务器宕机
③消息发出后,服务器收到了,服务挂了,消息自动签收,消费方还未处理业务逻辑。

在说解决方案之前,我们需要明白两个概念:消息确认机制消息签收机制

1.消息确认机制

主要是生产者使用的机制,用来确认消息是否被成功消费。

添加配置如下:

publisher-returns: true #确认消息已发送到队列(Queue)
publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
@Component
@Slf4j
public class RabbitMQProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    

    /**
     * 发送消息
     *
     * @param exchange
     * @param routingKey
     * @param source
     */
    public void sendMessage(String exchange, String routingKey, ItemEntity source) {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.convertAndSend(exchange, routingKey, source);
    }

    /**
     * 成功接收后的回调
     *
     * @param correlationData
     * @param b
     * @param s
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        // 成功后的处理
    }

    /**
     * 失败后的回调
     *
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        // 失败后的处理
    }
}

实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,可以对失败或者成功之后进行相应处理,之后进一步做消息补偿。

但是这种方法并不推荐,因为这种机制十分降低MQ的性能,一般采用后台管理实现人工补偿,两种方法只是性能与运维成本之间的一种抉择

2.消息签收机制

一般RabbitMQ的消息是自动签收的,你可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。

在开发中,我们一般都采用手动签收的方式,这样可以有效避免消息的丢失。

3.解决方案

上述两个概念搞清楚之后,再回过头来看消息丢失的原因

①和②是由于生产方未开启消息确认机制导致
③是由于消费方未开启手动签收机制导致。

解决方案

①生产方发送消息时,要try…catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;
②生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;
③消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。

2.消息积压

1.出现原因

消息积压出现的场景一般有两种:

①消费方的服务挂掉,导致一直无法消费消息;
②消费方的服务节点太少,导致消费能力不足,从而出现积压,这种情况极可能就是生产方的流量过大导致。

2.解决方案

①既然消费能力不足,那就扩展更多消费节点,提升消费能力;
②建立专门的队列消费服务,将消息批量取出并持久化,之后再慢慢消费。

①就是最直接的方式,也是消息积压最常用的解决方案,但有些企业考虑到服务器成本压力,会选择第②种方案进行迂回,先通过一个独立服务把要消费的消息存起来,比如存到数据库,之后再慢慢处理这些消息即可。

2.消息重复

1.出现原因

消息重复大体上有两种情况会出现:

①消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;
②消息消费失败,由于retry重试机制,重新入队又将消息发送出去。

2.解决方案

网上大体上能搜罗到的方法有三种:

①消费方业务接口做好幂等;
②消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;
③消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。

这里只推荐第一种,业务方法幂等这是最直接有效的方式,②还要和数据库产生交互,③有可能导致第一次消费失败但第二次消费成功的情况被砍掉。
ps:
幂等性:就是一条命令执行任意多次所产生的影响和执行一次的影响相同

(这里分布式锁应该能解决这个问题)

最简单的方案就是,在数据库中建一个消息日志表,这个表记录消息ID和消息执行状态。这个我们消费消息的逻辑变为:在消息日志中增加一个消息记录,再根据消息记录,执行业务。我们每次都会在插入之前检查该消息是否已存在。这样就不会出现一条消息被多次执行的情况。这里的数据库也可以使用redis/memcache来实现唯一约束方案。

2.小结

消息丢失、消息重复、消息积压三个问题中,实际上主要解决的还是消息丢失,而消息丢失的最常见企业级方案之一就是定时任务补偿。

其实MQ只是一个做为辅助的中间件,使用MQ的目的就是解耦和转发,不用做多余的事情,保证MQ本身是流畅的、职责单一的即可

总结

本文主要简单讲述了SpringBoot整合RabbitMQ的过程,以及RabbitMQ的三种常见问题及解决方案

其实RabbitMQ本身的性能还是很强大的,总结以下三点:

①消息100%投递会增加运维成本,中小企业视情况使用,非必要不使用;
②消息确认机制影响性能,非必要不使用;
③消费者先保证消息能签收,业务处理失败可以人工补偿。

此外消息中间件的问题其实还有很多,比如

  • 序列化、传输协议,以及内存管理等问题?
  • 为什么消息队列能实现高吞吐?
  • 消息中间件中的队列模型与发布订阅模型的区别?
  • 如何选型消息中间件?

参考文章 https://baijiahao.baidu.com/s?id=1737713844357727373&wfr=spider&for=pc

« 上一章:SpringBoot —— 简单多模块构建

创作不易,关注💖、点赞👍、收藏🎉就是对作者最大的鼓励👏,欢迎在下方评论留言🧐

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/66303.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

常见的推荐算法原理介绍

常见的推荐算法原理介绍&#xff0c;随着互联网的发展短视频运营越来越精准化&#xff0c;我们身边常见的抖音、火山小视频等软件让你刷的停不下来&#xff0c;这些软件会根据你的浏览行为推荐你感兴趣的相关内容&#xff0c;这就用到了很多推荐算法在里面。 在淘宝购物&#…

Linux 负载均衡介绍之LVS工作模式-DR直接路由模式

Linux 负载均衡介绍之LVS工作模式-DR直接路由模式 图示&#xff1a; 工作原理&#xff1a; ①.客户端将请求发往前端的负载均衡器&#xff0c;请求报文源地址是CIP&#xff0c;目标地址为VIP。 ②.负载均衡器收到报文后&#xff0c;发现请求的是在规则里面存在的地址&#x…

[Java反序列化]—Shiro反序列化(二)

0x01 这篇利用CC链来进行RCE 利用分析 在shiro-web 中加上CC依赖 <dependency><groupId>commons-collections</groupId><artifactId>commons-collections</artifactId><version>3.2.1</version><scope>compile</scope>…

ZKP方案衍变及对比

1. 引言 2019年是ZKP方案创新井喷的一年。 2019年10月&#xff0c;Chiesa在#zk0x04上的分享 State of the SNARG-scape - Alessandro Chiesa (UC Berkeley, StarkWare, Zcash)&#xff0c;有&#xff1a; 根据reference string的类型&#xff0c;可将zk-SNARKs分类为&#…

1.集群环境搭建

1.集群信息概览 2.集群环境搭建 2.1第一台服务器 修改静态ipvim /etc/sysconfig/network-scripts/ifcfg-ens33修改主机名echo first-node /etc/hostname修改主机名映射echo 192.168.226.140 first-node >> /etc/hosts echo 192.168.226.141 second-node >> /…

Redis缓存 缓存穿透+缓存雪崩+缓存击穿的原因及其解决方案

Redis缓存 缓存穿透缓存雪崩缓存击穿的原因及其解决方案 文章目录Redis缓存 缓存穿透缓存雪崩缓存击穿的原因及其解决方案一、缓存穿透是什么&#xff1f;解决方案&#xff1a;二、缓存雪崩是什么&#xff1f;解决方案三、缓存击穿是什么&#xff1f;解决方案一、缓存穿透是什么…

【保姆级·创建对象】如何通过factory-method创建对象

这个步骤在createBeanInstance()方法中有使用&#xff0c;我们先来看下这个方法中都干了些啥(&#xff61;&#xff65;ω&#xff65;&#xff61;)&#xff89; 首先&#xff0c;方法开头确认了beanClass是否被加载&#xff08;因为只有被加载叻的对象才是可以实例化的&#…

深入浅出MySQL事务和锁定语句

https://dev.mysql.com/doc/refman/8.0/en/sql-transactional-statements.html 13.3事务和锁定语句 13.3.1启动事务、提交和回滚语句 开启事务 begin START TRANSACTION提交事务 COMMIT回滚事务 ROLLBACK查询自动提交 show SESSION VARIABLES where variable_name "…

深入浅出InnoDB Locking

https://dev.mysql.com/doc/refman/8.0/en/innodb-locking.html 本节讨论的所有锁都是在 InnoDB 引擎下 MySQL 实现行锁定主要使用共享锁和排他锁。也就是常说的读写锁。 A shared (S) lock permits the transaction that holds the lock to read a row. An exclusive (X) l…

若依多租户集成浅析(基于数据源隔离)

背景 这边有个做 saas 化应用的需求&#xff0c;要求做到数据源级别隔离&#xff0c;选了 RuoyiCRM: 基于若依Vue平台搭建的多租户独立数据库CRM系统&#xff0c; 项目不断迭代中。欢迎提BUG交流~ (gitee.com) 这个项目做分析 先放一下码云上作者画的图&#xff0c;后面我把整…

股票量化怎样分析股票数据精准选股?

在日常的股票量化交易过程中&#xff0c;通常有不少的交易者会借助股票数据接口来分析股票数据&#xff0c;并且经过一番股票量化分析之后&#xff0c;做到精准选股也是很有可能的事情。那么&#xff0c;普通投资者进行股票量化怎样分析股票数据选好股呢&#xff1f; 首先来了…

springboot:集成Kaptcha实现图片验证码

文章目录springboot&#xff1a;集成Kaptcha实现图片验证码一、导入依赖系统配置文件二、生成验证码1、Kaptcha的配置2、自定义验证码文本生成器3、具体实现三、校验验证码1、controller接口2、自定义前端过滤器3、自定义验证码处理过滤器4、自定义BodyReaderFilter解决读取bod…

Redis——Jedis的使用

前言 接上文&#xff0c;上一篇文章分享了在Linux下安装redis&#xff0c;以及redis的一些命令的使用。本文要分享的内容是java使用代码连接操作redis。 一、连接redis 这里我们要用到Jedis&#xff0c;那么什么是Jedis 简单来说&#xff0c;Jedis就是Redis官方推荐的Java连接…

【元胞自动机】模拟电波在整个心脏中的传导和传播的时空动力学研究(Matlab代码实现)

&#x1f468;‍&#x1f393;个人主页&#xff1a;研学社的博客 &#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜…

(八)SpringCloud+Security+Oauth2--token增强个性化和格式化输出

一 token的个性化输出 我们知道token默认的输出格式是: {"access_token": "21bd6b0b-0c24-40d1-8928-93274aa1180f","token_type": "bearer","refresh_token": "2c38965b-d4ce-4151-b88d-e39f278ce1bb","e…

[思考进阶]02 如何进行认知升级?

除了要提升自己的技术能力&#xff0c;思维的学习和成长也非常非常重要&#xff0c;特推出此[思考进阶]系列&#xff0c;进行刻意练习&#xff0c;从而提升自己的认知。 最近在看东野的《无名之町》&#xff0c;这本书写于2021年&#xff0c;日本正值疫情&#xff0c;书中也有大…

这个项目获2022世界物联网博览会三新成果奖!

近日&#xff0c;2022世界物联网无锡峰会在无锡太湖国际博览中心召开。天翼物联科技有限公司副总经理赵建军代表中国电信出席会议。 大会颁发了“物联网新技术新产品新应用金奖成果奖”&#xff08;简称“三新成果奖”&#xff09;&#xff0c;中国电信天翼物联“基于5G物联孪…

gRPC:以 C++为例

文章目录1、gRPC 环境搭建1.1、安装 cmake1.2、安装 gcc/gdb1.3、安装 gRPC1.4、protobuf 安装1.5、测试环境2.1、grpc 同步2.1、定义服务2.2、gRPC 服务端2.3、gRPC 客户端2.4、消息流3、gRPC stream3.1、服务端&#xff1a;RPC 实现3.2、客户端&#xff1a;RPC 调用3.3、流的…

刷爆力扣之子数组最大平均数 I

刷爆力扣之子数组最大平均数 I HELLO&#xff0c;各位看官大大好&#xff0c;我是阿呆 &#x1f648;&#x1f648;&#x1f648; 今天阿呆继续记录下力扣刷题过程&#xff0c;收录在专栏算法中 &#x1f61c;&#x1f61c;&#x1f61c; 该专栏按照不同类别标签进行刷题&…

Centos 8.2 本地部署 Jenkins

文章目录1. 简介2. 准备条件3. 安装依赖工具4. 配置 jenkins 源5. 安装 java 176. 安装 Jenkins7. 登陆8. 安装插件8.1 kubernets 插件8.2 git 插件8.3 docker 插件9. 创建 pipeline job9.1 加载本地 Jenkinsfile 构建9.2 git 构建10. 问题1. 简介 Jenkins 是一个 CI/CD 工具。…