记一次pulsar数据丢失排查

news2025/1/13 15:33:20

记一次pulsar数据丢失排查

背景

生产者往pulsar写消息时会有递增的序列号字段,消费端在消费时,会出现序列号断层。当下无法确定是生产端、mq、消费端哪个地方丢失了数据,所以先从生产端进行排查。
生产端的消息发送是通过sendAsync的异步方法Template.sendAsync该方法会返回一个CompletableFuture对象目前是没有对该对象进行检查,所以补上对应的代码逻辑

template.sendAsync(xxx, protocol)
                    .whenCompleteAsync((res, exp) -> {
                        if (exp != null) {
                            log.error("", exp);
                        }
                    });
        }

在运行一段时间后,该异常被触发,可以确定问题在生产者这里了

异常排查

内存限制

运行一段时间后提示Client memory buffer is full,观察源码得知ProducerImpl在发送消息给mq时,会先进行this.canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)判断消息是否可以发送。
这里判断的是使用的本地内存大小

if (!this.client.getMemoryLimitController().tryReserveMemory((long)payloadSize)) {
                    this.semaphore.ifPresent(Semaphore::release);
                    callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
                    return false;
                }

ProducerImpl内部会有一个流量控制器,对当前消息的大小进行累加,判断是否超过限制,默认大小为64M。于是乎这边对限制的大小进行调大,在我们创建PulsarClinet时可以指定如下参数设置使用本地内存大小的最大值,这里我将大小调整
memoryLimit(512, SizeUnit.MEGA_BYTES)
重启后不在提示该异常

超时响应

虽然解决了内存问题,新的异常又开始出现
can not send message to the topic persistent://public within given timeout : createdAt 5.207 seconds ago, firstSentAt 5.206 seconds ago, lastSentAt 5.206 seconds ago, retryCount 1
在讲解这个异常怎么检测前,先说说ProducerImpl的发送机制,发送的数据会被加入到ProducerImplOpSendMsgQueue对象中
在这里插入图片描述
在pulsar中间件处理完数据后会告诉回发送端,这时ClientCnx netty对象就会回调handleSendReceipt方法将OpSendMsgQueue的数据进行移出,就是告诉发送端这条消息存储成功的一个响应-应答模式
在这里插入图片描述
那如果消息一直没有ack,这时发送端就会有个内部定时器进行定时检测,其在ProducerImpl.run方法中,默认执行时间为30s,会判断pendingMessage中的第一条消息是否超过了30s的时间,如果超过了,就会进行failPendingMessages方法调用

                        long diff = this.conf.getSendTimeoutMs() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstMsg.createdAt);
                        if (diff <= 0L) {
                            log.info("[{}] [{}] Message send timed out. Failing {} messages", new Object[]{this.topic, this.producerName, this.pendingMessages.messagesCount()});
                            PulsarClientException te = new PulsarClientException.TimeoutException(String.format("The producer %s can not send message to the topic %s within given timeout", this.producerName, this.topic), firstMsg.sequenceId);
                            this.failPendingMessages(this.cnx(), te);
                            timeToWaitMs = this.conf.getSendTimeoutMs();
                        } else {
                            timeToWaitMs = diff;
                        }

failPendingMessages方法中,会调用每个消息的sendComplete方法进行异常回调,也就是我们最初加在sendAsync上的回调信息了。
自此调用链路就比较清晰了

  1. 生产端发送消息给pulsar,pulsar接受成功后会告诉生产端
  2. 生产端每隔30s会对pendingMessages队列进行检查
  3. 如果第一条消息发送的时间超过30秒,就代表pulsar 30秒还未对该消息进行ack
  4. 生产端提示超时异常

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1470994.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于插件实现RabbitMQ“延时队列“

1.官网下载 在添加链接描述下载rabbitmq_delayed_message_exchange 插件,本文以v3.10.0为例 1.1.上传安装包 scp /Users/hong/资料/rabbitmq_delayed_message_exchange-3.10.0.ez root10.211.55.4:/usr/local/software1.2.将文件移入RabbitMQ的安装目录下的plugins目录 m…

