腾讯mini项目-【指标监控服务重构】2023-08-23

news2025/1/13 15:43:10

今日已办

进度和问题汇总

  1. 请求合并
    1. feature/venus trace
    2. feature/venus metric
    3. feature/profile-otel-baserunner-style
    4. bugfix/profile-logger-Sync
    5. feature/profile_otelclient_enable_config
  2. 完成otel 开关
    1. trace-采样
    2. metrice-reader
  3. 已经都在各自服务器运行,并接入了云clickhouse集群,开始准备测试【详细需求】
    1. 测试的用例,并发的数目-【用例拓展-kafka的消息积压】
    2. clickhouse的哪些指标,cpu、内存,耗时等
    3. 以什么形式来输出这个性能对比?(表格or图形)
    4. 指标采集的性能消耗,复杂指标查询的消耗
    5. 对比对象-Jaeger
      1. 存储后端-elasticsearch 【手动部署或者购买】
      2. 收集存储,查询
      3. golang pprof 抓取文件 CPU 占用和耗时,内存-火焰图
      4. 不同方案做对比
    6. ck 的指标
      1. **数据库的延时,(五分钟)入库成功率 **【压测】
      2. 通过指标或者链路耗时,定位哪个环节卡住
      3. 压测 jaeger 数据收集出现问题-【qps】,降低配置,突出优势
      4. 内存和cpu占有,profile 手动收集指标
  4. profile服务器3301的端口
  5. watermill和baserunner的benmark,做得差不多了,修改了publisher用了kafka-client的异步生产者,耗时快了很多
  6. 需要启动其他监控工具(zipkin,jaeger【已经接入,正在尝试连入ck】,Prometheus等来进行对比吗)
  7. 一个优化代码中接入otel-sdk,如何减少显式声明,提高代码的可扩展性
    1. profile 已经将otel逻辑嵌入到baserunner的handler中
    2. venus 待办
    3. profile-watermill 待办

分工

  1. 测试用例 - 1
  2. jaeger - 2
  3. pprof - 1
  4. 测试对比两种方案的 clickhouse 指标
  5. docker-compose拉低配置

watermill-benchmark

代码实现

  1. 先初始化 producer
  2. watermill 初始化并启动 router / baserunner 初始化 consumer
  3. 在 for 循环中同步生产完固定数量的消息【开始计时】
  4. 阻塞等待固定数量的消息被消费,解析,处理,异步推回 kafka 完成【结束计时】
  5. 本机和服务器测试单个topic的100条消息的结果见下列表格
    1. watermill 的性能和资源利用率均好于 baserunner
    2. 核心数多的情况下,优势会更加明显
// Package consumer
// @Author xzx 2023/8/19 14:13:00
package internal

import (
	"context"
	"fmt"
	kc "github.com/Kevinello/kafka-client"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/bytedance/sonic"
	"github.com/garsue/watermillzap"
	"github.com/google/uuid"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"profile/cmd"
	"profile/internal/config"
	baseconsumer "profile/internal/context/consumer"
	"profile/internal/log"
	"profile/internal/schema"
	"profile/internal/watermill/consumer"
	"profile/internal/watermill/watermillkafka"
	"testing"
	"time"
)

// BenchmarkWatermill-16           240           5631314 ns/op         3684370 B/op           37997 allocs/op
// BenchmarkWatermill-16           153           7084305 ns/op         3706966 B/op           38168 allocs/op
// BenchmarkWatermill-16           145           6917486 ns/op         3712511 B/op           38175 allocs/op
func BenchmarkWatermill(b *testing.B) {
	router := newRouter()
	go func() {
		if err := router.Run(context.Background()); err != nil {
			log.Logger.Error("router run error", zap.Error(err))
		}
	}()
	producer := newProducer()
	time.Sleep(10 * time.Millisecond)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		watermillkafka.MessageCount = 0
		err := publishMessage(producer, 100)
		if err != nil {
			break
		}
		b.StartTimer()
		// 阻塞等待消费完成指定数量
		for {
			if watermillkafka.MessageCount >= 100 && router.IsRunning() {
				b.StopTimer()
				log.Logger.Error("PubSub Count End", zap.Any("count", watermillkafka.MessageCount))
				break
			}
		}
	}
	b.StopTimer()
	router.Close()
}

