kafka消费堆积问题探索

news2025/1/13 14:20:13

背景

我们的商城项目用PHP写的,原本写日志方案用的是PHP的方案,但是,这个方案导致资源消耗一直降不下来,使用了20个CPU。后面考虑使用通过kafka的方案写日志,商城中把产生的日志丢到kafka中,在以go写的项目中消费kafka中的日志,并打印到控制台,最后,统一使用阿里sls抓取日志。我们kafka的分区有12个,go程序部署在k8s集群中,开启了弹性扩缩容,最多开启了8个pod进行消费,每秒产生的日志数量高峰在1500条左右,在这种情况下,依然产生了消息的堆积。

消费中执行的逻辑只有对象的映射和日志写控制台,所以,这种情况下产生了消息堆积,令我倍感困惑。

探索之路

第一步,确认一下每一步的执行时间。

func (s KafkaLogService) ReaderCreateLog(ctx context.Context, msg *customerkafka.CustomKafkaMsg) error {
	now := time.Now().UnixNano()
	var logEntry LogEntry
	data, ok := msg.Data.(string)

	if !ok {
		global.GIN_LOG.Error(ctx, "消息数据类型错误", "data", msg.Data)
		return fmt.Errorf("消息数据类型错误: %v", msg.Data)
	}

	// 解析 JSON 数据
	if err := json.Unmarshal([]byte(data), &logEntry); err != nil {
		global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", data)
		return fmt.Errorf("解析消息失败: %w", err)
	}
	now2 := time.Now().UnixNano()
	fmt.Printf("*******************分隔符*******************Unmarshal logEntry 耗时:%d 纳秒\n", now2-now)
	var logMessage LogMessage
	if err := json.Unmarshal([]byte(logEntry.Message), &logMessage); err != nil {
		global.GIN_LOG.Error(ctx, "解析 JSON 数据失败:", "error", err, "message", logEntry.Message)
		return fmt.Errorf("解析消息失败: %w", err)
	}
	now3 := time.Now().UnixNano()
	fmt.Printf("*******************分隔符*******************Unmarshal LogMessage 耗时:%d 纳秒\n", now3-now2)
	// 日志等级判断
	switch logEntry.Level {
	case "ERROR":
		global.GIN_LOG.Error(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))
	case "WARN":
		global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))
	case "INFO":
		global.GIN_LOG.Info(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))
	default:
		global.GIN_LOG.Warn(ctx, "", zap.Any("data", logMessage), zap.String("project", logMessage.Project))
	}
	now4 := time.Now().UnixNano()
	fmt.Printf("*******************分隔符*******************log 耗时:%d 纳秒\n", now4-now3)
	return nil
}

 

 json的Unmarshal的耗时,倒是符合我的认知,在预期之中。

但是,打印日志尽然需要耗时1.5毫秒,这个有点超出我的意料之外。这个时间似乎有点夸张啊。

但是,即便如此,一个消费者每秒也可以消费670条左右的消息,在起了8个实例的情况下,也不应该造成kafka消息的阻塞。

继续我们的探索之路

下面这一段是我对于kakfa消费者的封装。大概的逻辑就是每一个ActionType起一个协程进行消费。在这篇《基于kafka-go写的生产者和消费者》文章中写过这个封装背后的设计逻辑,有兴趣的可以移步过去一探究竟。

// Start 方法启动消费者并开始读取消息,根据actionType调用不同的处理函数
func (c *ConsumerClient) Start(ctx context.Context, handlers map[string]ActionHandler) error {
	for {
		select {
		case <-ctx.Done():
			return nil // 上下文取消,直接返回
		default:
			msg, err := c.reader.ReadMessage(ctx)
			if err != nil {
				c.logger.Error(ctx, "Failed to read message from Kafka", "error", err)
				continue
			}
			c.logger.Info(ctx, fmt.Sprintf("Message on topic: %s value: %s partion:%d offset:%d", msg.Topic, string(msg.Value), msg.Partition, msg.Offset))
			var kafkaMsg CustomKafkaMsg
			if err := json.Unmarshal(msg.Value, &kafkaMsg); err != nil {
				c.logger.Error(ctx, "Failed to unmarshal Kafka message", "error", err)
				continue
			}
			channel := make(chan *CustomKafkaMsg)
			// 使用 sync.Map 来管理 worker
			worker, loaded := c.workerMap.LoadOrStore(kafkaMsg.ActionType, channel)
			if !loaded {
				c.wg.Add(1) // 增加 WaitGroup 计数
				if ch, ok := worker.(chan *CustomKafkaMsg); ok {
					go c.startWorker(ctx, kafkaMsg.ActionType, handlers, ch)
				}
			}
			// 发送消息到对应的通道,避免阻塞其他消息消费
			// 只有在 handlers 中存在对应的 actionType 时才发送消息到对应的通道
			if _, ok := handlers[kafkaMsg.ActionType]; ok {
				if ch, ok := worker.(chan *CustomKafkaMsg); ok {
					ch <- &kafkaMsg
				}
			}
		}
	}
}

 由于我的这个写法,让我产生了一点担忧,虽然,我想的是每个ActionType只起一个协程进行消费,难道,实际情况并不是如我预期一样运行,而是,一条kafka消息就起了一个协程进行消费,如果是这种情况的话,那么,会导致大量的垃圾回收,程序的性能就会下降,那么,消息阻塞的问题也就可以解释了。

