MQ的初步了解

news2024/11/23 6:48:34

目录

什么是MQ?

为什么要用MQ(MQ的优点)?

MQ的缺点

常用的MQ产品

MQ使用中的常见问题


什么是MQ?

  【1】MQ:MessageQueue,消息队列。 队列,是一种FIFO 先进先出的数据结构。消息由生产者发送到MQ进行排队,然后按原来的顺序交由消息的消费者进行处理。QQ和微信就是典型的MQ。

消息队列(MQ)是一种用于在不同组件或系统之间传递消息的通信方式。它是一种在分布式系统中广泛使用的技术,用于解耦生产者和消费者,以实现异步通信,提高系统的可伸缩性和可靠性。

MQ的主要特点和优势包括:

  1. 解耦和异步通信:生产者和消费者之间通过MQ进行通信,不需要直接互相调用,从而解耦了系统组件,使系统更加灵活和可维护。同时,MQ支持异步通信,消费者可以在需要的时候处理消息,而不需要等待生产者的响应。

  2. 消息持久化:MQ通常支持消息的持久化,即使在生产者发送消息后,即使消费者当前不可用,消息也不会丢失。

  3. 负载均衡:MQ可以轻松实现多个消费者之间的负载均衡。多个消费者可以订阅相同的消息队列,并同时处理消息,以提高处理能力。

  4. 消息广播:MQ通常支持一对多的消息传递,使得消息可以广播给多个消费者。

  5. 消息过滤:MQ允许消费者根据消息的内容或属性来进行过滤,只选择他们关心的消息。

  6. 确保消息可靠性:MQ通常具有消息确认机制,确保消息能够可靠地传递到消费者。

  7. 缓冲:MQ可以充当缓冲,帮助平衡生产者和消费者之间的速度差异,防止消息丢失。

常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们在不同场景下具有不同的特点和适用性,可以根据具体需求选择合适的MQ系统来构建分布式应用。

为什么要用MQ(MQ的优点)?

  MQ的作用主要有以下三个方面:

  【1】异步

    例子:快递员发快递,直接到客户家效率会很低。引入菜鸟驿站后,快递员只需要把快递放到菜鸟驿站,就可以继续发其他快递去了。客户再按自己的时间安排去菜鸟驿站取快递。

    作用:异步能提高系统的响应速度、吞吐量。

  【2】解耦

    例子:《Thinking in JAVA》很经典,但是都是英文,我们看不懂,所以需要编辑社,将文章翻译成其他语言,这样就可以完成英语与其他语言的交流。

    作用:

      1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。

      2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  【3】削峰

    例子:长江每年都会涨水,但是下游出水口的速度是基本稳定的,所以会涨水。引入三峡大坝后,可以把水储存起来,下游慢慢排水。

    作用:以稳定的系统资源应对突发的流量冲击。

MQ的缺点

  【1】系统可用性降低

    系统引入的外部依赖增多,系统的稳定性就会变差。一旦MQ宕机,对业务会产生影响。这就需要考虑如何保证MQ的高可用。

  【2】系统复杂度提高

    引入MQ后系统的复杂度会大大提高。以前服务之间可以进行同步的服务调用,引入MQ后,会变为异步调用,数据的链路就会变得更复杂。并且还会带来其他一些问题。比如:如何保证消费不会丢失?不会被重复调用?怎么保证消息的顺序性等问题。

  【3】消息一致性问题

    A系统处理完业务,通过MQ发送消息给B、C系统进行后续的业务处理。如果B系统处理成功,C系统处理失败怎么办?这就需要考虑如何保证消息数据处理的一致性。

尽管消息队列(MQ)是一种强大的工具,可以提供许多优势,但它也存在一些潜在的缺点和挑战,包括:

  1. 复杂性:配置和管理消息队列系统可能会相当复杂,尤其是在大规模和高可用性环境下。这可能需要专业知识来确保系统的正确配置和运行。

  2. 维护成本:维护和监控消息队列系统需要额外的资源和时间。需要考虑到消息队列服务器的升级、备份、监控和故障处理等问题。

  3. 一致性问题:在分布式系统中,确保消息的一致性可能会变得复杂。例如,在消息发布后,如果在消息处理之前发生故障,可能会导致消息的丢失或重复处理。

  4. 消息堆积:如果消息队列系统的消费者速度不足以处理消息的生产速度,消息可能会在队列中堆积,导致延迟和系统性能问题。

  5. 消息顺序问题:某些消息队列系统可能无法保证消息的严格顺序。虽然大多数MQ系统可以提供有序消息,但在某些情况下可能需要额外的努力来确保消息的顺序性。

  6. 部署和维护成本:引入MQ系统需要额外的硬件和网络资源,以及相应的运维工作。这可能会增加部署和维护的成本。

  7. 学习曲线:对于新手来说,学习如何使用特定的MQ系统可能需要一定的时间和培训,尤其是对于复杂的系统和配置。

  8. 安全性:消息队列中的消息可能包含敏感信息,因此需要特别关注安全性。必须采取措施来加密和保护消息,以防止未经授权的访问。

