Apache pulsar 技术系列-- 消息重推的几种方式

news2025/1/22 12:57:35

导语

Apache Pulsar 是一个多租户、高性能的服务间消息传输解决方案,支持多租户、低延时、读写分离、跨地域复制(GEO replication)、快速扩容、灵活容错等特性。在很多场景下,用户需要通过 MQ 实现消息的重新推送能力,比如超时重推、处理异常时重推等,本文介绍 Apache Pulsar 提供的几种消息重推方案。

在 MQ 实际的使用中,用户消费数据时,可能会遇到消息处理异常或者需要推迟处理的场景,这里就涉及到消息的重推逻辑,Pulsar 自己提供了消息重推的能力。本文主要介绍 Pulsar 的消息重推机制。

消息获取(拉取/推送)机制

Pulsar 的消费采用了推、拉结合的消息获取机制,Consumer 获取消息之前会首先通知 Broker(FLOW 请求),Broker 会根据配置的 ReceiveQueue 大小以及 Consumer 当前可以接收的消息数量来推送消息给 Consumer。

详细的交互流程如下图所示:

图片

  1. Consumer 在创建之后,会以 MaxReceiveQueue 的大小作为 Permit 值,这个值就是 Consumer 可以缓存的的最大消息条数。

  2. 然后,Consumer 向 Broker 发起 FLOW 请求,携带 Permit 信息(Consumer Permit 减少到 0),Broker 接收之后会记录这个 Permit 作为 Consumer 的 AvailablePermit,AvailablePermit 决定 Broker 可以向 Consumer 发送数据的数量(实际是在读取数据时判断)。

  3. 如果 AvailablePermit > 0, Broker 开始读取数据(假设有 N 条),然后推送给 Consumer,推送之后,AvailablePermit 自减 N。

  4. Consumer 接收到消息之后,并不会直接返回给用户,而是放在 ReceiveQueue 中,当用户调用 Receive() 方法来获取消息时,Consumer 将 Permit + 1。

  5. 当 Permit > MaxReceiveQueueSize / 2,Consumer 会再次发起 Flow 请求,并且携带当前的 Permit 值。

上述流程,就是 Consumer 和 Broker 的消息传递过程。

在默认的情况下,数据推送给 Consumer 之后,就完全交给用户处理,数据不会重复推送。这种方式满足不了需要重推的场景,下面介绍目前 Pulsar 的几种重推机制。

SDK 统一的重推

一个比较直观的做法是超过一定时间,如果消息没有 Ack 就重新推送。

目前 Pulsar 提供了通过超时时间来控制数据重推的能力,Consumer 可以配置 AckTimeout(默认关闭),在设置了 AckTimeout 之后,Client 会构建一个 UnAckedMessageTracker ,用户 Receive() 的所有的消息都会被 UnAckedMessageTracker 跟踪。用户 Ack 消息时,会从 UnAckedMessageTracker 删除,对于没有 Ack 的消息,UnAckedMessageTracker 会有定时任务来检查,如果已经超过了 AckTimeout 时间,则会触发重推。

重推是通过 RedeliverUnackMessage 来实现的,UnAckedMessageTracker 会主动发起 Redeliver 的请求,Broker 会根据请求的 MessageId 信息重新推送。

AckTimeout 在 Consumer 初始化时设置:

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                  .ackTimeout(10, TimeUnit.SECOND)

用户决策的重推 – NegativeAck

通过 AckTimeout 实现的重推,是 SDK 内部统一实现的,用户不能控制重推的行为,如果用户希望根据自己的使用场景,决定哪些消息需要重推,Pulsar 提供了 NegativeAck 的能力。

NegativeAck 和 AckTimeout 方式类似,有一个 NegativeAcksTracker 来管理消息的重推,NegativeAcksTracker 只会跟踪用户主动调用 NegativeAcknowledge() 方法的 MessageID,重推的逻辑也是通过 RedeliverUnackMessage 实现。

NegativeAck 可以设置 Redelivery 的 Delay 时间。

 Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)                .negativeAckRedeliveryDelay(1001, TimeUnit.MILLISECONDS)

使用的时候,需要明确调用。

// call the API to send negative acknowledgmentconsumer.negativeAcknowledge(message);

用户决策的重推 – RLQ

除了 NegativeAck 的方式,用户还可以通过重试队列( RLQ )来实现主动的消息重推,RLQ 一般会使用在用户暂时不能处理某些消息,并且希望之后再处理的场景。

Pulsar 提供了 ReconsumeLater() 方法来实现重试队列,和 Negative 不同的是,RLQ 会创建一个新的 Topic,Topic 的格式是 TopicName-SubscriptionName_RLQ , 每次 ReconsumeLater() 时,都会产生一个新的消息写入到 RLQ Topic 中,并且会对之前的消息 Ack。

设置了 RLQ 的 Consumer,SDK 内部默认会启动 RLQ 的订阅,所以 RLQ 的消息也会被 Consumer 消费到。