为了,验证我的这一想法,基于pprof工具看一下实际情况。

实际验证,排除我的担忧,符合我的预期,不是有一条kafka消息就开一个协程进行消费,而是,一个ActionType就只有一个协程进行消费。 

模拟生产环境测试

上述的探索,依然不能够完美解释文章开头提到的现象,起了8个消费者,依然导致消息堆积的现象。为了进一步探究其背后的原因,我模拟生产环境的状态,每秒钟往kafka中丢了1000条消息,再观察,我发现,在这种情况下,有时json.Unmarshal也有比较长的耗时,会出现1.5毫秒的耗时,另外,而写日志需要5毫秒左右,如此,每秒只能消费140条消息,消息堆积的现象也就能够解释了。

结论 

消息堆积的主要原因是日志打印操作耗时较长,最差时每秒只能消费140条消息。此外,有时JSON解析的时间也较长,这也是一个需要关注的问题。

接下来的目标是找出JSON解析耗时较长和日志打印慢的具体原因,并进行优化。通过解决这些问题,我们有望提高日志处理的效率,从而解决消息堆积的问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2275998.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用conda出现requests.exceptions.HTTPError 解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

智能化文档开发(DI)

这个文档涉及到多模态&#xff08;文本、发票、订单、语音&#xff09; 对于普通的文本&#xff0c;我们希望对某些实体的某些属性挖空生成文档模版&#xff0c;并根据预设字段填空最后生成正式文件对于发票、订单&#xff0c;我们想提取它的字段信息&#xff0c;写入DB对于一些…

RAID储存技术

RAID独立磁盘冗余技术是一种把2个或者多个HDD或SSD合并为一个协调的存储单元或列阵&#xff0c;从而预防数据丢失的技术&#xff0c;其最早由加州大学伯克利分校的计算机科学家David Patterson、Garth Gibson和Randy Katz在1987年提出。他们的研究论文“关于RAID的论证”提出了…

Java Web开发基础:HTML的深度解析与应用

文章目录 前言&#x1f30d;一.B/S 软件开发架构简述&#x1f30d;二.HTML 介绍❄️2.1 官方文档❄️2.2 网页的组成❄️2.3 HTML 是什么❄️2.4html基本结构 &#x1f30d;三.HTML标签1.html 的标签/元素-说明2. html 标签注意事项和细节3.font 字体标签4.标题标签5.超链接标签…

使用 WPF 和 C# 绘制图形

绘图困难 此示例展示了如何在 WPF 和 C# 中绘制图形。绘制图形总是很棘手&#xff0c;因为您通常需要在至少两个不同的坐标系中工作。首先&#xff0c;您要为图形使用世界坐标。例如&#xff0c;您可能希望 X 值的范围为 2000 年至 2020 年&#xff0c;Y 值的范围为 10,000 美元…

年度技术突破奖|中兴微电子引领汽车芯片新变革

随着以中央计算区域控制为代表的新一代整车电子架构逐步成为行业主流&#xff0c;车企在电动化与智能化之后&#xff0c;正迎来以架构创新为核心的新一轮技术竞争。中央计算SoC&#xff0c;作为支撑智驾和智舱高算力需求的核心组件&#xff0c;已成为汽车电子市场的重要新增量。…

【JVM-2.3】深入解析JVisualVM:Java性能监控与调优利器

在Java应用的开发和运维过程中&#xff0c;性能监控与调优是不可或缺的环节。无论是排查内存泄漏、分析CPU瓶颈&#xff0c;还是优化线程使用&#xff0c;开发者都需要借助一些强大的工具来辅助诊断。JVisualVM 正是这样一款由Oracle提供的免费工具&#xff0c;它集成了多种性能…

filestream安装使用全套+filebeat的模块用法

1 filestream介绍 官方宣布&#xff1a;输入类型为log在filebeat7.16版本已经弃用了 Filestream 是 Filebeat 中的一种 输入类型&#xff08;Input&#xff09;&#xff0c;用于处理日志文件的读取。它是为了取代 Filebeat 中传统的 log 输入&#xff08;Input&#xff09;设…

超燃预告!Origin百图绘制系列即将登场