常用的MQ产品

  【1】Kafka、RabbitMQ和RocketMQ。我们对这三个产品做下简单的比较,重点需要理解他们的适用场景。

  【2】图示:

                  

   【3】分别分析三种消息中间件

    1.RabbitMQ:消息可靠性很高,功能非常全面,很多高级功能都是从这里衍生出来的,如死信队列,延迟队列。缺点在于吞吐量很低,消息积累会影响消费的性能,而且erlang的语言使用的比较少,定制比较难。适用于公司内部系统的请求扭转的流程。

    2.Kafka:行业的老大哥,基本上是大数据场景必用的组件之一,吞吐量不可挑战,集群性能很好。之前是依赖zookeeper搭建集群,但是新版本会逐渐抛弃zookeeper。但是会存在丢消失的可能,而且功能单一,很多高级功能都没有,如死信队列。最早就是用来做日志分析的。

    3.RocketMQ:最开始是借鉴Kafka,后面逐步优化。吞吐量基本和Kafka是一个量级的,功能也很全面,如RabbitMQ有的都有,还有其他没有的事务功能。缺点是开源版不如云上商业版。如延迟队列,开源会有固定的限制。

MQ使用中的常见问题

  【1】如何保证消息不丢失

    1)分析哪些环节会有丢消息的可能

      (1)图示

           

      (2)分析

        1.其中,1,2,4三个场景都是跨网络的,而跨网络就肯定会有丢消息的可能。

        2.而3这个环节,通常MQ存盘时都会先写入操作系统的缓存page cache中,然后再由操作系统异步的将消息写入硬盘。这个中间有个时间差,就可能会造成消息丢失。如果服务挂了,缓存中还没有来得及写入硬盘的消息就会丢失。这也是任何用户态的应用程序无法避免的。

    2)分析怎么处理的

      1.为保证消息不丢失,发送端的ACK应答必须是多个节点写入的应答 兼 采用多次重试的方式(预防网络抖动),其次消息中间件内部持久化,消费端是消费后手动应答。

      2.在发送端还应该:区分业务的关键性,如果消息不影响主体业务(如,消息通知要做的事情可以延迟很久,但因某些缘故,消息发不出去),这时候采用将消息落盘,然后调用定时任务的形式,延时检查发送。

      3.在消费端还应该:对消费失败的消息进行次数检测,如果多次失败(有可能参数异常,有可能流程出了问题),应该落盘(避免消息堆积),告知程序员处理。

  【2】如何保证消息幂等性

    1)分析哪些环节会造成消息重复消费

      1.MQ的自动重试功能:如网络抖动时,生产者发送得不到MQ的回应尝试多次发送;消费者做完任务,返回给MQ的应答丢失,导致MQ发给了另一个消费者去消费消息。

      2.代码BUG导致消息多次发送。

    2)分析怎么处理的

      1.首先在MQ上我们是不能保证消息的幂等性的,所以我们只能在业务中处理。

      2.处理幂等问题的关键是要给每个消息一个唯一的标识(但这个不能是MQ给我们的消息ID,因为它依旧解决不了生产者发送多次的问题)

      3.需要我们自行构建分布式唯一ID(如雪花算法),能够添加一个具有业务意义的数据作为唯一键会更好,这样能更好的防止重复消费问题对业务的影响。比如,针对订单消息,那就用订单ID来做唯一键。

      4.如订单ID来做唯一键,就算真的出现了很不幸的两个消费者同时消费两条重复的数据,那么在进行MYSQL写入的时候,事务处理与唯一键索引,将是兜底保证业务执行幂等性的关键。

      5.当然,采用redis的Setnx(要设置超时时间)作为CAS锁保证只有一个线程执行业务也是可以的,成功后还可以设置标记值来标记该业务已经做完,等下次重复的消息过来时候,进行redis检验的时候就会自动丢弃这些重复的消息。【这里面需要衡量的是业务的处理速度,与占用redis的内存空间,虽然有过期时间,但是在这段时间内这些数据依旧会占用空间,如果处理速度很快,则占用的空间越多

  【3】如何保证消息的顺序

    1)原因:某些场景下,需要保证消息的消费顺序,例如一个下单过程,需要先完成扣款,然后扣减库存,然后通知快递发货,这个顺序不能乱。如果每个步骤都通过消息进行异步通知的话,这一组消息就必须保证他们的消费顺序是一致的

    2)分析该怎么处理(基于MQ无法保证,那么更多是在业务层面实现

      方案一:为保证消息的有序性,采用用同步发送的模式去发消息,然后消息发往同一个队列里面,然后采用一个消费者去进行消费。

      方案二:为保证高性能,采用用异步发送的模式去发消息,然后消息发往同一个队列里面,然后采用一个消费者去进行消费。消费者端接收后,因为可能消息群是乱序的(异步发送模式),所以构建内存队列(优先级队列),将消息排序消费(每个内存队列只允许一个线程消费,可拓展为多个内存队列多个线程)

      针对这种,容易出现消息堆积的情况,可扩展为多个队列,每个队列都有唯一的一个消费者。在发送端建立消息组ID,根据组ID进行hash决定这一组消息分配至哪个队列里面。但是又容易出现数据倾斜的问题,则可以考虑构建hash环与增加虚拟节点的想法,将数据更加均匀的分布。

  【4】数据堆积如何处理

    1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致MQ积压大量未消费消息。此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他队列,然后再启动多个消费者同时消费。

    2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致MQ积压大量未消费消息。此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。

例子:

Java的RabbitMQ库为例来演示如何发布和消费消息

import com.rabbitmq.client.*;

public class MessageQueueExample {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // RabbitMQ服务器的主机名

        // 创建连接
        Connection connection = factory.newConnection();

        // 创建通道
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 发布消息
        String message = "Hello, RabbitMQ!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        // 创建消息消费者
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String receivedMessage = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + receivedMessage + "'");
        };

        // 消费消息
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

