腾讯mini项目-【指标监控服务重构】2023-08-26

news2024/12/26 10:57:41

今日已办

Venus 的 Trace 无感化

定义 handler 函数

  1. fiber.Handler 的主要处理逻辑
  2. 返回处理中出现的 error
  3. 返回处理中响应 json 的函数
// handler
// @Description:
// @Author xzx 2023-08-26 18:00:03
// @Param c
// @Return error
// @Return func() error : function for response json
type handler func(c *fiber.Ctx) (error, func() error)

定义TraceWrapper函数

  1. otel-trace 的逻辑嵌入 handler 中

  2. 启动 span

  3. 执行 handler,记录 span 的 attributes

  4. 根据返回的 err, jsonRespFunc 分情况讨论

  5. 条件处理逻辑
    err != nil && jsonRespFunc == nil存在错误,将错误记录到Span中,结束Span,执行c.Next()
    err != nil && jsonRespFunc != nil存在错误,将错误记录到Span中,结束Span,响应JSON
    err == nil && jsonRespFunc == nil结束Span,执行c.Next()
    err == nil && jsonRespFunc != nil结束Span,响应JSON
  6. 代码实现

// TraceWrapper return fiber.Handler integrate report otel-trace
// @Description integrate otel-trace logic in handler
// @Author xzx 2023-08-26 18:00:03
// @Param f
// @Param opts
// @Return fiber.Handler
func TraceWrapper(f handler, opts ...trace.SpanStartOption) fiber.Handler {
	return func(c *fiber.Ctx) error {
		_, span := otelclient.Tracer.Start(c.UserContext(), runtime.GetFunctionName(f), opts...)
		// execute handler logic
		err, jsonRespFunc := f(c)
		// todo: setSpanAttributes
		if err != nil {
			// record span error
			span.RecordError(err)
			span.SetStatus(codes.Error, err.Error())
			span.End()
			if jsonRespFunc != nil {
				// response error result
				return jsonRespFunc()
			}
			// ignore error, continue handlers
			return c.Next()
		}
		span.End()
		if jsonRespFunc != nil {
			// response success result
			return jsonRespFunc()
		}
		// err == nil, jsonRespFunc == nil
		return c.Next()
	}
}

具体的 handler 逻辑

// SplitAndValidate split multi-events and validate uploaded data
// @Description
// @Author xzx 2023-08-26 17:54:21
// @Param c
// @Return Err
// @Return f
func SplitAndValidate(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("split and validate", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))

	otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
		attribute.String("type", "all_upload"),
	)))

	RequestIdBaggage, err := baggage.NewMember("request_id", c.GetRespHeader(fiber.HeaderXRequestID))
	if err != nil {
		log.Logger().Error("Create Baggage Member Failed", zap.Error(err))
	}
	Baggage, err := baggage.New(RequestIdBaggage)
	if err != nil {
		log.Logger().Error("Create Baggage Failed", zap.Error(err))
	}
	c.SetUserContext(baggage.ContextWithBaggage(c.UserContext(), Baggage))

	c.Accepts(fiber.MIMEMultipartForm)
	uploadedData, err := parseJSON(c)
	if err != nil {
		log.Logger().Error("failed to parse json", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}
	if err = uploadedData.Meta.Validate(); err != nil {
		log.Logger().Error("failed to validate meta", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "wrong_meta"),
		)))
		c.Status(fiber.StatusBadRequest)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code: protocol.AllFail,
				Data: err.Error(),
			})
		}
	}

	// must use pointer when using Locals of fiber if it's about to modify
	events := make([]*schema.Event, 0, len(uploadedData.Data))
	parts := make([]*protocol.Part, 0, len(uploadedData.Data))

	for idx, data := range uploadedData.Data {
		log.Logger().Debug("split event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event := &schema.Event{}
		part := &protocol.Part{}
		if err = data.Validate(); err != nil {
			Err = err
			otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
				attribute.String("type", "wrong_data"),
			)))
			part.Code = protocol.ValidationError
			part.Data = err.Error()
		}
		part.ID = data.ID

		event.Meta = uploadedData.Meta
		event.Data = data

		events = append(events, event)
		parts = append(parts, part)
	}

	c.Locals("meta", uploadedData.Meta)
	c.Locals("events", events)
	c.Locals("parts", parts)

	return Err, nil
}