Hello&#xff0c;大家好 这里是练习时长两年半的菜狗~ 持续更新各种竞赛&#xff0c;科研&#xff0c;保研&#xff0c;学习干货ing 回想刚开始打比赛那会&#xff0c;啥都不懂&#xff0c;就从用 Excel 画图起步&#xff0c;绘制的图形实在太难看。后来运用 Matlab&#xf…

八、系统托盘与配置面板

没有人会把你变得越来越好&#xff0c;时间和经历只是陪衬。 支撑你变得越来越好的&#xff0c;是你自己坚强的意志、修养、品行、以及不断的反思和经验。 人生最好的贵人&#xff0c;就是努力向上的自己。 一、系统托盘 1、资源文件夹 新建资源文件夹&#xff0c;我们需要把…

uniapp 之 uni-forms校验提示【提交的字段[‘xxx‘]在数据库中并不存在】解决方案

目录 场景问题代码结果问题剖析解决方案 场景 uni-forms官方组件地址 使用uniapp官方提供的组件&#xff0c;某个表单需求&#xff0c;单位性质字段如果是高校&#xff0c;那么工作单位则是高校的下拉选择格式&#xff0c;单位性质如果是其他的类型&#xff0c;工作单位则是手动…

Java面试核心知识4

公平锁与非公平锁 公平锁&#xff08;Fair&#xff09; 加锁前检查是否有排队等待的线程&#xff0c;优先排队等待的线程&#xff0c;先来先得 非公平锁&#xff08;Nonfair&#xff09; 加锁时不考虑排队等待问题&#xff0c;直接尝试获取锁&#xff0c;获取不到自动到队尾…

基于 SSH 的任务调度系统

文末附有完整项目代码 在当今科技飞速发展的时代&#xff0c;任务调度系统的重要性日益凸显。本文将详细介绍一个基于 SSH&#xff08;SpringStruts2Hibernate&#xff09;的任务调度系统的设计与实现。 一、系统概述 本系统旨在改变传统人工任务调度方式&#xff0c;通过计算…

我的128天创作之路:回顾与展望

大家好呀&#xff01;今天来和你们分享一下我的创作历程&#x1f601;。 一、机缘 最开始创作呢&#xff0c;是因为在学习 C 的 STL 时&#xff0c;像 string、list、vector 这些模板可把我折腾得够呛&#xff0c;但也让我学到了超多东西&#xff01;我就想&#xff0c;要是把我…

性能测试工具Jmeter中的FTP脚本开发

FTP文件传输协议是TCP/IP协议组织中的常用协议之一&#xff0c;主要用在internet上双向传输文件。FTP协议具有客户端和服务器端两个部分组成部分&#xff0c;具有上传与下载两种功能。Jmeter也提供了FTP请求的测试支持&#xff0c;实现了上传和下载功能测试。 对于上图的FTP请求…

【C++】string的关系运算与比较分析

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;基础知识&#xff1a;C 中的 string 关系运算器1. 关系运算器概述2. 字符串比较的本质 &#x1f4af;代码解析与扩展代码例一&#xff1a;相等比较代码解析输出 代码例二&a…

mysql本地安装和pycharm链接数据库操作

MySQL本地安装和相关操作 Python相关&#xff1a;基础、函数、数据类型、面向、模块。 前端开发&#xff1a;HTML、CSS、JavaScript、jQuery。【静态页面】 Java前端&#xff1b; Python前端&#xff1b; Go前端 -> 【动态页面】直观&#xff1a; 静态&#xff0c;写死了…

深度学习|表示学习|一个神经元可以干什么|02

如是我闻&#xff1a; 如果我们只有一个神经元&#xff08;即一个单一的线性或非线性函数&#xff09;&#xff0c;仍然可以完成一些简单的任务。以下是一个神经元可以实现的功能和应用&#xff1a; 1. 实现简单的线性分类 输入&#xff1a;一组特征向量 x x x 输出&#xff…

HTB:Paper[WriteUP]

目录 连接至HTB服务器并启动靶机 信息收集 使用rustscan对靶机TCP端口进行开放扫描 将靶机TCP开放端口号提取并保存 使用nmap对靶机TCP开放端口进行脚本、服务扫描 使用nmap对靶机TCP开放端口进行漏洞、系统扫描 使用nmap对靶机常用UDP端口进行开放扫描 对靶机进行子域…

做一个 简单的Django 《股票自选助手》显示 用akshare 库(A股数据获取)

图&#xff1a; 股票自选助手 这是一个基于 Django 开发的 A 股自选股票信息查看系统。系统使用 akshare 库获取实时股票数据&#xff0c;支持添加、删除和更新股票信息。 功能特点 支持添加自选股票实时显示股票价格和涨跌幅一键更新所有股票数据支持删除不需要的股票使用中…