Spring Boot和Apache Kafka来发布和消费消息

生产者示例

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {

    private static final String TOPIC = "my-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        kafkaTemplate.send(TOPIC, message);
    }
}

在这个示例中,我们创建了一个名为KafkaProducer的Spring组件,它使用KafkaTemplate来发送消息到名为"my-topic"的Kafka主题。

消费者示例

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consumeMessage(String message) {
        System.out.println("Received message: " + message);
    }
}

在这个示例中,我们创建了一个名为KafkaConsumer的Spring组件,它使用@KafkaListener注解来监听名为"my-topic"的Kafka主题,并在消息到达时执行consumeMessage方法。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1015781.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

28335 GPIO作为输入的配置记录

28335 GPIO配置为输入&#xff0c;可以启动输入滤波功能&#xff0c;看了网上很多的讲解&#xff0c;把滤波配置记录一下&#xff1a; 主要是配置两个参数&#xff1a; GpioCtrlRegs.GPXCTRL.bit.QUALPRDX &#xff1a;用于配置采样的周期&#xff0c;由配置值和SYSCLKOUT共同…

Java面试题之——异常和错误

提示&#xff1a;解释Java中的异常和错误是什么&#xff0c;以及它们之间的区别是什么&#xff1f; 文章目录 前言从定义上来说&#xff1a;从处理方式来看&#xff1a;总结⭐️ 好书推荐 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 在Java编程语言…

PostgreSQL 如果想知道表中某个条件查询条件在索引中效率 ?

开头还是介绍一下群&#xff0c;如果感兴趣PolarDB ,MongoDB ,MySQL ,PostgreSQL ,SQL Server,Redis &#xff0c;Oracle ,Oceanbase 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请加微信号 liuaustin3 &…

NSS [HNCTF 2022 Week1]Challenge__rce

NSS [HNCTF 2022 Week1]Challenge__rce hint:灵感来源于ctfshow吃瓜杯Y4大佬的题 开题&#xff0c;界面没东西&#xff0c;源码里面有注释&#xff0c;GET传参?hint 传参后返回了源码 <?php error_reporting(0); if (isset($_GET[hint])) {highlight_file(__FILE__); }…

如何判断linux 文件(或lib)是由uclibc还是glibc编译出来的?

工作中使用的编译环境有2套编译器&#xff0c;一个是glibc&#xff0c;一个是uclibc。 有些项目使用的glibc编译的lib&#xff0c;和使用uclibc编译的工程&#xff0c;在一起就会出现reference的编译错误如下&#xff1a; 那和如何来判断一个文件是由哪个编译器编译的呢&#…

苹果cms大橙子vfed 5.0去授权完美破解主题模板

大橙模版算是在苹果 cms 众多主题里&#xff0c;较为亮眼的一款了&#xff0c;主题简洁&#xff0c;功能众多&#xff0c;非常的齐全。 今天分享的就是大橙 5.0 版本模板&#xff0c;自定义菜单输入下列代码使用主题设置和资源采集。 vfed 主题设置&#xff0c;/index.php/la…

MiniApp Dev 6

