RabbitMQ相关问题

news2025/1/12 6:46:54

文章目录

  • 避免重复消费(保证消息幂等性)
  • 消息积压
    • 上线更多的消费者,进行正常消费
    • 惰性队列
    • 消息缓存
    • 延时队列
  • RabbitMQ如何保证消息的有序性?
  • RabbitMQ消息的可靠性、延时队列
  • 如何实现数据库与缓存数据一致?
  • 开启消费者多线程消费

避免重复消费(保证消息幂等性)

  • 方式1: 消息全局ID或者写个唯一标识(如时间戳、UUID等) :每次消费消息之前根据消息id去判断该消息是否已消费过,如果已经消费过,则不处理这条消息,否则正常消费消息,并且进行入库操作。(消息全局ID作为数据库表的主键,防止重复)

  • 方式2: 利用Redis的setnx 命令:给消息分配一个全局ID,只要消费过该消息,将 < id,message>以K-V键值对形式写入redis,消费者开始消费前,先去redis中查询有没消费记录即可

  • 方式3: rabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的

在这里插入图片描述

发送消息

	@Autowired
	private RabbitTemplate rabbitTemplate;
	
	/**
	 * 发送消息
	 */
	public void sendMessage() {
	    // 创建消费对象,并指定 全局唯一ID(这里使用UUID,也可以根据业务规则生成,只要保证全局唯一即可)
	    MessageProperties messageProperties = new MessageProperties ();
	    messageProperties.setMessageId (UUID.randomUUID ().toString ());
	    messageProperties.setContentType ("text/plain");
	    messageProperties.setContentEncoding ("utf-8");
	    Message message = new Message ("hello,message idempotent!".getBytes (), messageProperties);
	    System.out.println ("生产消息:" + message.toString ());
	    rabbitTemplate.convertAndSend (EXCHANGE_NAME, ROUTE_KEY, message);
	}

消费消息

 /**
     * 消费消息
     *
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitHandler
    //org.springframework.amqp.AmqpException: No method found for class [B 这个异常,并且还无限循环抛出这个异常。
    //注意@RabbitListener位置,笔者踩坑,无限报上面的错,还有另外一种解决方案: 配置转换器
    @RabbitListener(queues = "message_idempotent_queue")
    @Transactional
    public void handler(Message message, Channel channel) throws IOException {
        /**
         * 发送消息之前,根据消息全局ID去数据库中查询该条消息是否已经消费过,如果已经消费过,则不再进行消费。
         */

        // 获取消息Id
        String messageId = message.getMessageProperties ().getMessageId ();
        if (StringUtils.isBlank (messageId)) {
            logger.info ("获取消费ID为空!");
            return;
        }
        MessageIdempotent messageIdempotent = null;
        Optional<MessageIdempotent> list = messageIdempotentRepository.findById (messageId);
        if (list.isPresent ()) {
            messageIdempotent = list.get ();
        }
        // 如果找不到,则进行消费此消息
        if (null == messageIdempotent) {
            //获取消费内容
            String msg = new String (message.getBody (), StandardCharsets.UTF_8);
            logger.info ("-----获取生产者消息-------------------->" + "messageId:" + messageId + ",消息内容:" + msg);
            //手动ACK
            channel.basicAck (message.getMessageProperties ().getDeliveryTag (), false);

            //存入到表中,标识该消息已消费
            MessageIdempotent idempotent = new MessageIdempotent ();
            idempotent.setMessageId (messageId);
            idempotent.setMessageContent (msg);
            messageIdempotentRepository.save (idempotent);
        } else {
            //如果根据消息ID(作为主键)查询出有已经消费过的消息,那么则不进行消费;
            logger.error ("该消息已消费,无须重复消费!");
        }
    }

消息积压

在这里插入图片描述

上线更多的消费者,进行正常消费

线上突发问题,要临时扩容,增加消费端的数量

考虑到消费者的处理能力,增加配置!!!

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

simple代表简单队列模型

