MQ~消息队列能力、AMQP协议、现有选择(Kafka、RabbitMQ、RocketMQ 、Pulsar)

news2024/12/29 9:01:05

消息队列

消息队列看作是一个存放消息的容器,当我们需要使用消息的时候,直接从容器中取出消息供自己使用即可。由于队列 Queue 是一种先进先出的数据结构,所以消费消息时也是按照顺序来消费的。

常⽤的消息队列主要这 五 种,分别为 Kafka、RabbitMQ、RocketMQ 、 ActiveMQ、Pulsar。

消息队列模式

点对点通讯

  • 点对点模式:多个⽣产者可以向同⼀个消息队列发送消息,⼀个具体的消息只能由⼀个消费者消费。
    在这里插入图片描述
    使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

广播

  • 发布/订阅模式:单个消息可以被多个订阅者并发的获取和处理。
    在这里插入图片描述
    发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者。

消息队列能力

1. 异步处理

将写请求数据存储到消息队列之后就立即返回结果。随后,消息队列系统再对消息进行消费。因为写请求写入消息队列之后就立即返回了,大大降低写请求的延迟,下游的执行结果也不会影响核心流程的进行,做到降低延迟。

但如果请求数据在后续的业务校验、写数据库等操作中可能失败。因此,使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后,订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

2. 降低系统耦合性,弱依赖化

生产者(客户端)发送消息到消息队列中去,接受者(服务端)处理消息,需要消费的系统直接去消息队列取消息进行消费即可而不需要和其他系统有耦合,这显然也提高了系统的扩展性。

消息队列使用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。
在这里插入图片描述
从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。

对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

3. 削峰/限流

先将短时间高并发产生的事务消息存储在消息队列中,然后后端服务再慢慢根据自己的能力去消费这些消息,这样就避免直接把后端服务打垮掉。

例如在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。

4. 分布式事务处理

RocketMQ、 Kafka、Pulsar都提供了事务相关的功能。事务允许事件流应用将消费,处理,生产消息整个过程定义为一个原子操作。

在分布式事务的实现中,有很多种方案,其中比较常用的就是基于MQ来实现,在MQ的具体实现中,又有很多种具体的方案,从大的方面来说可以分为两种:

  • 可靠消息最终一致性
  • 最大努力通知

可靠消息最终一致性

可靠消息最终一致性,顾名思义就是依赖可靠的消息,来实现一种最终一致性的模型。
他的大致流程就是:

  1. 事务的发起方执行本地事务
  2. 事务的发起方向事务的参与方发送MQ消息
  3. 事务的参与方接收到MQ消息后执行自己的本地事务

这里面事务的发起方和参与方都需要各自执行本地事务,他们之间,通过可靠消息来保障最终一致。
那么,怎么样的消息算可靠呢,直接依赖kafka、rocketMQ发送一个消息就可靠了么? 显然是不行的,因为我们知道,在出现网络分区、网络延迟等情况时,是没办法保证消息一定可以发发出去的,也没办法保证消息发出去就一定能被成功消费。

那么想要做到让这个消息可靠,一般由两种做法:

  1. 本地消息表
  2. 事务消息

通过这两种方案,都可以保证事务的发起方在执行完本地事务之后,消息一定可以发出去,并且一定能被消费成功。

本地消息表的方案是基于本地事务+重试,来保证MQ消息一定可以发出去。
事务消息的方案是基于MQ的事务消息机制,把一条消息拆成两个half消息,通过2阶段的方式+回调反查来保证消息一定能发出去。2者都是依赖MQ自身的重试机制+事务参与者反查+对账来保证正消息一定可以消费。

最大努力通知

除了可靠消息最终一致性这种以外,还有一种方式就是也使用消息,但是这个消息并不要求一定可靠。这就是最大努力通知。

这个方案一般就是只依赖重试机制,来做最大努力的通知事务参与者。但是需要注意的是,在最大努力通知的过程中,可能会出现消息重复发送的情况,也可能会出现消息丢失的情况。

5. 顺序保证

在很多应用场景中,处理数据的顺序至关重要。消息队列保证数据按照特定的顺序被处理,适用于那些对数据顺序有严格要求的场景。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持顺序消息。

6. 延时/定时处理

消息发送后不会立即被消费,而是指定一个时间,到时间后再消费。大部分消息队列,例如 RocketMQ、RabbitMQ、Pulsar、Kafka,都支持定时/延时消息。

7. 海量数据流处理(日志、监控)

