从流处理来深入理解消息队列

news2025/1/15 16:57:17

大家好,我是 方圆。在《数据密集型应用系统设计》这本书中关于流处理的部分提到了消息队列相关的知识,我觉得它对理解和之后使用消息队列中间件有很大的帮助,遂将其中重要的部分总结出来,但也更推荐大家去看原书,原文收录在我的 Github: enthusiasm 中,欢迎Star和获取原文。

1. 流处理

“流” 是指随着时间的推移逐渐可用的数据,所以流处理认为数据是 无界限的,它们会随着时间的推移而逐渐达到。

流处理介于在线处理和批处理(离线处理)之间,所以又被称为 准实时准在线 处理。它和批处理相似:它们消费输入并产生输出(并不需要响应请求),不同的是流处理在 事件(event) 发生时会尽快处理,而批处理需要等待若干数据准备好之后,才进行处理,这种差异使流处理系统比起批处理系统具有 更低的延迟。在流处理系统中,一个事件由 生产者 生成一次,然后可能被多个 消费者 进行处理,相关的事件通常被聚合为一个主题(topic)或流(stream)。

2. 消息系统

消息系统是典型的流处理系统,它能在新事件出现时立即通知消费者,这样就能保证对新事件进行低延迟的连续处理,也因此避免了消费者通过轮询机制检查新事件产生开销。

像TCP信道这种直接通信的形式是比较简单的消息系统,它使用生产者和消费者直接进行网络通信,不过这种形式的消息系统容错程度极为有限:如果消费者宕机,即使生产者有超时重传的机制也会导致消息丢失;如果生产者宕机,那么需要它进行超时重传的消息和缓冲队列中的消息都会丢失。

为了解决容错程度较低的问题,可以采用 消息队列(message queue) 来对消息进行管理,它的本质上是一种 针对消息流而优化的数据库,生产者将消息写入消息队列,消费者从消息队列进行读取,通过将 数据持久化 转移到消息队列上,来提高生产者和消费者客户端对消息丢失的容忍程度。

3. 消息队列

消费者对消息队列中消息的消费通常是 异步 的,当生产者发送消息时,通常只会等待消息队列确认消息已经被缓存,而不会等待消费者来处理消息。消费者对消息的消费通常在几分之一秒内,如果发生消息积压的情况,会出现明显的延迟。

3.1 基于 JMS/AMQP 标准的消息队列

我们所熟悉的 RabbitMQ 就是对 JMS/AMQP 标准的实现。如果在消息处理代价比较高昂,并且希望 并行处理 以及 消息的顺序没那么重要 的情况下,这种消息队列是非常合适的选择。不过 JMS/AMQP 风格的消息队列在消费者收到消息后可能会将该消息在消息队列中移除,那么如果此时再加入新的消费者,只能接收到该消费者注册之后的消息了。为了能够消费先前的消息和获得对消息持久化的能力,便提出了基于 日志 的消息队列。

JMS 即Java消息服务(Java Message Service)应用程序接口,是一个 Java 平台中关于面向消息中间件(MOM)的 API,它是一种技术规范。用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的 API,绝大多数 MOM提供商都对 JMS提供支持。

AMQP 的全称是 Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

3.2 基于日志的消息队列

Apache Kafka 是基于日志的消息队列。日志仅是在磁盘上简单地追加记录消息序列,生产者通过将消息追加到日志末尾来对消息进行持久化,而消费者则通过依次读取日志来接收消息,当消费者读取到日志末尾时,则会等待新消息追加的通知。

为了提高单个磁盘的吞吐量,可以进行分区处理,如下图所示:

消息分区.png

这些分区对应的主题可以理解为携带了一组相同类型消息的分区,不同的分区可以托管在不同的服务器上,这样就使得每个分区都有一份能独立于其他分区进行读写的日志,从而提高吞吐量。

在每个分区中,消息队列为每条消息都会分配一个单调递增的 偏移量,以此来保证消息在分区内的有序,但是并不支持跨分区的顺序保证。消息队列为每个消费者维护一个偏移量即可记录消费者的消费进度,而无需跟踪每一条消息,如果消费者节点失效,则消费者的分区将指派给其他消费者节点,并从最后记录的偏移量开始消费;消费者也可以通过指定偏移量来对先前的消息进行消费,不过如果消费者已经处理了后续的消息,但还没记录它们的偏移量,那么消费者节点发生失效重启后,这些消息将被消费两次。

消息吞吐量很高消息能被迅速处理顺序很重要 的情况下,基于日志的消息队列是合适的选择。

3.3 消息的传递模式

当多个消费者从同一主题读取消息时,有两种主要的消息传递模式,如下图所示:

消息传递模式.png

  • 负载均衡(load balancing):每个消息只被传递给 消费者之一,所以处理该主题下的消息能被消费者共享。代理可以为消费者任意分配消息,当在处理消息代价比较高昂时,希望能并行处理消息时,此模式非常有用

  • 扇出(fan-out):每条消息都被传递给 所有消费者

以上两种模式可以组合使用:两个独立的消费者组可以订阅同一主题,每一组都共同收到所有消息,而在每一组内,只由单个节点来处理消息。

如果采用的是消息队列向消费者 推送 消息的模式,为了确保消息被消费,消费者在消费完消息时需要向消息队列发送确认(ACK),供消息队列判断是否需要超时重传。

当发生消息积压时(生产者发送消息的速率大于消费者消费消息的速率),可以采用如下三种方式解决:

  • 丢弃消息

  • 为积压的消息创建缓冲区

  • 降低生产者发送消息的速率(流量控制)

3.4 消息队列与数据库的差异

我们在前文说过:消息队列是一种 针对消息流而优化的数据库,那么它与数据库又有什么区别呢?

  • 在数据的保存机制上:数据库通常保留数据直至显式删除;基于 JMS/AMQP 标准的消息队列在消息成功发送给消费者时会自动删除消息,基于日志的消息队列会在磁盘中以日志的形式对消息做持久化处理

  • 数据搜索方式上:数据库通常支持次级索引和各种搜索数据的方式;消息队列通常支持按照某种模式匹配主题,订阅其子集。虽然机制并不一样,但对于客户端来说都是选择想要了解数据的一部分

  • 查询结果的时效上:查询数据库时,结果通常基于某个时间点的数据快照,如果另一个客户端随后向数据库写入一些改变了查询结果的内容,则第一个客户端不会发现其先前结果现已过期(快照隔离);消息队列不支持任意查询,当数据发生变化时(即新消息可用时),它们会通知客户端


巨人的肩膀

  • 《数据密集型应用系统设计》:第十一章 流处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/711990.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一文了解Python中的运算符

目录 🥩1.1.算数运算符 🥩1.2.赋值运算符 🥩1.3.复合赋值运算符 🥩1.4.比较运算符 🥩1.5.逻辑运算符 🦐博客主页:大虾好吃吗的博客 🦐专栏地址:Python从入门到精通专栏 P…

Android通过连接USB读写SD卡(libaums方案)

Android通过连接USB读写SD卡 最近有一个需求是要求通过Usb扩展读取到SD卡的内容。可以从Usb存储设备拷贝文件到内置卡,也可以从内置卡文件拷贝到Usb存储。 1. 相关的引入包 implementation androidx.core:core-ktx:1.7.0implementation androidx.appcompat:appcompa…

基于PSO优化LSSVM的时序预测MATLAB实战

今天给大家分享PSO优化LSSVM的时序预测代码实战,主要从算法原理和代码实战展开。需要了解更多算法代码的,可以点击文章左下角的阅读全文,进行获取哦~需要了解智能算法、机器学习、深度学习和信号处理相关理论的可以后台私信哦,下一…

SQL-每日一题【196.删除重复的电子邮箱】

题目 表: Person 编写一个 SQL 删除语句来 删除 所有重复的电子邮件,只保留一个id最小的唯一电子邮件。 以 任意顺序 返回结果表。 (注意: 仅需要写删除语句,将自动对剩余结果进行查询) 查询结果格式如下所示。 示…

电脑密码忘了怎么解除?分享4个好方法!

我的电脑设置了一个密码,但刚刚想开电脑的时候怎么也想不到那个密码了!电脑密码忘了怎么解除?有什么好的方法吗? 为电脑设置一个密码可以更好地保护我们的电脑数据。但有时候我们在输入密码时可能会忘记密码,这将导致我…

考虑学PMP认证的项目经理,听我一句劝

早上好,我是老原。昨天3月份的考试成绩出了,我的朋友圈跟过年似的。 参加完3月的考试的同学体感普遍不是很好,本以为大家能过就很优秀了嘛,没想到是3A满天飞…… 我结合自己和一些同学的备考经历,分享一些备考思路。 …

【c++】并行编程:OpenMP入门

😏★,:.☆( ̄▽ ̄)/$:.★ 😏 这篇文章主要介绍OpenMP入门。 学其所用,用其所学。——梁启超 欢迎来到我的博客,一起学习,共同进步。 喜欢的朋友可以关注一下,下次更新不迷路&#x1f9…

“希尔排序:打破时间瓶颈的排序算法 “

文章目录 🔍什么是希尔排序🔑希尔排序分组思想📈希尔排序的优缺点👨‍💻希尔排序代码剖析 🔍什么是希尔排序 希尔排序(Shell Sort)是插入排序的一种高效率的改进版本,也…

浏览器缓存方式有哪些(cookie、localstorage、sessionstorage)