// BenchmarkBaseRunner-16         12         100429542 ns/op         4959836 B/op      42119 allocs/op
// BenchmarkBaseRunner-16         10         100110220 ns/op         4946421 B/op      42132 allocs/op
// BenchmarkBaseRunner-16         10         106747810 ns/op         4942656 B/op      42107 allocs/op
func BenchmarkBaseRunner(b *testing.B) {
	producer := newProducer()

	myConsumer, err := kc.NewConsumer(
		context.Background(),
		kc.ConsumerConfig{
			Bootstrap: config.Profile.GetString("kafka.bootstrap"),
			GroupID:   config.Profile.GetString("kafka.group"),
			GetTopics: func(broker string) (topics []string, err error) {
				return []string{
					"to_analyzer__0.PERF_CRASH",
					"to_analyzer__0.PERF_LAG",
				}, nil
			},
			MessageHandler: cmd.ConsumerDispatchHandler,
			LogLevel:       int(zapcore.InfoLevel),
		},
	)

	if err != nil {
		log.Logger.Fatal("create consumer error", zap.Error(err))
		return
	}
	go func() {
		select {
		case <-myConsumer.Closed():
			log.Logger.Info("consumer Closed")
			return
		}
	}()
	time.Sleep(10 * time.Millisecond)
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		b.StopTimer()
		baseconsumer.ConsumeCount = 0
		err := publishMessage(producer, 100)
		if err != nil {
			break
		}
		b.StartTimer()
		// 阻塞等待消费完成指定数量
		for {
			if baseconsumer.ConsumeCount >= 100 {
				log.Logger.Error("PubSub Count End", zap.Any("count", baseconsumer.ConsumeCount))
				break
			}
		}
	}
	b.StopTimer()
	myConsumer.Closed()
}

func publishMessage(producer *kc.Producer, nums int) (err error) {
	var event = &schema.Event{
		Meta: schema.Meta{
			AppID:    "1024",
			Category: "PERF_CRASH",
			Model:    "xiaomi13",
			DeviceID: "1b201ff9-5002-4fae-8d22-507a1c1a10b6",
			Os:       "ios",
			OsVer:    "13.1",
			UserID:   "28865194-fd08-480f-957d-ee9f21b32c3c",
			Version:  "100.24.56.7.19",
			Arch:     "aarch64",
			SdkVer:   "5.12.6",
			Platform: "ios",
		},
		Data: schema.Data{
			Time:         1688491757512,
			IP:           "119.147.10.203",
			ID:           "a4b838db-4f34-4da8-a27b-e725477ed336",
			NetType:      "5G",
			NetOp:        "CT",
			BatteryLevel: 92,
			PageID:       "com.tencent.test.page1",
			Dimensions: map[string]string{
				"crashed_thread": "com.tencent.thread1",
				"crash_type":     "native",
				"lose_data":      "true",
				"repeat_occur":   "false",
			},
			Values: map[string]int64{
				"memory_free":  600,
				"memory_max":   1200,
				"memory_total": 1600,
				"remain_disk":  4000,
			},
		},
		VenusData: schema.VenusData{
			UploadTime: time.Now().UnixMilli(),
			BackendID:  uuid.NewString(),
			Country:    "China",
			Region:     "Guangdong",
			City:       "Shenzhen",
		},
	}
	topic := fmt.Sprintf("to_analyzer__0.%s", event.Category)
	messages := make([]kafka.Message, 0, nums)
	for i := 0; i < nums; i++ {
		event.UploadTime = time.Now().UnixMilli()
		event.BackendID = uuid.NewString()
		bytes, err := sonic.Marshal(event)
		if err != nil {
			fmt.Printf("failed to marshal event: %v\n", err)
		}
		messages = append(messages, kafka.Message{
			Topic: topic,
			Value: bytes,
		})
	}

	if err = producer.WriteMessages(context.Background(), messages...); err != nil {
		fmt.Printf("failed to write messages: %v\n", err)
	}
	return
}