RLQ 是通过 DeadLetterPolicy 来配置的(DLQ 下文会解释)。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)     .topic("my-topic")     .subscriptionName("my-subscription")    .subscriptionType(SubscriptionType.Shared)     .enableRetry(true)    .deadLetterPolicy(DeadLetterPolicy.builder()     .maxRedeliverCount(maxRedeliveryCount)    .build())     .subscribe();

RLQ Topic 中的消息属性中会添加一下信息:

Special propertyDescription
REAL_TOPIC原始 Topic 名称
ORIGIN_MESSAGE_ID原始 MessageId
RECONSUMETIMES重复消费的次数
DELAY_TIME投递的延迟时间

RLQ 也需要主动调用: consumer.reconsumeLater(msg, 3, TimeUnit.SECONDS)。

为重推次数加上限制–DLQ

对于数据持续处理失败,一直重试并不是一个很好的策略,此时死信队列(DLQ)就是一个比较好的选择,DLQ 允许用户将持续处理失败的数据写入到一个独立的 Dead Letter Topic 中,DLQ 的数据需要单独的订阅来消费。

DLQ Topic 的格式为 TopicName-SubscriptionName_DLQ。DLQ 需要为重试设置一个上限,当重试次数超过上限之后,就会被写入到 DLQ Topic 中。

Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)       .topic("my-topic")      .subscriptionName("my-subscription")      .subscriptionType(SubscriptionType.Shared)         .deadLetterPolicy(DeadLetterPolicy.builder()             .maxRedeliverCount(maxRedeliveryCount)             .build())         .subscribe();

几种重推和 DLQ 的关系

如果配置了 DLQ,那么使用 AckTimeout、NegativeAck 或者 ReconsumeLater 引起的数据重推都会触发 DLQ,也就是说重试的次数达到上限之后,都会被写入到 DLQ topic 里。

重试次数的统计有所区别:

AckTimeout 和 NegativeAck 都是通过 Redelivery 机制来计数的,SDK 发起 Redelivery 请求之后,Broker 侧的 RedeliveryTracker 会记录重推的次数,并且在推送给 Consumer 的 Message 中会包含 RedeliveryCount 的字段。

对于 RLQ,则是从 RECONSUMETIMES 属性中获取重复消费的次数,这个属性在 Client 生成,并且也是在 Client 计数。

总的来说,Apache Pulsar 提供了多种消息重推的方式,用户可以结合自己的场景,灵活使用,满足自己的业务需求。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/792686.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

若依框架@DataScop不生效问题

主要原因没有在方法传参中method直接变成对象&#xff0c;而还是用String来进行接受&#xff0c;导致切面没有获取到参数 若依框架DataScop不生效问题

neo4j教程-Cypher操作

Cypher基础操作 Cypher是图形存储数据库Neo4j的查询语言&#xff0c;Cypher是通过模式匹配Neo4j数据库中的节点和关系&#xff0c;从而对数据库Neo4j中的节点和关系进行一系列的相关操作。 下面&#xff0c;通过一张表来介绍一下常用的Neo4j操作命令及相关说明&#xff0c;具…

产业大数据应用:洞察企业全维数据,提升企业监、管、服水平

​在数字经济时代&#xff0c;数据已经成为重要的生产要素&#xff0c;数字化改革风生水起&#xff0c;在新一代科技革命、产业革命的背景下&#xff0c;产业大数据服务应运而生&#xff0c;为区域产业发展主导部门提供了企业洞察、监测、评估工具。能够助力区域全面了解企业经…

打造交流利器:PHP留言板功能详解与实践

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;2023年6月csdn上海赛道top4。多年电商行业从业经验&#xff0c;对系统架构&#xff0c;数据分析处理等大规模应用场景有丰富经验。 &#x1f3c6;本文已收录于PHP专栏&#xff1a;PHP…

聚类算法概述

聚类算法简介 1.1 聚类算法在现实中的应用 用户画像,广告推荐,Data Segmentation,搜索引擎的流量推荐,恶意流量识别 基于位置信息的商业推送,新闻聚类,筛选排序 图像分割,降维,识别;离群点检测;信用卡异常消费;发掘相同功能的基因片段 1.2 聚类算法的概念 聚类…

Spring 源码解读

1、Spring 的结构组成 1.1、核心类介绍 Spring 中有两个最核心的类 1 DefaultListableBeanFactory XmlBeanFactory 继承自 DefaultListableBeanFactory&#xff0c;而DefaultListableBeanFactory 是整个 bean加载的核心部分&#xff0c;是 Spring 注册及加载 bean 的默认实现…

DAY13_会话技术-CookieSession综合案例

目录 1 会话跟踪技术的概述2 Cookie2.1 Cookie的基本使用2.1.1 概念2.1.2 Cookie的工作流程2.1.3 Cookie的基本使用2.1.3.1 发送Cookie2.1.3.2 获取Cookie 2.2 Cookie的原理分析2.3 Cookie的使用细节2.3.1 Cookie的存活时间2.3.2 Cookie存储中文 3 Session3.1 Session的基本使用…

/var/lock/subsys目录的作用