EasyRecovery2024个人免费版本电脑手机数据恢复软件下载

EasyRecovery是一款功能强大的数据恢复软件&#xff0c;能够帮助用户恢复丢失、删除、格式化或损坏的数据。无论是由于误操作、病毒攻击、硬盘故障还是其他原因导致的数据丢失&#xff0c;EasyRecovery都能提供有效的解决方案。 该软件支持从各种存储介质恢复数据&#xff0c;…

linux-并发通信

一.linux-tcp通信框架 1.基础框架 1.1 tcp 服务器框架 1.套接字 #include <sys/socket.h> int socket(int domain, int type, int protocol);
 返回的文件描述符可以指向当前的socket&#xff0c;后续通过对文件描述符的访问就可以配置这个socket 成功时返回文件…

云原生应用测试:挑战与方法

&#x1f60f;作者简介&#xff1a;博主是一位测试管理者&#xff0c;同时也是一名对外企业兼职讲师。 &#x1f4e1;主页地址&#xff1a;【Austin_zhai】 &#x1f646;目的与景愿&#xff1a;旨在于能帮助更多的测试行业人员提升软硬技能&#xff0c;分享行业相关最新信息。…

Linux进程 ----- 信号处理

前言 从信号产生到信号保存&#xff0c;中间经历了很多&#xff0c;当操作系统准备对信号进行处理时&#xff0c;还需要判断时机是否 “合适”&#xff0c;在绝大多数情况下&#xff0c;只有在 “合适” 的时机才能处理信号&#xff0c;即调用信号的执行动作。 一、信号的处理…

万界星空科技MES系统,实现数字化智能工厂

万界星空科技帮助制造型企业解决生产过程中遇到的生产过程不透明&#xff0c;防错成本高&#xff0c;追溯困难&#xff0c;品质不可控&#xff0c;人工效率低下&#xff0c;库存积压&#xff0c;交期延误等问题&#xff0c;从而达到“降本增效”的目标。打通各个信息孤岛&#…

Python性能测试框架Locust实战教程

01、认识Locust Locust是一个比较容易上手的分布式用户负载测试工具。它旨在对网站&#xff08;或其他系统&#xff09;进行负载测试&#xff0c;并确定系统可以处理多少个并发用户&#xff0c;Locust 在英文中是 蝗虫 的意思&#xff1a;作者的想法是在测试期间&#xff0c;放…

推荐一个 Obsidian 的 ChatGPT 插件

源码地址&#xff1a;https://github.com/nhaouari/obsidian-textgenerator-plugin Text Generator 是目前我使用过的最好的 Obsidian 中的 ChatGPT 功能插件。它旨在智能生成内容&#xff0c;以便轻松记笔记。它不仅可以在 Obsidian 中直接使用 ChatGPT&#xff0c;还提供了优…

Vue+SpringBoot打造衣物搭配系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、研究内容2.1 衣物档案模块2.2 衣物搭配模块2.3 衣物收藏模块 三、系统设计3.1 用例设计3.2 E-R图设计3.3 数据库设计3.3.1 衣物档案表3.3.2 衣物搭配表3.3.3 衣物收藏表 四、系统实现4.1 登录页4.2 衣物档案模块4.3 衣物搭配模块4.4…

力扣用例题:2的幂

此题的解题方法在于根据用例调整代码 bool isPowerOfTwo(int n) {if(n1){return true;}if(n<0){return false;}while(n>2){if(n%21){return false;}nn/2; }if(n1){return false;}return true;}

RDMA内核态函数ib_post_send()源码分析

最近调用linux内核下RDMA的Verb API ib_post_send()出现了问题&#xff0c;因此从源码分析一下这个函数的调用过程。 我使用的内核版本为5.15.0-94 这是函数ib_post_send的头文件定义&#xff0c;这个函数的意义是向发送队列提交发送请求&#xff0c;他会调用qp对应设备的post_…