惰性队列

	//基于@Bean声明lazy-queue
	@Bean
    public Queue lazyQueue() {
        return QueueBuilder
                .durable("lazy.queue")
                .lazy() //开启x-queue-mode为lazy
                .build();
    }
    //基于@RabbitListener声明LazyQueue
	@RabbitListener(queuesToDeclare = {
            @Queue(
                    name = "lazy.queue",
                    durable = "true",
                    arguments = @Argument(name = "x-queue-mode", value = "lazy")

            )
    })
    public void listenLazyQueue(String msg) {
        System.out.println("接收到lazy.queue的消息:【" + msg + "】");
    }

惰性队列的优点有哪些?

  • 基于磁盘存储,消息上限高
  • 没有间歇性的page-out性能比较稳定

消息缓存

使用Redis的List或ZSET做接收消息缓存,写一个程序 按照消费者处理时间定时从Redis取消息发送到MQ
在这里插入图片描述

延时队列

设置消息过期时间,过期后转入死信队列,写一个程序 处理死信消息(重新如队列或者 即使处理或记录到数据库延后处理)

在这里插入图片描述

RabbitMQ如何保证消息的有序性?

RabbitMQ是队列存储天然具备先进先出的特点,只要消息的发送是有序的,那么理论上接收也是有序的。不过当一个队列绑定了多个消费者时,可能出现消息轮询投递给消费者的情况,而消费者的处理顺序就无法保证

因此,要保证消息的有序性,需要做的下面几点:

  • 保证消息发送的有序性
  • 保证一组有序的消息都发送到同一个队列
  • 保证一个队列只包含一个消费者
    在这里插入图片描述

这样也会造成吞吐量下降,可以在消费者内部采用多线程的方式消费

RabbitMQ消息的可靠性、延时队列

RabbitMQ消息可靠性、延时队列

如何实现数据库与缓存数据一致?

实现方案有下面几种:

  • 本地缓存同步:当前微服务的数据库数据与缓存数据同步,可以直接在数据库修改时加入对Redis的修改逻辑,保证一致。
  • 跨服务缓存同步:服务A调用了服务B,并对查询结果缓存。服务B数据库修改,可以通过MQ通知服务A服务A修改Redis缓存数据
  • 通用方案:使用Canal框架,伪装成MySQL的salve节点,监听MySQL的binLog变化,然后修改Redis缓存数据