总的来说&#xff0c;系统关闭的过程&#xff08;发出关闭信号&#xff0c;调用服务自身的进程&#xff09;中会检查/var/lock/subsys下的文件&#xff0c;逐一关闭每个服务&#xff0c;如果某一运行的服务在/var/lock/subsys下没有相应的选项。在系统关闭的时候&#xff0c;会…

【SpringBoot】简介及传统的 Spring 框架:对比和分析

哈喽&#xff0c;哈喽&#xff0c;大家好~ 我是你们的老朋友&#xff1a;保护小周ღ 今天给大家带来的是 SpringBoot 的简介&#xff0c;SpringBoot 项目的创建&#xff0c;相较于 Spring 框架的优点&#xff1a; 1. 快速的集成框架 2.内置运行容器, 快速的部署项目 3. 摒弃…

数据接口有哪些?(数据接口有哪几种)

数据接口是指不同应用程序或系统之间交换数据的通信界面。在现代信息化社会中&#xff0c;数据接口扮演着极为重要的角色&#xff0c;它们使得不同平台之间能够相互连接和交流&#xff0c;从而实现数据共享和应用集成。 数据接口的种类繁多&#xff0c;常见的有以下几种&#…

OpenCvSharp (C# OpenCV) 二维码畸变矫正--基于透视变换(附源码)

导读 本文主要介绍如何使用OpenCvSharp中的透视变换来实现二维码的畸变矫正。 实现步骤 讲解实现步骤之前先看下效果(左边是原图,右边是矫正后的效果): 【1】需求分析 由于相机拍摄角度,导致二维码形状不是矩形,存在明显的畸变。我们希望将其矫正为正常的矩形或者正方形图…

mars3d绘制区域范围(面+边框)

1、图例&#xff08;绿色面区域白色边框&#xff09; 2、代码 1&#xff09;、绘制区域ts文件 import { mapLayerCollection } from /hooks/cesium-map-init /*** 安全防護目標* param map*/ export const addSafetyProtection async (map) > {const coverDatas await m…

游戏服务器的帧率控制

固定Tick时间策略 固定Tick时间&#xff1a;顾名思义就是指程序每次心跳的时间都是等长的、固定的。如图中的“图A”&#xff0c;Tick1和Tick2的时间是相等的&#xff0c;如果实际执行的比上次执行时间长&#xff08;Run2 > Run1&#xff09;&#xff0c;则Sleep2 < Slee…

EventLog Analyzer:保障网络安全的强大日志审计利器

日志审计是现代网络安全中不可或缺的一环。随着信息技术的迅速发展&#xff0c;企业和组织面临着越来越多的网络安全威胁&#xff0c;如数据泄露、网络攻击和内部滥用等。而为了确保网络安全&#xff0c;日志审计成为了必要的措施。在众多日志审计工具中&#xff0c;EventLog A…

neo4j教程-安装部署

neo4j教程-安装部署 Neo4j的关键概念和特点 •Neo4j是一个开源的NoSQL图形存储数据库&#xff0c;可为应用程序提供支持ACID的后端。Neo4j的开发始于2003年&#xff0c;自2007年转变为开源图形数据库模型。程序员使用的是路由器和关系的灵活网络结构&#xff0c;而不是静态表…

个人博客系统[SpringBoot+SpringMVC+MyBais]

文章目录 &#x1f387; 前言1.项目目录介绍2.项目前准备2.1 使用到的第三方库2.1 配置文件&#xff08;application.properties&#xff09;2.2 数据库介绍 3.common目录工具类介绍3.1 AjaxResult类3.2 AppVariable类3.3 CaptchaUtils类3.4 PasswordUtils类3.5 UserSessionUti…

VSCode打开终端的方法

VScode打开终端的方法 第一种&#xff1a;快捷键 Ctrl ~ 第二种&#xff1a;选中某个文件&#xff0c;右键&#xff0c;点击“在集成终端中打开” 第三种&#xff1a;在VSCode的页面上方的选项&#xff0c;点击“终端”&#xff0c;再点击“新建终端” 打开后&#xff0c;…

事务,不只ACID

大家好&#xff0c;我是 方圆。一提到事务&#xff0c;最先让我想到的就是ACID和倒背如流的隔离级别。它确实和这些相关&#xff0c;但是在我读了《数据密集型应用系统设计》之后&#xff0c;我想把事务这个主题讲的不那么“传统”。本文的部分内容可能读起来会有些老生常谈的感…

【Vue】在el-table的el-table-column中,如何控制单行、单列、以及根据内容单独设置样式。例如:修改文字颜色、背景颜色

用cell-style表属性来实现。在官网中是这样表述这个属性的。 在el-table中用v-bind绑定此属性。&#xff08;v-bind的简写是&#xff1a;&#xff09; <el-table:data"options":cell-style"cell"><el-table-column prop"id" label"…

医疗小程序:提升服务质量与效率的智能平台

在医疗行业&#xff0c;公司小程序成为提高服务质量、优化管理流程的重要工具。通过医疗小程序&#xff0c;可以方便医疗机构进行信息传播、企业展示等作用&#xff0c;医疗机构也可以医疗小程序提供更便捷的预约服务&#xff0c;优化患者体验。 医疗小程序的好处 提升服务质量…