浏览器缓存方式 概要 http缓存 基于HTTP协议的浏览器文件级缓存机制 websql 只有较新的chrome浏览器支持,并以一个独立规范形式出现 indexDB 一个为了能够在客户端存储可观数量的结构化数据,并且在这些数据上使用索引进行高性能检索的 API Cooki…

v-model双向绑定指令

文章目录 前言v-model.lazy 延迟同步v-model.trim 去掉空格 前言 v-model指令是Vue.js中实现双向数据绑定的一种重要机制。它可以将表单控件的值与Vue.js实例中的数据进行双向绑定,即当表单控件的值发生变化时,Vue.js实例中的数据也会随之更新&#xff…

qt实现漂亮主页面

模仿自feiyangqingyun的博客_CSDN博客-Qt/C控件SDK使用示例,Qt/C音视频开发,Qt/C自定义控件领域博主 1.无边框窗口可移动 #ifndef MOVABLE_WIDGET_H #define MOVABLE_WIDGET_H#include <QWidget>class movable_widget:public QWidget { public:movable_widget(QWidget *…

华为荣耀6X(BLN-AL20)解锁全过程

这台旧手机一直闲置&#xff0c;想用它做测试机&#xff0c;所以必须先解锁。在此之前我已将手机改成了直供电&#xff0c;所以图片里没有电池&#xff0c;但是目前直供电方案并不完美&#xff0c;除了直供电线要插&#xff0c;尾插也要插上&#xff0c;淘宝卖家给出的理由是普…

部署 CNI网络组件

部署 flannel K8S 中 Pod 网络通信&#xff1a; ●Pod 内容器与容器之间的通信 在同一个 Pod 内的容器&#xff08;Pod 内的容器是不会跨宿主机的&#xff09;共享同一个网络命令空间&#xff0c; 相当于它们在同一台机器上一样&#xff0c;可以用 localhost 地址访问彼此的端…

如何看待程序员的高薪现象?

点击上方关注 “终端研发部” 设为“星标”&#xff0c;和你一起掌握更多数据库知识 最近在知乎上看到这样个话题&#xff1a; 难道不应该吗&#xff1f; 本人月薪八千&#xff0c;在北京一线&#xff0c;拿着最基础的工资&#xff0c;上的加班最频繁的班&#xff0c;干最累的活…

STM32模拟SPI协议控制数字电位器MCP41010电阻值

STM32模拟SPI协议控制数字电位器MCP41010电阻值 MCP41010是单路8位分辨率数字电位器&#xff0c;通过SPI接口可控制电位器阻值分配&#xff0c;相当于PW0端在PA0和PB0之间滑动。如下图所示&#xff1a; MCP41010是10K欧姆规格的数字电位器&#xff0c;即PA0和PB0之间的阻值恒…

Spring Boot 中的 @HystrixCommand 注解

Spring Boot 中的 HystrixCommand 注解 简介 在分布式系统中&#xff0c;服务之间的调用是不可避免的。但随着服务数量的增加&#xff0c;服务之间的依赖关系也会变得越来越复杂&#xff0c;服务的故障也会变得越来越常见。一旦某个服务出现故障&#xff0c;它所依赖的服务也…

Helm之深入浅出Kubernetes包管理工具基础

Helm 基础 作者&#xff1a;行癫&#xff08;盗版必究&#xff09; 一&#xff1a;Helm 简介 1.简介 ​ Helm 是 Kubernetes 的包管理器&#xff1b;它提供了提供、共享和使用为 Kubernetes 构建的软件的能力&#xff1b;是CNCF的毕业项目&#xff0c;自 Helm 加入 CNCF 以来…

【Canal】从原理、配置出发,从0到1完成Canal搭建

文章目录 简介工作原理MySQL主备复制原理canal 工作原理 Canal架构Canal-HA机制应用场景同步缓存 Redis /全文搜索 ES下发任务数据异构 MySQL 配置开启 binlog扩展statementrowmixed 配置权限 Canal 配置配置启动报错解决 实战引入依赖代码样例测试 前几天在网上冲浪的时候发现…

MYSQL03高级_新增用户、授予权限、授权底层表结构、角色理解

文章目录 ①. 登录服务器操作②. 用户的增删改③. 修改用户密码④. MySQL8密码管理⑤. 权限列表及原则⑥. 授予查看回收权限⑦. 底层权限表操作⑧. 角色的理解 ①. 登录服务器操作 ①. 启动MySQL服务后,可以通过mysql命令来登录MySQL服务器,命令如下: mysql –h hostname|hos…

chatgpt赋能python:搜索Python答案的软件

搜索Python答案的软件 介绍&#xff1a;什么是搜索Python答案的软件&#xff1f; 搜索Python答案的软件是一种工具&#xff0c;可以帮助编程人员快速地找到他们在编写Python代码时遇到的问题的答案。这种软件可以搜索各种不同的网站&#xff0c;以帮助用户找到最适合他们问题…