针对分布式系统产生的海量数据流,如业务日志、监控数据、用户行为等,消息队列可以实时或批量收集这些数据,并将其导入到大数据处理引擎中,实现高效的数据流管理和处理。

消息队列一定好吗?

消息队列能力多,但也会给系统带来如下三点问题:

  • 系统可用性降低: 系统可用性在某种程度上降低,为什么这样说呢?在加入 MQ 之前,你不用考虑消息丢失或者说 MQ 挂掉等等的情况,但是,引入 MQ 之后你就需要去考虑了!
  • 系统复杂性提高: 加入 MQ 之后,你需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等等问题!
  • 一致性问题: 我上面讲了消息队列可以实现异步,消息队列带来的异步确实可以提高系统响应速度。但是,万一消息的真正消费者并没有正确消费消息怎么办?这样就会导致数据不一致的情况了!

消息协议

任何的数据传输都有协议,协议代表了其具有的最基础的能力,了解协议,有助于深入体会能力和后续拓展。

AMQP

AMQP,统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。

基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

  • 支持跨语言
  • 支持跨平台
  • 提供了五种消息模型:①direct exchange;②fanout exchange;③topic change;④headers exchange;⑤system exchange。

本质来讲,后四种和pub/sub 模型没有太大差别,仅是在路由机制上做了更详细的划分。

Exchange 提供的路由算法,AMQP 可以提供多样化的路由方式来传递消息到消息队列。

现有选型

1. Kafka

Kafka 早期被用来用于处理海量日志流式处理平台,后面才慢慢发展成了一款功能全面的高性能消息队列。
流式处理平台具有三个关键功能:

  1. 消息队列:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
  2. 容错的持久方式存储记录消息流:Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险。
  3. 流式处理平台: 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。

Kafka 是一个分布式系统,由通过高性能 TCP 网络协议进行通信的服务器和客户端组成,可以部署在在本地和云环境中的裸机硬件、虚拟机和容器上。

在 Kafka 2.8 之前,Kafka 最被大家诟病的就是其重度依赖于 Zookeeper 做元数据管理和集群的高可用。在 Kafka 2.8 之后,引入了基于 Raft 协议的 KRaft 模式,不再依赖 Zookeeper,大大简化了 Kafka 的架构,让你可以以一种轻量级的方式来使用 Kafka。

2. RocketMQ

RocketMQ 是阿里开源的一款云原生:消息、事件流的实时数据处理平台,借鉴了 Kafka,已经成为 Apache 顶级项目。

RocketMQ 的核心特性(摘自 RocketMQ 官网):

  1. 云原生:生与云,长与云,无限弹性扩缩,K8s 友好高吞吐:万亿级吞吐保证,同时满足微服务与大数据场景。
  2. 流处理:提供轻量、高扩展、高性能和丰富功能的流计算引擎。
  3. 金融级:金融级的稳定性,广泛用于交易核心链路。
  4. 架构极简:零外部依赖,Shared-nothing 架构。
  5. 生态友好:无缝对接微服务、实时计算、数据湖等周边生态。

根据官网介绍:Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。历经十余年的大规模场景打磨,RocketMQ 已经成为业内共识的金融级可靠业务消息首选方案,被广泛应用于互联网、大数据、移动互联网、物联网等领域的业务场景。

3. RabbitMQ

RabbitMQ 是采用 Erlang 语言实现 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

RabbitMQ 发展到今天,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是分不开的。

RabbitMQ 的具体特点可以概括为以下几点:

  1. 可靠性: RabbitMQ 使用一些机制来保证消息的可靠性,如持久化、传输确认及发布确认等。
  2. 灵活的路由: 在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能,RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起,也可以通过插件机制来实现自己的交换器。
  3. 扩展性: 多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
  4. 高可用性: 队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队列仍然可用。
  5. 支持多种协议: RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP、MQTT 等多种消息中间件协议。
  6. 插件机制: RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。

4. Pulsar

Pulsar 是新一代云原生分布式消息流平台,最初由 Yahoo 开发 ,已经成为 Apache 顶级项目。

Pulsar 集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具有强一致性、高吞吐、低延时及高可扩展性等流数据存储特性,被看作是云原生时代实时消息流传输、存储和计算最佳解决方案。

