消息队列kafka使用技巧和常见问题

news2024/11/24 6:27:00

目录

【消息队列概述】

【kafka】

消息丢失问题

消息重复问题

消费顺序问题

消息积压问题

kafka集群部署


【消息队列概述】

消息队列主要解决应用耦合、异步消息、流量削锋等问题,是大型分布式系统不可缺少的中间件。消息生产者 只管把消息发布到 MQ 中而不用管谁来取,消息消费者 只管从 MQ 中取消息而不管是谁生产的,这样生产者和消费者都不用知道对方的存在。

对于一些流转步骤较多,或者耗费时间过长的场景,就可以使用消息队列。比如用户下订单成功后,可以通过消息队列异步发送信息,异步处理赠送积分等等。

常用的消息队列中间件有下面几种:

  • ActiveMQ:由 Apache 出品的开源消息中间件。
  • RabbitMQ:使用 Erlang 语言开发的开源消息队列系统,基于 AMQP协议来实现。
  • RocketMQ:阿里巴巴自主开发,是淘宝内部的交易系统使用了淘宝自主研发的 Notify 消息中间件,使用 MySQL 作为消息存储媒介,可完全水平扩容。
  • Kafka:Kafka是优秀的分布式消息队列,采用 Scala 和 Java 开发,特点是拥有巨大吞吐量(数百万 / 秒),可水平扩展,可容错的消息系统。

使用消息队列能解决哪些问题:

  • 异步处理:比如上面说的下订单后的赠送积分,或者异步收集日志;
  • 应用解耦:微服务时代,多个服务之间可以通过消息队列传输数据,不用担心阻塞问题;
  • 流量削峰:秒杀业务中可以将来不及处理的数据暂时积压到队列中,后台慢慢处理,缓解服务器压力;
  • 发布/订阅:一条消息可以广播给任意个收听方,Producer 只负责发送消息 Message,Consumer 可随意订阅 Message。

举例:电商系统中,订单服务和支付服务分开部署,订单系统、物流系统、财务系统 都订阅了支付系统的消息。当订单支付成功后,支付系统广播一条消息:用户已经确认付款;各个订阅方收到消息后,订单系统将订单状态改成已支付,物流系统开始发货,财务系统开始开发票.... 这个过程可以使用消息队列来完成。

引入了消息中间件会导致系统复杂度提高,会带来如下问题:

  • 消息丢失问题: 任何系统都不能保证万无一失,比如 Producer 发出了 10000 条消息,Consumer 只收到了 9999 条消息,那么就需要评估能否接受丢失的这条消息?如果是订单成功后的短信通知那可以接受丢消息;如果是用户支付成功了却没给人家发货,那用户估计要急眼了。
  • 消息重复问题:和上面类似,如果 Producer 发出了 10000 条消息,但是 Consumer 收到了 10001 条消息,有一条是重复的,业务能否接受这条重复的消息?如果是订单成功后的短信通知多发了一条那也还好;如果是用户支付成功下了一单却发了两个商品,那卖家就要吃亏了。
  • 消息的顺序问题:比如 Producer 发送顺序是 1->2->3,Consumer 收到的消息是 1->3->2,要考虑消费端是否对顺序敏感。
  • 一致性问题: 如果消息丢失并且无法找回,会造成两个系统的数据最终不一致;如果消息延迟,会造成短暂不一致;针对这两种情况都要提前想好应对策略。

【kafka】