C# EF Core迁移数据库

现象&#xff1a; 在CodeFirst时&#xff0c;先写字段与表&#xff0c;创建数据库后&#xff0c;再添加内容 但字段与表会变更&#xff0c;比如改名删除增加等 需求&#xff1a; 当表字段变更时&#xff0c;同时变更数据库&#xff0c;执行数据库迁移 核心命令 Add-Migrat…

一种基于道路分类特性的超快速车道检测算法

摘要&#xff1a; 本文介绍了一种新颖、简单但有效的车道检测公式。 车道检测是自动驾驶和高级驾驶员辅助系统 (ADAS) 的基本组成部分&#xff0c;在实际高阶驾驶辅助应用中&#xff0c;考虑车道保持、转向、限速等相关的控制问题&#xff0c;这种方式通常是通过受限的车辆计算…

java——多线程基础

目录 线程的概述多线程的创建方式一&#xff1a;继承Thread类方式二&#xff1a;实现Runnable接口方式三&#xff1a;利用Callable接口、FutureTask类来实现。Thread常用的方法 线程安全问题线程安全问题概述线程安全问题案例取钱案例描述模拟代码如下&#xff1a;执行结果 线程…

2024-02-25 Unity 编辑器开发之编辑器拓展7 —— Inspector 窗口拓展

文章目录 1 SerializedObject 和 SerializedProperty2 自定义显示步骤3 数组、List 自定义显示3.1 基础方式3.2 自定义方式 4 自定义属性自定义显示4.1 基础方式4.2 自定义方式 5 字典自定义显示5.1 SerizlizeField5.2 ISerializationCallbackReceiver5.3 代码示例 1 Serialize…

【Activiti7系列】Activi7简介和基于Spring Boot整合Activiti7(流程设计器)

本文将介绍Activiti7基础概念及基于Spring Boot整合Activiti7(流程设计器)的具体步骤。 作者&#xff1a;后端小肥肠 1. 前言 在企业级应用中&#xff0c;业务流程的管理和执行是至关重要的一环。Activiti7是一个强大的开源工作流引擎&#xff0c;它提供了灵活的流程定义、任务…

linux---安使用nginx

目录 一、编译安装Nginx 1、关闭防火墙&#xff0c;将安装nginx所需要软件包传到/opt目录下 ​编辑2、安装依赖包 3、创建运行用户、组 4、编译安装nginx 5、创建软链接后直接nginx启动 ​编辑 6、创建nginx自启动文件 ​编辑6.1 重新加载配置、设置开机自启并开启服务…

Kafka之Producer源码

Producer源码解读 在 Kafka 中, 我们把产生消息的一方称为 Producer 即 生产者, 它是 Kafka 的核心组件之一, 也是消息的来源所在。它的主要功能是将客户端的请求打包封装发送到 kafka 集群的某个 Topic 的某个分区上。那么这些生产者产生的消息是怎么传到 Kafka 服务端的呢&a…

unity发布webGL压缩方式的gzip,使用nginx作为web服务器时的配置文件

unity发布webGL压缩方式的gzip&#xff0c;使用nginx作为web服务器时的配置文件 Unity版本是&#xff1a;2021.3 nginx的版本是&#xff1a;nginx-1.25.4 Unity发布webgl时的测试 设置压缩方式是gzip nginx配置文件 worker_processes 1;events {worker_connections 102…

SpringBoot实现热插拔AOP

热插拔AOP执行核心逻辑 Advice&#xff1a;“通知”&#xff0c;表示 Aspect 在特定的 Join point 采取的操作。包括 “around”, “before” and “after 等 Advice&#xff0c;大体上分为了三类&#xff1a;BeforeAdvice、MethodInterceptor、AfterAdviceAdvisor&#xff1a…