Pulsar 的关键特性如下(摘自官网):

  1. 是新一代云原生分布式消息流平台。
  2. Pulsar 的单个实例原生支持多个集群,可跨机房在集群间无缝地完成消息复制。
  3. 极低的发布延迟和端到端延迟。可无缝扩展到超过一百万个 topic。
  4. 简单的客户端 API,支持 Java、Go、Python 和 C++。
  5. 主题的多种订阅模式(独占、共享和故障转移)。
  6. 通过 Apache BookKeeper 提供的持久化消息存储机制保证消息传递 。
  7. 由轻量级的 serverless 计算框架 Pulsar Functions 实现流原生的数据处理。
  8. 基于 Pulsar Functions 的 serverless connector 框架 Pulsar IO 使得数据更易移入、移出 Apache Pulsar。
  9. 分层式存储可在数据陈旧时,将数据从热存储卸载到冷/长期存储。

总结

  • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka、RocketMQ 和 Pulsar,但是由于它基于 Erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 Erlang 开发,所以国内很少有公司有实力做 Erlang 源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),要求极高的低延迟,那这几种消息队列中,RabbitMQ 你可以选择使用。

  • RocketMQ 和 Pulsar 支持强一致性,对消息一致性要求、稳定性要求比较高的场景可以使用。RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的 MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。

  • Kafka 的特点其实很明显,追求⾼吞吐量,提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 Kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。如果是大数据领域的实时计算、日志采集、监控打点等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1850096.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python星载气溶胶数据处理与反演分析

在当前全球气候变化和环境污染问题日益突出的背景下,气溶胶研究显得尤为重要。气溶胶在大气中由直径范围在0.01微米至10微米固体和液体颗粒构成,直接或间接影响地球辐射平衡、气候变化和空气质量。尤其在“碳中和”目标的驱动下,研究气溶胶对…

掌握 Nuxt 3 中的状态管理:实践指南

title: 掌握 Nuxt 3 中的状态管理:实践指南 date: 2024/6/22 updated: 2024/6/22 author: cmdragon excerpt: 摘要:该文指南详述了Nuxt 3的概况与安装,聚焦于在Nuxt 3框架下运用Vuex进行高效的状态管理,涵盖基础配置、模块化实…

vue实现post请求接口流式输出数据sse

使用fetchEventSource 参考git源码:https://github.com/Azure/fetch-event-source/tree/main 本地联通 发现数据并没有流式输出:vue代理需要关闭compress 如下: devServer:{proxy:{},compress:false } 安装插件 npm install microsoft/f…

深入理解Python中的并发与异步的结合使用

​ 在上一篇文章中,我们讨论了异步编程中的性能优化技巧,并简单介绍了trio和curio库。今天,我们将深入探讨如何将并发编程与异步编程结合使用,并详细讲解如何利用trio和curio库优化异步编程中的性能。 文章目录 并发与异步编程的区…

鸿蒙HarmonyOS NEXT角落里的知识:ArkTS高性能编程实践

概述 本文主要提供应用性能敏感场景下的高性能编程的相关建议,助力开发者开发出高性能的应用。高性能编程实践,是在开发过程中逐步总结出来的一些高性能的写法和建议,在业务功能实现过程中,我们要同步思考并理解高性能写法的原理…

AMSR/ADEOS-II L1A Raw Observation Counts V003地球表面和大气微波辐射的详细观测数据

AMSR/ADEOS-II L1A Raw Observation Counts V003 简介 AMSR/ADEOS-II L1A Raw Observation Counts V003数据是由日本航空航天研究开发机构(JAXA)的AMSR (Advanced Microwave Scanning Radiometer)仪器收集的一组原始观测计数数据。这些数据是从ADEOS-I…

2024年高压电工证考试题库及高压电工试题解析

题库来源:安全生产模拟考试一点通公众号小程序 2024年高压电工证考试题库及高压电工试题解析是安全生产模拟考试一点通结合(安监局)特种作业人员操作证考试大纲和(质检局)特种设备作业人员上岗证考试大纲随机出的高压…

第2章 Android应用的界面编程

🌈个人主页:小新_- 🎈个人座右铭:“成功者不是从不失败的人,而是从不放弃的人!”🎈 🎁欢迎各位→点赞👍 收藏⭐️ 留言📝 🏆所属专栏&#xff1…

高通安卓12-Input子系统

1.Input输入子系统架构 Input Driver(Input设备驱动层)->Input core(输入子系统核心层)->Event handler(事件处理层)->User space(用户空间) 2.getevent获取Input事件的用法 getevent 指令用于获取android系统中 input 输入事件,比如获取按键上报信息、获…