Kafka 中的一些关键词:

  • Producer:消息的生产者,谁来创建消息,谁就是生产者。
  • Consumer:消费的消费者,谁来接收消息,谁就是消费者。
  • Topic:每条发布到 MQ 集群的消息都有一个类别,这个类别被称为 topic,可以理解成一类消息的名字。所有的消息都以 topic 作为单位进行归类。
  • Partition:Kafka 物理上的分区概念,每个 Topic 会分散在一个或多个 Partition,每个 Partition 都是有序的。一个 Topic 的数据太大了,就分成小片,Kafka 为分区引入多副本模型,副本之间采用 “一个 leader 多 follower” 的设计,通过多副本实现故障自动转移,保证可用性。
  • Broker:可以理解成一个服务器的节点,集群包含一个或多个服务器,这种服务器被称为 broker。对应用来说,生产者把消费发出去了,就不管了;消费者不紧不慢地按照自己的速率来消费。这段时间可能有大量消息产生,消费者压力还是在一定范围内。做生产者和消费者之间解耦的就是一个缓存服务 broker。
  • Kafka Cluster:kafka的集群就是 Broker 的集合,多个 Broker 组成一个高可用集群。

相比同类中间件 RabbitMQ / ActiveMQ,Kafka 支持批量拉取消息,大大增加了消息吞吐量。kafka是分布式可扩展,Kafka 集群可以透明的扩展,增加新的服务器进集群。

kafka支持多种发送场景:1. 发送并忘记;2. 同步发送;3. 异步发送 + 回调函数。这3 种方式主要体现在时间上的差别,并不是说时间越快的越好,具体使用哪种方式要看具体的业务场景:

  • 比如业务要求消息必须是按顺序发送,可以使用第 2 种同步发送,并且只能在一个 partation 上;
  • 如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用第 1 种发送并忘记的方式;
  • 如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用第3种 异步 + 回调的方式来发送消息。

为什么 Kafka 的吞吐量远高于其他同类中间件Kafka 是一个高吞吐量分布式消息系统,并且提供了持久化;架构采用分布式并行处理,利用磁盘顺序 IO 批处理。

  • 利用了磁盘连续读写性能远远高于随机读写的特点,内部采用消息的批量处理,zero-copy 机制,数据的存储和获取是本地磁盘顺序批量操作,具有 O (1) 的复杂度,消息处理的效率很高
  • 并发机制,将一个 topic 拆分多个 partition, kafka 读写的单位是 partition,因此,将一个 topic 拆分为多个 partition 可以提高吞吐量。但是,不同 partition 需要位于不同的磁盘(可以在同一个机器),如果多个 partition 位于同一个磁盘,那么会有多个进程同时对一个磁盘的多个文件进行读写,破坏了磁盘读写的连续性。

消息丢失问题

消息从生产到消费会经历下面的过程:

  • 消息生产阶段:只要能正常收到Broker的ack确认响应,就表示发送成功,所以只要处理好返回值和异常,这个阶段是不会出现消息丢失的;
  • 消息存储阶段:这个阶段一般会直接交给MQ消息中间件来保证,比如Broker会做副本,保证一条消息至少同步两个节点再返回ack;
  • 消息消费阶段:消费端从Broker上拉取消息,只要消费端在收到消息后,不立即发送消费确认给Broker,而是等到执行完业务逻辑后,再发送消费确认,也能保证消息的不丢失。

哪些环节可能丢失消息?

  • 在消息生产过程中可能会丢失消息:采用的方案是消息重传。如果不是消息队列发生故障,或者是到消息队列的网络断开了,重试 2~3 次就可以了。不过,这种方案可能会造成消息的重复
  • 在消息队列中可能会丢失消息:消息在 Kafka 中是存储在本地磁盘上的,而为了减少消息存储时对磁盘的 随机 I/O,一般会将消息先写入到操作系统的 Page Cache 中,然后再找合适的时机刷 新到磁盘上。比如,Kafka 可以配置当达到某一时间间隔,或者累积一定的消息数量的时候再刷盘,也就是异步刷盘。如果你的系统对消息丢失的容忍度很低,那么可以考虑以集群方式部署 Kafka 服务,通过部署多个副本备份数据,保证消息尽量不丢失。
  • 在消费的过程中可能会丢失消息:一定要等到消息接收和处理完成后才能更新消费进度,但是这也会造成消息重复的问题,比方说某一条消息在处理之后,消费者恰好宕机了,那么因为没有更新消费进度,所以当这个消费者重启之后,还会重复地消费这条消息。