func newProducer() *kc.Producer {
	eventKafkaConfig := &kc.ProducerConfig{
		Bootstrap:              "127.0.0.1:9092",
		Async:                  false,
		AllowAutoTopicCreation: true,
		Logger:                 &log.LogrLogger,
	}

	producer, err := kc.NewProducer(context.Background(), *eventKafkaConfig)
	if err != nil {
		panic("cannot connect to kafka with address 127.0.0.1:9092")
	}
	return producer
}

func newRouter() *message.Router {
	logger := watermillzap.NewLogger(log.Logger)
	publisher, subscriber := consumer.NewPubSub(logger)
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Fatal("create router error", zap.Error(err))
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.InstantAck,
		middleware.Recoverer,
	)

	router.AddMiddleware(consumer.UnpackKafkaMessage, consumer.InitPerformanceEvent, consumer.AnalyzeEvent)
	router.AddHandler("crash", "to_analyzer__0.PERF_CRASH", subscriber, "solar-dev.PERF_CRASH", publisher, consumer.CrashHandler)
	router.AddHandler("lag", "to_analyzer__0.PERF_LAG", subscriber, "solar-dev.PERF_LAG", publisher, consumer.LagHandler)
	return router
}

本地测试

BenchmarkWatermill-16BenchmarkBaseRunner-16
240 563,1314 ns/op 368,4370 B/op 3,7997 allocs/op12 1,0042,9542 ns/op 495,9836 B/op 4,2119 allocs/op
153 708,4305 ns/op 370,6966 B/op 3,8168 allocs/op10 1,0011,0220 ns/op 494,6421 B/op 4,2132 allocs/op
145 691,7486 ns/op 371,2511 B/op 3,8175 allocs/op10 1,0674,7810 ns/op 1,0674,7810 B/op 4,2107 allocs/op

服务器上测试

image-20230823202739491

image-20230823204753051

单个topic的100条消息

BenchmarkWatermill-4BenchmarkBaseRunner-4
10 4339,8240 ns/op 363,0762 B/op 3,7820 allocs/op25 4616,7095 ns/op 315,8836 B/op 3,9902 allocs/op
78 4065,2822 ns/op 360,0755 B/op 3,7893 allocs/op26 4330,6776 ns/op 317,8770 B/op 3,9880 allocs/op
100 3549,3863 ns/op 360,5322 B/op 3,7899 allocs/op100 4489,2327 ns/op 316,3158 B/op 3,9775 allocs/op
386 1427,4034 ns/op 358,7454 B/op 3,7876 allocs/op10000 4949,4435 ns/op 319,7664 B/op 3,9874 allocs/op

本地测试单个topic的100条消息

testb.nns/opB/opallocs/op
BenchmarkWatermill-161537084305370696638168
BenchmarkWatermill-161456917486371251138175
BenchmarkBaseRunner-1610100110220494642142132
BenchmarkBaseRunner-161010674781010674781042107

服务器测试单个topic的100条消息

testb.nns/opB/opallocs/op
BenchmarkWatermill-47840652822360075537893
BenchmarkBaseRunner-42643306776317877039880

明日待办

  1. 协助部署 jaeger

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1016772.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Datagrip 下载、安装教程,详细图文,亲测有效