// HandleEvent handle event
// @Description
// @Author xzx 2023-08-26 17:54:23
// @Param c
// @Return Err
// @Return f
func HandleEvent(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))

	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)

	for idx, event := range events {
		log.Logger().Debug("handle event", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Int("idx", idx))
		event.BackendID = uuid.NewString()
		event.UploadTime = time.Now().UnixMilli()

		part := parts[idx]
		part.BackendID = event.BackendID
		country, region, city, err := ip.ParseIP(event.IP)
		if err != nil {
			Err = err
			log.Logger().Warn("failed to parse ip", zap.Error(err), zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())))
			part.Code = protocol.IPParsingError
			part.Data = err.Error()
		}
		log.Logger().Debug("parsed ip", zap.String("client", c.IP()), zap.String("ip", event.IP), zap.String("agent", string(c.Context().UserAgent())), zap.String("country", country), zap.String("region", region), zap.String("city", city))
		event.Country = country
		event.Region = region
		event.City = city
	}

	return Err, nil
}

// WriteKafka write to kafka
// @Description
// @Author xzx 2023-08-26 17:54:26
// @Param c
// @Return Err
// @Return f
func WriteKafka(c *fiber.Ctx) (Err error, f func() error) {
	log.Logger().Debug("write kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))

	meta := c.Locals("meta").(schema.Meta)
	events := c.Locals("events").([]*schema.Event)
	parts := c.Locals("parts").([]*protocol.Part)

	// mark if all parts are succeed, which is response for code
	isAllSuccess := true
	// topic is like to_analyzer__0.PERF_CRASH
	topic := fmt.Sprintf("to_analyzer__0.%s", meta.Category)

	traceparent := c.Get(traceparentHeaderKey)
	if len(traceparent) == 55 {
		spanId := trace.SpanFromContext(c.UserContext()).SpanContext().SpanID().String()
		traceparent = traceparent[:36] + spanId + traceparent[52:]
	}

	messages := make([]kafka.Message, 0, len(events))
	for idx, event := range events {
		// skip if event was failed
		if parts[idx].Code != 0 {
			isAllSuccess = false
			continue
		}
		bytes, err := sonic.Marshal(event)
		if err != nil {
			Err = err
			log.Logger().Error("failed to marshal event", zap.Error(err), zap.Any("event", event), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
			parts[idx].Code = protocol.SerializationError
			parts[idx].Data = err.Error()
		}
		messages = append(messages, kafka.Message{
			Topic: topic,
			Value: bytes,
			Headers: []kafka.Header{
				{Key: traceparentHeaderKey, Value: []byte(traceparent)},
			},
		})
	}

	if len(messages) == 0 { // would not write to kafka since every part were failed for some reason
		log.Logger().Warn("every data were failed to handle, would not write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
		c.Status(fiber.StatusBadRequest)
		return errors.New("every data were failed to handle, check their code and data"), func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  "every data were failed to handle, check their code and data",
				Parts: parts,
			})
		}
	}

	log.Logger().Info("would write to kafka", zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())))
	kafkaProducer := connector.GetEventKafka()
	if err := kafkaProducer.WriteMessages(context.Background(), messages...); err != nil {
		log.Logger().Error("failed to write to kafka", zap.Error(err), zap.String("client", c.IP()), zap.String("agent", string(c.Context().UserAgent())), zap.Any("messages", messages))
		c.Status(fiber.StatusInternalServerError)
		return err, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllFail,
				Data:  fmt.Sprintf("failed to write to kafka: %s", err.Error()),
				Parts: parts,
			})
		}
	}
	if isAllSuccess {
		c.Status(fiber.StatusOK)
		otelclient.ReportCounter.Add(c.UserContext(), 1, metric.WithAttributeSet(attribute.NewSet(
			attribute.String("type", "success_upload"))))
		return nil, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.AllSuccess,
				Parts: parts,
			})
		}
	} else {
		c.Status(fiber.StatusPartialContent)
		return Err, func() error {
			return c.JSON(protocol.Response{
				Code:  protocol.PartialSuccess,
				Data:  "some data were failed to handle, check their code and data",
				Parts: parts,
			})
		}
	}
}