怎么知道消息丢失了?

  • 可以在消息生产端给每个发出的消息都指定一个全局唯一 ID, 或者附加一个连续递增的版本号,然后在消费端做对应的版本校验。关于这里提到的全局唯一ID可以使用uuid或者雪花算法,请参考我之前在MySQL分库分表的方案中的解决思路:MySQL分区分库分表和分布式集群

怎么防止消息丢失?

  • 可以利用拦截器机制,在生产端发送消息之前,通过拦截器将消息版本号注入消息中,然后在消费端收到消息后,再通过拦截器检测版本号的连续性或消费状态。这样实现的好处是消息检测的代码不会侵入到业务代码中,可以通过单独的任务来定位丢失的消息。
  • 也可以提前把消息存到数据库中,然后等待消费者明确告知已消费完成,再把数据库中的这条记录改为已完成,或者删除。如果消息丢失了,就从数据库中查找到这条消息然后重新消费。如果担心这一连串的操作因为非原子性引发问题,可以使用事务,但是成本也会更高。

消费失败的数据如何处理?

  • 消费端成功执行相关任务流程之后,再删除当前消息,如果当前消息执行失败则不删除。这样可能会有一个问题,就是如果是消息内容本身数据异常,会导致这条消息一直占据着队列头部。第二种方案:如果消费失败后也删除消息,但是记录一个失败次数,并把这条消息重新放回到队列尾部,如果失败次数达到一定数量则发送报警信息人工干预。

消息重复问题

怎么解决消息重复的问题 首先生产者的消息内容要满足“幂等”条件,就是无论重复消费多少次的结果都是一样的。可以根据某个唯一标识做强幂等,比如用户订单号或者流水号。

所谓幂等,就是无论Http或者RPC接口在入参不变的情况下,无论请求多少次,结果都是一样的,请求结果不会因为请求次数不同而改变。比如说设定要更新的金额为100元,那么无论更新多少次最终结果都是100元;但是如果传递的是个增量,比如增加20元,那么每多请求一次,就会多加20元。

幂等接口常见的设计方案:

  • 客户端按钮提交限制,每次提交一个请求时,按钮置为不可用。
  • 后台系统逻辑层处理,生成保存唯一ID,每次请求先校验这个ID是否已经存在,存在则表示重复操作,直接返回上一次操作结果。
  • token 校验机制,客户端请求前先申请 token,同一个 token 只处理一次,无 token 或者相同 token 不做处理。
  • 分布式锁,如引入 Redis 分布式锁(set+nx),防止其他请求重复操作。
  • 请求队列,引入 MQ 排队的方式让请求有序处理。

消费顺序问题

在多集群消息架构中,如果消费端要求接收到的消息是有序的,怎么解决消息顺序消费问题?比如一个消息 Producer 发送顺序是 1->2->3,那 Consumer 接收到的消息也应该是 1->2->3。思路:让同一个消息不分区,且单线程。

  • Producer:让生产端同步发送消息,消息1确定发送成功后再发送消息2,不能异步,保证消息顺序入队。
  • 服务端:Producer -> kafka服务器 -> Consumer 一对一关系,一对一服务,这肯定能保证消息是按照顺序消费的,那么问题来了:这里面任意一个环节出现问题,那肯定整个链路都阻塞了。另外这种单通道模型会出现性能瓶颈。
  • topic 不分区:意思就是让同一个 topic 主题都入一个队列,在分布式环境下如果同一个 topic 进入多个分区,那多个分区之间肯定无法保证消息顺序了。
  • Consumer:保证消费端是串行消费,禁止使用多线程。但是这样会牺牲掉系统的性能和稳定性。 
  • 从消息内容考虑:消息中存储一个id,消费者如果发现接收到的id不连续了,说明顺序出现了问题,但是消费者要存储和查询这个id,也是会耗费性能。