文章目录 前言1. 下载2. 安装3 DataGrip 常用操作4 推荐阅读 前言 DataGrip 是 JetBrains 发布的多引擎数据库环境&#xff0c;支持 MySQL 和 PostgreSQL&#xff0c;Microsoft SQL Server 和 Oracle&#xff0c;Sybase&#xff0c;DB2&#xff0c;SQLite&#xff0c;还有 Hyp…

RocketMQ快速实战以及集群架构详解

文章目录 1、MQ简介1.1 、定义1.2 、作用 2、RocketMQ产品特点2.1、RocketMQ介绍2.2 、RocketMQ特点 1、MQ简介 1.1 、定义 ​ MQ&#xff1a;MessageQueue&#xff0c;消息队列。是在互联网中使用非常广泛的一系列服务中间件。 这个词可以分两个部分来看 一是Message&#…

SmartApi使用说明

缘起&#xff1a; 搞移动开发十多年了&#xff0c;接口、数据、数据模型、以及数据边界值的处理是需要团队协作解决&#xff0c;而这方面恰恰是总容易导致bug难修复的地方。而一款好用的api调试工具对于后端、前端、测试都是必须必的掌握熟练使用&#xff0c;Api就像人体的血管…

开发、测试、生产环境

开发环境&#xff08;Development Environment&#xff09;&#xff1a; 开发环境是用于开发新功能、修改和调试代码的环境。 在开发环境中&#xff0c;开发人员可以针对特定需求编写和测试代码。 通常&#xff0c;开发环境会模拟完整的系统环境&#xff0c;并提供开发人员所需…

SQL 性能优化总结

文章目录 一、性能优化策略二、索引创建规则三、查询优化总结 一、性能优化策略 1. SQL 语句中 IN 包含的值不应过多 MySQL 将 IN中的常量全部存储在一个排好序的数组里面&#xff0c;但是如果数值较多&#xff0c;产生的消耗也是比较大的。所以对于连续的数值&#xff0c;能用…

手撕排序之堆排序

一、概念&#xff1a; 什么是逻辑结构、物理结构&#xff1f; 逻辑结构&#xff1a;是我们自己想象出来的&#xff0c;就像内存中不存在一个真正的树 物理结构(存储结构)&#xff1a;实际上在内存中存储的形式。 堆的逻辑结构是一颗完全二叉树 堆的物理结构是一个数组 之…

常见请求方法

请求方法的本质 请求方法是请求行中的第一个单词&#xff0c;它向服务器描述了客户端发出请求的动作类型。在 HTTP 协议中&#xff0c;不同的请求方法只是包含了不同的语义&#xff0c;但服务器和浏览器的一些约定俗成的行为造成了它们具体的区别 fetch(https://www.baidu.com…

【算法|双指针|链表】反转链表

Leetcode206 反转链表 给你单链表的头节点 head &#xff0c;请你反转链表&#xff0c;并返回反转后的链表。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[5,4,3,2,1]示例 2&#xff1a; 输入&#xff1a;head [1,2] 输出&#xff1a;[2,1]示例…

opencv 处理扫描件移除灰色背景图

先看对比效果: 再上代码: import cv2 import numpy as npdef remove_gray_background(input_image_path, output_image_path, threshold180):# Load the input imageimage cv2.imread(input_image_path)# Convert the image to grayscalegray cv2.cvtColor(image, cv2.COLO…

【数据可视化】动态条形图Python代码实现

使用 Python 中的 bar_chart_race_cn 库创建动态条形图 前言 数据可视化在今天的数据分析和传达信息中起着至关重要的作用。动态条形图是一种强大的数据可视化工具&#xff0c;可以帮助我们展示随时间变化的数据趋势。本文将介绍如何使用 Python 编程语言中的 bar_chart_race…

阿里云yum源和tuna源

阿里云开源镜像站地址&#xff1a;阿里巴巴开源镜像站-OPSX镜像站-阿里云开发者社区阿里巴巴开源镜像站&#xff0c;免费提供Linux镜像下载服务&#xff0c;拥有Ubuntu、CentOS、Deepin、MongoDB、Apache、Maven、Composer等多种开源软件镜像源&#xff0c;此外还提供域名解析D…