04_FFmpeg常用API及内存模型

【说明】课程学习地址:https://ke.qq.com/course/468797 FFmpeg内存模型 FFmpeg内存模型 int avcodec_send_packet(AVCodecContext *avctx, const AVPacket *avpkt); int avcodec_receive_frame(AVCodecContext *avctx, AVFrame *frame);问题(数据的申请和释放): …

Python高效内存访问,memoryview这个神器你值得拥有!

目录 1、初识memoryview 🌀 1.1 memoryview基础介绍 1.2 为何使用memoryview优化内存访问 1.3 创建memoryview对象实战 示例1:基于bytes创建memoryview 示例2:修改memoryview中的数据 示例3:memoryview与切片 2、深入理解memoryview操作 🔄 2.1 访问与修改数据…

【Java】已解决java.nio.channels.FileLockInterruptionException异常

文章目录 一、分析问题背景二、可能出错的原因三、错误代码示例四、正确代码示例 已解决java.nio.channels.FileLockInterruptionException异常 在Java NIO(New I/O)中,java.nio.channels.FileLockInterruptionException是一个特殊的异常&am…

tedsign vue3 web-端框架中封装一个验证码组件 以及对应node 接口逻辑说明

一个这样的组件 我直接上代码了 <template><t-loading size"small" :loading"loading" show-overlay><div class"container" click"refresh"><div v-if"svg" class"svg" v-html"svg&…

VUE3脚手架工具cli配置搭建及创建VUE工程

1、VUE的脚手架工具(CLI&#xff09; 开发大型vue的时候&#xff0c;不能通过html编写一个大型的项目&#xff0c;这个时候需要用到vue的脚手架工具 通过vue的脚手架&#xff0c;可以快速的生成vue工程 1.1、安装nodejs和npm 【下载nodejs】 https://nodejs.org/en 【安装…

高通安卓12-安卓系统定制2

将开机动画打包到system.img里面 在目录device->qcom下面 有lito和qssi两个文件夹 现在通过QSSI的方式创建开机动画&#xff0c;LITO方式是一样的 首先加入自己的开机动画&#xff0c;制作过程看前面的部分 打开qssi.mk文件&#xff0c;在文件的最后加入内容 PRODUCT_CO…

python爬虫学习笔记一(基本概念urllib基础)

学习资料&#xff1a;尚硅谷_爬虫 学习环境: pycharm 一.爬虫基本概念 爬虫定义 > 解释1&#xff1a;通过程序&#xff0c;根据URL进行爬取网页&#xff0c;获取有用信息 > 解释2&#xff1a;使用程序模拟浏览器&#xff0c;向服务器发送请求&#xff0c;获取相应信息…

某程序员:30岁了,老婆管钱,背着我买了50万股票,亏了20w,强制她清仓后又买了36万

“辛辛苦苦攒了几年钱&#xff0c;本想买房买车&#xff0c;结果全被老婆炒股亏掉了&#xff01;” 近日&#xff0c;一位30岁的程序员大哥在网上吐苦水&#xff0c;引发了网友们的热议。 这位程序员大哥和妻子结婚后&#xff0c;一直秉持着“男主外&#xff0c;女主内”的传统…

C++——位图的介绍和使用

位图的介绍 位图的引入 给40亿个不重复的无符号整数&#xff0c;没排过序。给一个无符号整数&#xff0c;如何快速判断一个数是否在这40亿个数中&#xff1f; 要判断一个数是否在某一堆数中&#xff0c;我们可能会想到如下方法&#xff1a; 将这一堆数进行排序&#xff0c…

VBA学习(16):工作表事件示例:输入数据后锁定单元格

在工作表单元格中输入数据后&#xff0c;该单元格就被锁定&#xff0c;不能再编辑。 打开VBE&#xff0c;在工程资源管理器中双击该工作表名称打开其代码模块&#xff0c;在其中输入下面的代码&#xff1a; 假设整个工作表的LockedFalse Private Sub Worksheet_Change(ByVal …

【文献及模型、制图分享】1985-2015年美国坦帕湾流域土地开发利用强度时空变化分析

公众号新功能 目前公众号新增以下等功能 1、处理GIS出图、Python制图、区位图、土地利用现状图、土地利用动态度和重心迁移图等等 2、核密度分析、网络od分析、地形分析、空间分析等等 3、地理加权回归、地理探测器、生态环境质量指数、地理加权回归模型影响因素分析、计算…