消息积压问题

有时候因为各种原因的bug导致的消息积压问题是很可怕的,如果积压的消息迟迟不能被释放,时间长了也会出现很大问题。造成消息积压的可能原因:

  • 服务监控不完善:可能是 Kafka 服务器没有添加消息积压告警,虽然正常网络抖动积压一部分消息很正常,但是应该根据服务器正常运行的情况添加一个合理的告警阈值,比如,消息积压超过 5000 时就发出一个告警信息,及时通知维护人员处理故障。
  • 业务低峰期问题没能暴露:在业务低峰的时候,消费端虽然有延迟但是能缓慢消化,影响可控;到了业务高峰期,流量快速上升,一时间消费端处理不过来,影响不可控,导致消息积压和延迟陡增。

出现消息积压后应该怎么处理?

  • 如果是线上突发问题,就要临时扩容,增加消费端的数量,与此同时,还可以降级一些非核心的业务,通过扩容和降级抗住流量。
  • 排查解决异常问题,比如通过查看监控,日志等手段分析是否消费端的业务逻辑代码出现了问题,进而优化消费端的业务处理逻辑。

如何避免或者消息积压?改进方案如下:

  • 制定B计划,增加队列补偿机制,当队列出现问题,可以有其他方式进行消息转发,如直连接口。
  • 降低发版频率,而且每次发版上线后及时观察线上数据。
  • 建立有效的报警机制,出现一定数量的消息积压,及时报警给相关负责人。
  • 最重要的,保证代码的健壮性,尤其是消费端。

kafka集群部署

Kafka 集群中有一个 Leader 负责消息的写入和消费,可以有多个 Follower 负责数据的备份。Follower 中有一个特殊的集合叫做 ISR(in-sync replicas),当 Leader 故障时,新选举出来的 Leader 会从 ISR 中选择,默认 Leader 的数据会异步地复制给Follower,这样在 Leader 发生掉电或者宕机时,Kafka 会从 Follower 中消费消息,减少消息丢失的可能。
由于默认消息是异步地从 Leader 复制到 Follower 的,所以一旦 Leader 宕机,那些还没有来得及复制到 Follower 的消息还是会丢失。为了解决这个问题,Kafka 为生产者提供一 个选项叫做“acks”,当这个选项被设置为“all”时,生产者发送的每一条消息除了发给 Leader 外还会发给所有的 ISR,并且必须得到 Leader 和所有 ISR 的确认后才被认为发送成功。这样,只有 Leader 和所有的 ISR 都挂了,消息才会丢失。

  •  如果需要确保消息一条都不能丢失,那么建议不要开启消息队列的同步刷盘,而是需要使用集群的方式来解决,可以配置当所有 ISR Follower 都接收到消息才返回成功。
  • 如果对消息的丢失有一定的容忍度,那么建议不部署集群,即使以集群方式部署,也建议配置只发送给一个 Follower 就可以返回成功了。
  • 业务系统一般对于消息的丢失有一定的容忍度,比如说因为用户下单后赠送积分的消息丢失了,可以后期针对个别丢失的消息给用户把积分补上就行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/615000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【CMake 入门与进阶(4)】 CMakeLists.txt 语法规则基础及部分常用指令-续(附使用代码)

由于篇幅问题本篇接着上文继续介绍 CMakeLists.txt 语法规则基础及常用指令。 aux_source_directory aux_source_directory 命令会查找目录中的所有源文件&#xff0c;其命令定义如下&#xff1a; aux_source_directory(<dir> <variable>)从指定的目录中查找所有…

开发者工具调试