背包问题学习笔记-01背包

背景 背包问题是动态规划问题中的一个大类&#xff0c;学习背包问题对于掌握动态规划十分重要。背包问题也很容易成为程序员算法面试中的一个槛&#xff0c;但其实背包问题已经被研究&#xff0c;讲解的比较成熟了&#xff0c;在这些丰富的讲解资料的基础之上&#xff0c;大家…

图注意网络(GAT)的可视化实现详解

能够可视化的查看对于理解图神经网络(gnn)越来越重要&#xff0c;所以在这篇文章中&#xff0c;我将介绍传统GNN层的实现&#xff0c;然后展示ICLR论文“图注意力网络”中对传统GNN层的改进。 假设我们有一个表示为有向无环图(DAG)的文本文档图。文档0与文档1、2和3有一条边&am…

第72步 时间序列建模实战:单步滚动预测(以决策树回归为例)

基于WIN10的64位系统演示 一、写在前面 从这一期开始&#xff0c;我们开始基于python构建各种机器学习和深度学习的时间序列预测模型&#xff0c;本质上就是调用各种模型的回归分析的属性。所以很多模型其实之前都介绍过&#xff0c;比如说决策树、SVM等等。 同样&#xff0…

【踩坑篇】代码中使用 Long 作为 Map的Key存在的问题

本周的工作结束&#xff0c;详述一些在项目代码中实际遇到的一些坑。 代码中遇到这样一个场景&#xff1a; 有个业务接口&#xff0c;接口返回的值是一个JSON格式的字符串&#xff0c;通过JSON解析的方式&#xff0c;解析为格式为&#xff1a; Map<Long, Map<String, O…

数据结构——时间复杂度与空间复杂度

目录 一.什么是空间复杂度与时间复杂度 1.1算法效率 1.2时间复杂度的概念 1.3空间复杂度的概念 二.如何计算常见算法的时间复杂度 2.1大O的渐近表示法 使用规则 三.如何计算常见算法的空间复杂度 3.1 大O渐近表示法 3.2 面试题——消失的数字 3.3 面试题——旋转数组 一…

ChatGPT是留学生的论文神器还是学术不端的罪魁祸首?

当今时代&#xff0c;ChatGPT无疑是大数据和人工智能的完美结合&#xff0c;成为了搜索技术的革命性创新。几秒钟&#xff0c;一篇逻辑清晰、观点鲜明、有充分论据支持的文章即可生成。这种革命性的创新在学术界掀起了巨大的浪潮&#xff0c;甚至让全球的学校开始思考&#xff…

【GAN入门】生成 AI的概念

一、说明 GAN是生成对抗网络&#xff08;Generative Adversarial Network&#xff09;的缩写&#xff0c;是一种无监督学习算法&#xff0c;由Goodfellow等人于2014年提出。GAN由一个生成器网络和一个判别器网络组成&#xff0c;通过二者之间的对抗来训练生成器网络生成与真实样…

深入了解Python数据类型及应用

Python提供了一组丰富的内置数据类型&#xff0c;使您能够在程序中处理不同类型的数据。核心数值类型包括整数、浮点数和复数。整数表示整数&#xff0c;对于精确的计数和计算非常有用。 浮点数表示具有小数精度的实数&#xff0c;这对科学和统计计算非常重要。复数将数字扩展到…

C++系列赋值运算符重载

赋值运算符重载 类的默认函数拷贝构造函数和赋值运算符 重载赋值运算符相关注意事项 类的默认函数 一个类至少有4个默认函数&#xff1a; 默认构造函数拷贝构造函数析构函数赋值运算符重载函数 拷贝构造函数和赋值运算符 拷贝构造函数是在创建类的时候调用的&#xff0c;之…