同步会议

进度

  1. 完成了 venus、profile 逻辑的 otel-trace 接入 handler 无感化,提高代码可扩展性
  2. otel上报的开关,已经 review 完合并了
  3. 部署了jaeger的整套方案
  4. 关于 watermill 和 baserunner 的 banchmark

Review和测试方案

  1. 代码 review
    1. fiber.Handler 接入 otel 无感化,代码可扩展性
    2. 修复部分命名规范、注释规范和代码质量检查中指出的问题
    3. 移除 profile-compose 的 grafana 容器
    4. log 的初始化
    5. 移除 venus-compose 的无用配置
  2. 测试方案
    1. 接入 ck 集群,benchmark 进行 otel-sdk 上报,压测 otel-collector 和 ck 集群
      1. 【卡点】对 collector 压测暂时对 ck 没有造成很大压力
    2. 部署了 jaeger 的整套方案,使用 es 存储 trace、Prometheus 存储 metrics
      1. 【差异】jaeger 的 trace 可视化出来,使用 span_references - followsform 会展示为父子关系的 span
      2. es 已购买集群,等待接入
      3. 【方案】对 jaeger 进行压测,找到 jaeger 出现问题的 qps,要该 qps 来测试 a、b两组的方案
      4. 【方法】在压测的过程用 pprof 来抓取 venus 和 profile 的 cpu、内存的使用情况
    3. 对于 ck 集群的指标
      1. 【插入】ck 的 写入耗时,1/5分钟的写入成功率
      2. 【查询】ck 的 查询耗时,查询成功率
      3. 【资源利用率】ck 的 cpu、内存利用率
    4. 【问题】如何证明我们在监控服务差异、优劣方面的断言,具体的测试流程、测试对象、测试指标对比等

总结

  1. 对比上报相同的 metrics 测试,收集器(jaeger、otel)collector(容器)的差异,展示otel的优势,cpu、内存的,监控 collector 的差异,cadvisor
  2. 相同上报比如 trace,对比 es、ck 的存储大小对比,kibana
  3. 简单拓展说明:otel-log,故障的trace的地方,具体问题的原因
  4. 查询性能方面:性能测试-响应耗时,用 signoz,jaeger 的 web-url 来压测-主要使用Web,控制变量(同一个trace和span,清空缓存,主要 tcp 压测),
  5. 扩展:watermill/baserunner 的对比,高并发场景下比较优秀
  6. 扩展:benchmark 的 hyperscan 和官方正则处理的对比

数据记录、控制变量

cadvisor

image-20230826231631007

services:
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    networks:
      - backend
    command: --url_base_prefix=/cadvisor
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:rw
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
      - /dev/disk/:/dev/disk:ro

jaeger-all-in-one 分开部署

为了收集 jaeger-collector 的指标,把 jaeger-all-in-one 分开部署