Console控制台 F12打开控制台 选择其他tab面板时&#xff0c;ESC打开Console面板enter直接执行Console的代码&#xff0c;shiftEnter输入多行代码 Source面板 左键单机行号设置断点&#xff0c;或在代码中添加debugger;右键单机行号设置条件断点&#xff08;条件表达式为tr…

PowerShell install 一键部署mariadb10.11

mariadb MariaDB数据库管理系统是MySQL的一个分支&#xff0c;主要由开源社区在维护&#xff0c;采用GPL授权许可 MariaDB的目的是完全兼容MySQL&#xff0c;包括API和命令行&#xff0c;使之能轻松成为MySQL的代替品。在存储引擎方面&#xff0c;使用XtraDB来代替MySQL的Inno…

ChatGPT 国内镜像网站大全(含GPT-4.0版本)之什么年代还在写传统文章。

前言&#xff1a; 临近期末&#xff0c;大量水课的节课作业都是论文&#xff0c;一篇就是几千字&#xff0c;这对于还要复习专业课的我们可以说是压力巨大&#xff1a;心理健康论文&#xff0c;安全教育论文&#xff0c;大学语文论文&#xff0c;书法赏析论文&#xff0c;劳动…

小议C++函数签名与模板返回类型

题记&#xff1a;什么事情都要追问一个为什么&#xff0c;真正理解了为什么&#xff0c;才能活学活用。 代码1 下面的代码能编译通过吗&#xff1f; #include <stdio.h> #include <stdlib.h>class X { public:int *get() { return new int(); }double *get() { r…

MATLAB矩阵的分解函数与案例举例

系列文章目录 MATLAB当中线性方程组、不定方程组、奇异方程组、超定方程组的介绍 MATLAB语句实现方阵性质的验证 MATLAB绘图函数的相关介绍——海底测量、二维与三维图形绘制​​​​​​ MATLAB求函数极限的简单介绍 文章目录 前言 1. 奇异值分解&#xff08;SVD&#x…

C++类和对象-4

在上篇C类和对象的博客中&#xff0c;我们讲述了析构函数、拷贝构造函数、浅拷贝和深拷贝的内容&#xff0c;我们紧接上文&#xff0c;开始讲述接下来的文章。 目录 1.this指针 1.1引入 1.2内容 1.3特征 1.4用法 2.静态成员 2.1内容 2.2静态数据成员 2.3静态成员函数…

Vue.js 中的国际化支持是什么?如何进行国际化支持?

Vue.js 中的国际化支持是什么&#xff1f;如何进行国际化支持&#xff1f; Vue.js 是一款流行的前端框架&#xff0c;它提供了许多方便的工具和 API&#xff0c;用于构建交互式的用户界面。其中&#xff0c;国际化支持是 Vue.js 中重要的一部分&#xff0c;它可以让我们轻松地…

如何强制删除文件夹?这样操作就能搞定!

案例&#xff1a;我想删掉一些没有用的文件夹&#xff0c;释放一些电脑内存&#xff0c;但是我发现&#xff0c;有些文件夹并不能直接被删除。怎样才能删除这些文件夹&#xff1f;有没有小伙伴有解决的办法。 在使用电脑过程中&#xff0c;我们可能会遇到一些无法正常删除文件夹…

空间计算时代来临:苹果Vision Pro震撼上市,探索真实与虚拟的新边界

目录 前言Vision Pro的外观设计Vision Pro的交互方式Vision Pro 硬件配置Vision Pro 上市时间及销售价格Vision Pro与传统XR设备不同点总结其它资料下载 前言 苹果公司在2023年6月6日的WWDC23主题演讲中正式发布了传闻已久的头显产品——Vision Pro。WWDC&#xff0c;全称为“…

LLM Accelerator:使用参考文本无损加速大语言模型推理

编者按&#xff1a;如今&#xff0c;基础大模型正在诸多应用中发挥着日益重要的作用。大多数大语言模型的训练都是采取自回归的方式进行生成&#xff0c;虽然自回归模型生成的文本质量有所保证&#xff0c;但却导致了高昂的推理成本和长时间的延迟。由于大模型的参数量巨大、推…