开启消费者多线程消费

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class SpringRabbitListener {
    /**
     * @RabbitListener:加了该注解的方法表示该方法是一个消费者 concurrency:并发数量。
     * 其他属性和注解想了解的话,自己按Ctrl点进去看
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "Queue1"),
                    exchange = @Exchange(value = "Exchange1"),
                    key = "key1"
            ),
            concurrency = "10"
    )
    public void process1(Message message) throws Exception {
        System.out.println("Queue1:" + new String(message.getBody()));

    }


}
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class RabbitmqConfig {
    @Bean("batchQueueRabbitListenerContainerFactory")
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory ();
        factory.setConnectionFactory (connectionFactory);
        factory.setMessageConverter (new Jackson2JsonMessageConverter ());
        //确认方式,manual为手动ack.
        factory.setAcknowledgeMode (AcknowledgeMode.MANUAL);
        //每次处理数据数量,提高并发量
        //factory.setPrefetchCount(250);
        //设置线程数
        //factory.setConcurrentConsumers(30);
        //最大线程数
        //factory.setMaxConcurrentConsumers(50);
        /* setConnectionFactory:设置spring-amqp的ConnectionFactory。 */
        factory.setConnectionFactory (connectionFactory);
        factory.setConcurrentConsumers (1);
        factory.setPrefetchCount (1);
        //factory.setDefaultRequeueRejected(true);
        //使用自定义线程池来启动消费者。
        factory.setTaskExecutor (taskExecutor ());
        return factory;
    }

    @Bean("correctTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor ();
        // 设置核心线程数
        executor.setCorePoolSize (100);
        // 设置最大线程数
        executor.setMaxPoolSize (100);
        // 设置队列容量
        executor.setQueueCapacity (0);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds (300);
        // 设置默认线程名称
        executor.setThreadNamePrefix ("thread-file-queue");
        // 设置拒绝策略rejection-policy:当pool已经达到max size的时候,丢弃
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown (true);
        return executor;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/355042.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字符集、ASCII、GBK、UTF-8、Unicode、乱码、字符编码、解码问题等

编码解码一、背景二、字符的相关概念三、字符集3.1 ASCII[ˈski]3.1.1 ASCII的编码方式3.1.2 EASCII3.2 GBK3.2.1 GB 2312-803.2.2 GBK的制订3.2.3 GBK的实现方式3.3 Unicode&#xff08;统一码、万国码&#xff09;3.3.1 Unicode的出现背景3.3.2 Unicode的编写方式3.3.3 Unico…

Verilog 学习第四节(从计数器到可控制线性序列机——LED实验进化六部曲)

从计数器到可控制线性序列机——LED实验进化六部曲一&#xff1a;让LED灯按照亮0.25s&#xff0c;灭0.75s的状态循环亮灭二&#xff1a;让LED灯按照亮0.25s&#xff0c;灭0.5s&#xff0c;亮0.75s&#xff0c;灭1s的状态循环亮灭三&#xff1a;让LED灯按照指定的亮灭模式亮灭&a…

Java程序员线上排查问题神器-Arthas

文章目录前言一、Arthas是什么&#xff1f;二、快速入门1.下载2.如何运行三、常用命令1.dashboard2.trace总结前言 最近公司项目版本迭代升级&#xff0c;在开发新需求导致没什么时间写博客。 在开发需求的过程中&#xff0c;我写了一个接口&#xff0c;去批量调内部已经写好…

浏览器工作原理详解

浏览器工作原理 以打开百度官网为例 在浏览器地址栏输入网址www.baidu.com &#xff0c;回车 这一过程发生了什么&#xff1f; 首先我们要知道www.baidu.com 这是个域名&#xff0c;需要通过DNS去解析为IP地址&#xff08;也就是服务器地址&#xff09;&#xff0c;然后返回…

yum/vim工具的使用

yum 我们生活在互联网发达的时代&#xff0c;手机电脑也成为了我们生活的必须品&#xff0c;在你的脑海中是否有着这样的记忆碎片&#xff0c;在一个明媚的早上你下定决心准备发奋学习&#xff0c;“卸载”了你手机上的所有娱乐软件&#xff0c;一心向学&#xff01;可是到了下…

前端面试题整理之HMTL篇(一)

HTML面试题&#xff08;一&#xff09; 前言&#xff1a; 面试题及答案解析&#xff0c;大部分来自网络整理&#xff0c;我自己做了一些简化&#xff0c;如果想了解的更多&#xff0c;可以搜索一下&#xff0c;前端面试题宝典微信公众号或者查百度&#xff0c;另外如果出现错误…

Windows环境下安装和配置Gradle

1. 概述 Gradle是Google公司基于JVM开发的一款项目构建工具&#xff0c;支持Maven&#xff0c;JCenter多种第三方仓库&#xff0c;支持传递性依赖管理&#xff0c;使用更加简洁和支持多种语言的build脚步文件&#xff0c;更多详情可以参阅Gradle官网 2. 下载 由于Gradle与S…

【opencv 系列】第2章 图片视频的读取和保存

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录前言1. 图片2.视频(读取摄像头、视频文件)2.1 从摄像头读视频2.2 从视频读取文件2.3 保存摄像头读取的视频2.4 图片的打开,读取和保存代码2.5 摄像头的打开,读取和保…

[LeetCode]1237. 找出给定方程的正整数解

题目链接&#xff1a;https://leetcode.cn/problems/find-positive-integer-solution-for-a-given-equation/description/ 题目描述&#xff1a; 样例1&#xff1a; 输入&#xff1a;function_id 1, z 5 输出&#xff1a;[[1,4],[2,3],[3,2],[4,1]] 解释&#xff1a;functi…

Linux 根目录与路径

目录 Linux的根目录 /usr目录&#xff08;存放用户数据&#xff09; /bin目录&#xff08;存放所有用户都可使用的应用程序&#xff09; /sbin目录&#xff08;存放管理员才可以使用的应用程序&#xff09; /opt目录&#xff08;存放第三方软件&#xff09; /boot目录&am…

[Android Studio] Android Studio Virtual Device(AVD)虚拟机的功能试用

&#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea; Android Debug&#x1f7e7;&#x1f7e8;&#x1f7e9;&#x1f7e6;&#x1f7ea; Topic 发布安卓学习过程中遇到问题解决过程&#xff0c;希望我的解决方案可以对小伙伴们有帮助。 &#x1f680;write…

Mindspore安装

本文用于记录搭建昇思MindSpore开发及使用环境的过程&#xff0c;并通过MindSpore的API快速实现了一个简单的深度学习模型。 什么是MindSpore? 昇思MindSpore是一个全场景深度学习框架&#xff0c;旨在实现易开发、高效执行、全场景覆盖三大目标。 安装步骤 鉴于笔者手头硬…

QM9数据集示例项目学习图神经网络

目录QM9数据集&#xff1a;QM9数据提取的特征&#xff1a;网络结构的设计官网示例代码注释&#xff1a;QM9数据集&#xff1a; QM9为小有机分子的相关的、一致的和详尽的化学空间提供量子化学特征&#xff0c;该数据库可用于现有方法的基准测试&#xff0c;新方法的开发&#…

【算法】两道算法题根据提供字母解决解码方法和城市的天际线天际线问题

算法目录解码方法Java解答参考&#xff1a;天际线问题Java解答参考&#xff1a;大家好&#xff0c;我是小冷。 上一篇了解了项目相关的知识点 接下来看下两道算法题吧&#xff0c;用Java解答&#xff0c;可能更能激发一下大脑思考。 解码方法 题目要求&#xff1a; 一条包含…

将 Supabase 作为下一个后端服务

对于想快速实现一个产品而言&#xff0c;如果使用传统开发&#xff0c;又要兼顾前端开发&#xff0c;同时又要花费时间构建后端服务。然而有这么一个平台&#xff08;Baas Backend as a service&#xff09;后端即服务&#xff0c;能够让开发人员可以专注于前端开发&#xff0c…

Java反序列化漏洞——CommonsCollections4.0版本—CC2、CC4

一、概述4.0版本的CommonsCollections对之前的版本做了一定的更改&#xff0c;那么之前的CC链反序列化再4版本中是否可用呢。实际上是可用的&#xff0c;比如CC6的链&#xff0c;引入的时候因为⽼的Gadget中依赖的包名都是org.apache.commons.collections &#xff0c;⽽新的包…

【构建工具】Gradle中文教程

文章目录Gradle 简介Gradle 概述基于声明的构建和基于约定的构建为以依赖为基础的编程方式提供语言支持构建结构化深度 APIGradle 的扩展多项目构建多种方式管理依赖Gradle 是第一个构建集成工具易于移植GroovyThe Gradle wrapper自由和开源为什么使用 Groovy?Gradle 安装先决…

Pycharm搭建一个Django项目

File->new project 点击create&#xff0c; 等待一下即可 查看安装 Django 版本&#xff1a; 在 Pycharm 底部选择 Terminal 然后在里面输入&#xff1a;python -m django --version 启动项目&#xff1a; 在 Terminal 里面输入: python manage.py runserver 查看文件目…

一文精通MVCC机制

MVCC(Multi-Version Concurrency Control)多版本并发控制机制使用串行化隔离级别时&#xff0c;mysql会将所有的操作加锁互斥&#xff0c;来保证并发安全。这种方式必然降低并发性能。mysql在读已提交和可重复读隔离级别下&#xff0c;对一行数据的读和写两个操作默认是不会通过…

【Unity3d】Unity与iOS之间通信

在unity开发或者sdk开发经常遇到unity与移动端原生层之间进行通信&#xff0c;这里把它们之间通信做一个整理。 关于Unity与Android之间通信&#xff0c;参考【Unity3d】Unity与Android之间通信 Unity调用Objective-C 主要分三个步骤&#xff1a; (一)、在xcode中定义要被u…