商城、会员、积分

通过 API 使用 React

&#x1f3ac; 岸边的风&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 若使用相同的 Hello World! 应用 &#xff08;通过 React 生成并通过 Visual Studio Code 更新的应用&#xff09;&#x…

微信小程序开发--4.3订阅消息

首先在微信公众平台登录相应的微信小程序&#xff0c;左侧导航栏找到功能&#xff0c;点进去订阅消息&#xff0c;点击开通&#xff0c;点击选用&#xff0c;在公共模板库中选用订阅消息的模板。 js wx.requestSubscribeMessage({tmplIds:[aDRNef2_ty37dXyqVXyUADSyO8BXOZRWYi…

手撕双链表

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大一&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 望小伙伴们点赞&#x1f44d;收藏✨加关注哟&#x1f495;&#x1…

3D视觉到三维视觉之结构光

3D视觉是计算机视觉的终极体现形式 2D视觉技术主要在二维空间下完成工作&#xff0c;三维信息基本上没有得到任何利用&#xff0c;而三维信息才真正能够反映物体和环境的状态&#xff0c;也更接近人类的感知模式。近年来&#xff0c;学术界和工业界推出了一系列优秀的算法和产…

【【萌新的RISC-V学习之再看计算机组成与设计大收获总六】】

萌新的RISC-V学习之再看计算机组成与设计大收获总六 我们在进行设计的时候首先要明白一点 就是 确定我们的设计所需要的 指令的大小和 地址的大小 指令集是32位的 而 地址则一般更多的是64位 数据也是64位 PC与指令寄存器之间的关系 PC是用来保存当前指令的地址。假设地址是0…

Huggingface遇到 Couldn‘t reach xxx on the Hub (ConnectionError)解决方法

文章目录 遇到的问题解决方法参考 遇到的问题 使用服务器下载Huggingface的数据集&#xff0c;显示ConnectionError: Couldn’t reach ‘Salesforce/dialogstudio’ on the Hub (ConnectionError) 具体代码如下&#xff1a; dataset load_dataset("Salesforce/dialogs…

Scanner类用法(学习笔记)

Scanner类用法&#xff08;学习笔记&#xff0c;后续会补充&#xff09; 1.next&#xff08;&#xff09;用法 package com.yushifu.scanner; import java.util.Scanner;//util java工具包 //Scanner类&#xff08;获取用户的输入&#xff09; Scanner s new Scanner&#…

Otter改造 增加springboot模块和HTTP调用功能

环境搭建 & 打包 环境搭建&#xff1a; 进入 $otter_home/lib 目录执行&#xff1a;bash install.sh 打包&#xff1a; 进入$otter_home目录执行&#xff1a;mvn clean install -Dmaven.test.skip -Denvrelease发布包位置&#xff1a;$otter_home/target 项目背景 阿里…

【量化交易】151个量化交易策略解析

我又来推书了~ 今天分享的这本书&#xff0c;量化交易领域的同学们肯定喜欢&#xff08;doge&#xff09;&#xff0c;它就是Zura Kakushadze大佬&#xff08;“Alpha101”作者&#xff09;撰写的《151 Trading Strategies》。&#xff08;文末领&#xff09; 《151 Trading …

创建对象内存分析

package com.mypackage.oop.demo03;public class Pet {String name;int age;//无参构造public void shout(){System.out.println("叫了一声");} }package com.mypackage.oop.demo03;public class Application03 {public static void main(String[] args) {Pet dog n…

vue中el-dialog 中的内容没有预先加载,因此无法获得内部元素的ref 的解决方案 使用强制提前加载dialog方法

问题描述 在没有进行任何操作的时候&#xff0c;使用 this.$refs.xxxx 无法获取el-dialog中的内部元素&#xff0c;这个问题会导致很多bug&#xff0c;其中目前网络上也有许多关于这个问题的解决方案&#xff0c;但是大多数是使用el-dialog中的open在dialog打开的时候使用thi…

typescript类型详解

typescript类型概述 typescript是JavaScript的超集,ts提供了js所有的功能.并且额外增加了:类型系统 所有的js代码都是ts代码js有类型(比如,number/string等),但是js不会检查变量的类型是否发生变化,而ts会检查.ts类型系统的主要优势为:可以显示标记出代码中的意外行为,从而降…

spi协议精讲

spi 总线是一种 高速的、全双工&#xff0c;同步串行总线&#xff0c;有四根线MISO MOSI SCLK CS 2.通信过程 3.极性和相位 因为没有像iic一样规定上升沿还是下降沿发送数据&#xff0c;spi的通信取决于极性和相位&#xff0c;因此有四种工作模式 CPHA0 表示SCK 第一个边沿时&…