被App包围 苹果Vision Pro将你推入空间“大屏”

2小时&#xff0c;这是2023年苹果开发者大会&#xff08;WWDC&#xff09;首日发布会的直播总时长&#xff0c;仅YouTube上&#xff0c;就有483.9万次观看。发布会开启时&#xff0c;北京时间是6月6日凌晨1点&#xff0c;众多科技博主串流直播了这场发布会。 苹果CEO蒂姆库克引…

3.2 继续完善的Vue.js响应式系统

前文提要&#xff1a; 3.0 响应式系统的设计与实现 3.1 一个稍微完善的Vue.js响应式系统 1、解决副作用函数的死循环问题 在解决了分支的切换的问题&#xff0c;此时还有一个代码死循环的问题&#xff0c;其这个死循环很容易触发&#xff0c;如下代码&#xff1a; const dat…

Netty Incubator Codec QUIC 0.0.41.Final 发布

导读Netty Incubator Codec QUIC 是一款基于 QUIC 协议的编解码器&#xff0c;为 Netty 提供了 QUIC 协议的支持。 近日&#xff0c;该团队发布了 0.0.41.Final 版本&#xff0c;这是一个错误修复版本&#xff0c;主要包括以下变化: 允许在派发前通过添加到读完队列来合并刷新…

嵌入式软件测试笔记3 | 嵌入式软件测试开发的多V模型

3 | 嵌入式软件测试开发的多V模型 1 简单的多V模型2 迭代与并行开发2.1 开发模型2.2 嵌入式开发过程的复杂性 3 多V模型中的测试活动3.1 测试活动和因素3.2 模型开发周期中与测试相关的元素分配3.3 原型开发周期中与测试相关的元素分配3.4 最终产品开发周期中与测试相关的元素分…

NineData x 华为云正式上线

6月5日&#xff0c;NineData 企业级 SQL 开发平台正式成为华为云“联营联运”商品。通过联营联运模式&#xff0c;双方将在产品、解决方案和生态等多个方面开展深度合作&#xff0c;共同提供高效、智能、安全的数据管理服务&#xff0c;帮助客户轻松构建一站式云端数据库管理平…

【随想录】一篇水文

前排许愿池: 我是一个没有梦想的咸鱼捏 自从知道成电优营了也不给offer之后 遂开始摆烂了(哈哈) 以及看了一下数据 好像前期存的资本够多的话 后面还是能混混的 however,已经快过去2/3了 前排致谢: 感谢好人一姐的助力 果然人是靠别人活着的 或者说伟人是站在巨人…

基于显扬科技3D视觉相机的芯片外观检测系统

Part.1 行业背景 电子元器件制造业是我国的支柱产业之一&#xff0c;具有产量大、技术投入高的特点&#xff0c;因此产品质量把控与生产成本优化是电子行业关注的发展重点。 芯片作为电子元器件中的核心组成部分&#xff0c;在现代社会被广泛应用&#xff0c;在芯片生产制造过…

Redis经典五大数据类型源码及底层实现

Redis经典五大数据类型源码及底层实现 一 面试题引入二 Redis数据类型的底层数据结构三 redis是字典数据库&#xff0c;KV键值对到底是什么&#xff1f;3.1 怎样实现键值对&#xff08;key-value&#xff09;数据库的&#xff1f;3.2 redisObject结构的作用3.3 RedisObject各字…

微信支付商户接入指引(企业)

目录 一、官方指引二、申请规则三、申请流程&#xff08;一&#xff09;提交资料&#xff08;二&#xff09;签署协议&#xff08;三&#xff09;绑定场景 一、官方指引 https://kf.qq.com/faq/210423UrIRB7210423by6fQn.html 二、申请规则 1、微信支付商家仅面向企业、个体…