services:
  jaeger-collector:
    image: jaegertracing/jaeger-collector
    container_name: jaeger-collector
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--es.num-shards=1",
      "--es.num-replicas=0",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - elasticsearch
  jaeger-query:
    image: jaegertracing/jaeger-query
    container_name: jaeger-query
    networks:
      - backend
    command: [
      "--es.server-urls=http://elasticsearch:9200",
      "--span-storage.type=elasticsearch",
      "--log-level=info"
    ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
      - no_proxy=localhost
      - QUERY_BASE_PATH=/jaeger
    depends_on:
      - jaeger-agent
  jaeger-agent:
    image: jaegertracing/jaeger-agent
    container_name: jaeger-agent
    networks:
      - backend
    command: [ "--reporter.grpc.host-port=jaeger-collector:14250" ]
    environment:
      - SPAN_STORAGE_TYPE=elasticsearch
      - METRICS_STORAGE_TYPE=prometheus
      - PROMETHEUS_SERVER_URL=http://prometheus:9090
    depends_on:
      - jaeger-collector

明日待办

  1. 压测

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1041004.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Redis】记录一次K8S存储故障导致Redis集群拓扑异常的修复过程

文章目录 背景处理新节点遗忘旧节点 背景 集群部署在K8S环境内,存储使用的localpv,有一台K8S主机节点磁盘故障,导致在该节点上的redis节点均出现故障,主要表现为持久化失败、集群拓扑异常,持久化失败可以临时关闭RDB和…

Python:pyts库中的GramianAngularField

您想要使用pyts库中的GramianAngularField类,这是一个用于时间序列数据图像转换的工具。要使用这个类,首先确保您已经安装了pyts库。如果尚未安装,您可以使用以下命令来安装它: pip install pyts一旦安装完成,您可以通…

sql分词查询,实现类似ES的效果

需求:希望通过缩写查询到全称,列如输入常州一院,要得到常州市第一人民医院。 1、创建全文索引 # 创建全文索引 create FULLTEXT INDEX ft_hospitalname ON hospital_information(hospitalname) with parser ngram;2、编写查询sql # 自然语…

计算机专业毕业设计项目推荐09-个人医疗系统(Spring+Js+Mysql)

个人医疗系统(SpringJsMysql) **介绍****系统总体开发情况-功能模块****各部分模块实现** 介绍 本系列(后期可能博主会统一为专栏)博文献给即将毕业的计算机专业同学们,因为博主自身本科和硕士也是科班出生,所以也比较了解计算机专业的毕业设计流程以及…

【python】pycharm导入anaconda环境

参考 Pycharm导入anaconda环境的教程图解 - 知乎 (zhihu.com)

el-table实现穿梭功能

第一种 <template><el-row :gutter"20"><el-col :span"10"><!-- 搜索 --><div class"search-bg"><YcSearchInput title"手机号" v-model"search.phone" /><div class"search-s…

【算法训练-动态规划】一 连续子数组的最大和

废话不多说&#xff0c;喊一句号子鼓励自己&#xff1a;程序员永不失业&#xff0c;程序员走向架构&#xff01;本篇Blog的主题是【动态规划】&#xff0c;使用【数组】这个基本的数据结构来实现&#xff0c;这个高频题的站点是&#xff1a;CodeTop&#xff0c;筛选条件为&…

代数——第2章——群

第 2 章 群(Groups) II est peu de notions en mathematiques qui soient plus primitives que celle de loi de composition. (数学中很少有比合成律更原始的概念了。) --------------------------------------------------------Nicolas Bourbaki 2.1 合成律(LAWS OF CO…

Python中的封装

迷途小书童 读完需要 3分钟 速读仅需 1 分钟 当我们谈到 Python 中的封装时&#xff0c;可以将其类比为一个礼物盒子。封装是面向对象编程的一个重要概念&#xff0c;它允许我们将数据和相关的方法包装在一个单独的单元中&#xff0c;就像将礼物放在一个盒子里一样。 在 Python…

nginx_0.7.65_00截断_nginx解析漏洞

nginx_0.7.65_00截断_nginx解析漏洞 文章目录 nginx_0.7.65_00截断_nginx解析漏洞1 环境搭建1 解压nginx_0.7.652 双击启动&#xff0c;如有闪退&#xff0c;端口占用的情况&#xff0c;在conf文件nginx.conf修改一下端口号3 查看一下进程有nginx4 启动成功访问127.0.0.1:18080…

SpringBoot全局异常处理源码

SpringBoot全局异常处理源码 一、SpringMVC执行流程二、SpringBoot源码跟踪三、自定义优雅的全局异常处理脚手架starter自定义异常国际化引入封装基础异常封装基础异常扫描器&#xff0c;并注册到ExceptionHandler中项目分享以及改进点 一、SpringMVC执行流程 今天这里叙述的全…

一、imx6ull 最新交叉编译工具下载地址,及安装方法

IMX6ULL为Cortex-A7单核处理器&#xff0c;架构为32位&#xff0c;支持硬件浮点功能。所以下载如下图所示交叉编译工具 linaro GNU-A 针对Cortex-A系列版本 ARM官方稳定版本&#xff0c; ARM官网下载地址:https://developer.arm.com/downloads/-/gnu-a 百度网盘地址&#xff…

消息队列(RabbitMQ+RocketMQ+Kafka)

消息队列是一种应用程序之间通过异步通信进行数据交换的通信模式 消息队列的类型&#xff1a; 点对点&#xff0c;一对一的消息传递模型&#xff0c;其中每个消息只能被一个接收者消费。发送者将消息发送到队列中&#xff0c;而接收者从队列中获取消息并进行处理&#xff0c;…

ElasticSearch - DSL查询文档语法,以及深度分页问题、解决方案

目录 一、DSL 查询文档语法 前言 1.1、DSL Query 基本语法 1.2、全文检索查询 1.2.1、match 查询 1.2.2、multi_match 1.3、精确查询 1.3.1、term 查询 1.3.2、range 查询 1.4、地理查询 1.4.1、geo_bounding_box 1.4.2、geo_distance 1.5、复合查询 1.5.1、相关…

mac 解决 vscode 权限不足问题,Insufficient permissions

commod 空格&#xff0c;输入终端并打开写入指令 sudo chown -R xxxxxx1 xxxxx2&#xff08;例如我的sudo chown -R admin Desktop&#xff0c;具体参数查看下方&#xff09; x1: 用户名&#xff0c;可通过左上角查看 x2: 目标文件夹。可以另起一个终端&#xff0c;用cd 和 l…

第1关:Hive 的 Alter Table 操作

相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a; 1.Alter Table 命令 Alter Table 命令 Alter Table 命令 可以在 Hive 中修改表名&#xff0c;列名&#xff0c;列注释&#xff0c;表注释&#xff0c;增加列&#xff0c;调整列顺序&#xff0c;属性名等操作。…

光谱-空间特征分割提取:多光谱图像压缩

Spectral–Spatial Feature Partitioned Extraction Based on CNN for Multispectral Image Compression &#xff08;基于CNN的光谱-空间特征分割提取多光谱图像压缩&#xff09; 近年来&#xff0c;多光谱成像技术的迅速发展引起了各领域的高度重视&#xff0c;这就不可避免…

[vulntarget靶场] vulntarget-c

靶场地址&#xff1a; https://github.com/crow821/vulntarget 拓扑结构 信息收集 主机发现 netdiscover -r 192.168.111.0/24 -i eth0端口扫描 nmap -A -sC -v -sV -T5 -p- --scripthttp-enum 192.168.111.131访问80端口&#xff0c;发现为Laravel v8.78.1框架 vulmap探测…

Windows--Python永久换下载源

1.新建pip文件夹&#xff0c;注意路径 2.在上述文件中&#xff0c;新建文件pip.ini 3.pip.ini记事本打开&#xff0c;输入内容&#xff0c;保存完事。 [global] index-url https://pypi.douban.com/simple

和 Node.js 说拜拜,Deno零配置解决方案

不知道大家注意没有&#xff0c;在我们启动各种类型的 Node repo 时&#xff0c;root 目录很快就会被配置文件塞满。例如&#xff0c;在最新版本的 Next.js 中&#xff0c;我们就有 next.config.js、eslintrc.json、tsconfig.json 和 package.json。而在样式那边&#xff0c;还…