腾讯mini项目-【指标监控服务重构】2023-08-18

news2025/1/12 16:11:30

今日已办

watermill

将 key 设置到 message 中

修改 watermill-kafka 源码 将 key 设置到 message.metadata中

image-20230818224812801

image-20230818224728773

接入 otel-sdk

  1. 添加 middleware resolveUpstreamCtx 解析上游上下文,开启根Span
  2. 添加 middleware middleware.InstantAck - 马上ACK,使得多条消息可以平行处理(走middleware 和 handler 的逻辑)
// Package pubsub
// @Author xzx 2023/8/12 10:01:00
package pubsub

import (
	"context"
	"encoding/json"
	"github.com/ThreeDotsLabs/watermill/message"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/propagation"
	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
	"go.opentelemetry.io/otel/trace"
	"go.uber.org/zap"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/otelclient"
	"profile/internal/schema"
	"profile/internal/schema/performance"
	"profile/internal/state"
	"profile/internal/watermill/watermillkafka"
)

// consumeCtxData
// @Description: Data collection of a message processing context
// @Author xzx 2023-08-17 13:36:12
type consumeCtxData struct {
	Status        int
	Event         schema.Event
	RootSpan      trace.Span
	RootSpanCtx   context.Context
	AppID         string // API 上报
	FetchScenario string // API 上报
}

// resolveUpstreamCtx
// @Description
// @Author xzx 2023-08-18 11:15:09
// @Param h
// @Return message.HandlerFunc
func resolveUpstreamCtx(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		var data consumeCtxData
		// get upstream producer's W3C trace context via propagation
		headerCarrier := make(propagation.HeaderCarrier)

		headerCarrier.Set("Traceparent", msg.Metadata.Get("Traceparent"))
		upstreamProducerCtx := otel.GetTextMapPropagator().Extract(msg.Context(), headerCarrier)
		// set traceID to consumer context
		consumerCtx := trace.ContextWithRemoteSpanContext(msg.Context(),
			trace.NewSpanContext(trace.SpanContextConfig{
				TraceID: trace.SpanContextFromContext(upstreamProducerCtx).TraceID(),
			}))
		//start tracing
		data.RootSpanCtx, data.RootSpan = otelclient.ConsumerTracer.Start(consumerCtx, "Profile-Consumer",
			trace.WithSpanKind(trace.SpanKindConsumer),
			trace.WithLinks(trace.LinkFromContext(upstreamProducerCtx, semconv.OpentracingRefTypeFollowsFrom)))

		msg.SetContext(context.WithValue(msg.Context(), "data", &data))
		return h(msg)
	}
}

// unpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param hmsg.SetContext(context.WithValue(msg.Context(), "data", data))
// @Return message.HandlerFunc
func unpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)

		unpackKafkaMessageCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "unpackKafkaMessage",
			trace.WithSpanKind(trace.SpanKindClient))
		// 反序列化,存入通用结构体
		if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {
			data.Status = state.StatusUnmarshalError
			handlerErr(unpackKafkaMessageCtx, "unmarshal error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(unpackKafkaMessageCtx, "[UnpackKafkaItem] unpack kafka item success",
			zap.Any("event", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))

		setSpanAttributes(span, data)

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// initPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func initPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)

		initPerformanceEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "initPerformanceEvent",
			trace.WithSpanKind(trace.SpanKindInternal))
		event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)
		if contextErr != nil {
			data.Status = state.StatusEventFactoryError
			handlerErr(initPerformanceEventCtx, "event factory error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(initPerformanceEventCtx, "[initPerformanceEvent] init performance event success",
			zap.Any("event", event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		data.Event.ProfileData = event

		setSpanAttributes(span, data)

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// analyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func analyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(*consumeCtxData)

		analyzeEventCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "analyzeEvent",
			trace.WithSpanKind(trace.SpanKindInternal))

		contextErr := data.Event.ProfileData.Analyze()
		if contextErr != nil {
			data.Status = state.StatusAnalyzeError
			handlerErr(analyzeEventCtx, "analyze event error", contextErr)
			span.End()
			return nil, contextErr
		}
		log.Logger.InfoContext(analyzeEventCtx, "[analyzeEvent] analyze performance event success",
			zap.Any("event", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		// clear dimensions and values
		data.Event.Dimensions = nil
		data.Event.Values = nil

		setSpanAttributes(span, data)

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		span.End()
		return h(msg)
	}
}

// crashHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func crashHandler(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(*consumeCtxData)

	writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",
		trace.WithSpanKind(trace.SpanKindProducer))
	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		handlerErr(writeKafkaCtx, "marshal error", contextErr)
		span.End()
		return nil, contextErr
	}

	msg = message.NewMessage(data.Event.BackendID, toWriteBytes)
	msg.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)
	log.Logger.Info("[4-crashHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))

	setSpanAttributes(span, data)
	span.End()
	data.RootSpan.End()
	return message.Messages{msg}, nil
}

// lagHandler
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func lagHandler(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(*consumeCtxData)

	writeKafkaCtx, span := otelclient.ConsumerTracer.Start(data.RootSpanCtx, "crashHandler",
		trace.WithSpanKind(trace.SpanKindProducer))
	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		handlerErr(writeKafkaCtx, "marshal error", contextErr)
		span.End()
		return nil, contextErr
	}

	msg = message.NewMessage(data.Event.BackendID, toWriteBytes)
	log.Logger.Info("[4-lagHandler] write kafka ...", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))
	setSpanAttributes(span, data)
	span.End()
	data.RootSpan.End()
	return message.Messages{msg}, nil
}

// setSpanAttributes
// @Description  setSpanAttributes
// @Author xzx 2023-08-03 23:19:17
// @Param span
// @Param profileCtx
func setSpanAttributes(span trace.Span, data *consumeCtxData) {
	if span.IsRecording() {
		span.SetAttributes(
			attribute.String("event.category", data.Event.Category),
			attribute.String("event.backend_id", data.Event.BackendID),
		)
	}
}

// handlerErr
// @Description
// @Author xzx 2023-07-20 15:36:46
// @Param span
// @Param ctx
// @Param msg
// @Param err
func handlerErr(ctx context.Context, msg string, err error) {
	log.Logger.ErrorContext(ctx, msg, zap.Error(err))
}

会议纪要

进度

  1. venus 的 metrics 独立分支开发
  2. venus 的 trace 修复了一些bug
    1. 返回 error 主动调用 span.end()
  3. profile 的 watemill pub/sub 和 trace 上报还原原本功能
  4. profile 的 hyperscan 的继续调研中

待办

  1. 调研如何关闭otel,设置开关配置
    1. 找到了 sdktrace.WithSampler(sdktrace.NeverSample()) - 可以不上报 trace
  2. 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
  3. 调研 watermill 的 otelEnabled 的功能,其他集成 otel 的第三方库等
  4. hyperscan 的 benchmark

明日待办

  1. 性能benchmark,单topic、多topic的场景下,对比原profile代码和现有watermill代码性能
  2. watermill-kafka 的源码基础上对 publisher 的 写回 kafka 添加 otel - trace & log 的逻辑

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1016991.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

怒刷LeetCode的第4天(Java版)

#【中秋征文】程序人生,中秋共享# 目录 第一题 题目来源 题目内容 解决方法 方法一:遍历字符串 方法二:有限状态机(Finite State Machine) 方法三:正则表达式 第二题 题目来源 题目内容 解决方…

机器学习——决策树/随机森林

0、前言: 决策树可以做分类也可以做回归,决策树容易过拟合决策树算法的基本原理是依据信息学熵的概念设计的(Logistic回归和贝叶斯是基于概率论),熵最早起源于物理学,在信息学当中表示不确定性的度量&…

带你了解前后端分离的秘密-Vue【vue入门】

🏅我是默,一个在CSDN分享笔记的博主。📚📚 🌟在这里,我要推荐给大家我的专栏《Vue》。🎯🎯 🚀无论你是编程小白,还是有一定基础的程序员,这个专栏…

js中事件委托和事件绑定之间的区别

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 事件绑定(Event Binding)⭐事件委托(Event Delegation)⭐ 选择事件绑定或事件委托⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本…

NVM安装及如何使用NVM

NVM是什么? nvm 全名 Node Version Manager,Node的版本管理工具 NVM能做什么? 安装 nvm 后,可以使用nvm的相关命令来管理和切换不同的 node 版本,方便开发 如何安装NVM 链接: NVM GitHub地址 如何使用 NVM 命令 …

R语言绘制PCA双标图

代码&#xff1a; setwd("D:/Desktop/0000/R") #更改路径#导入数据 df <- read.table("Input data.csv", header T, sep ",")# ----------------------------------- #所需的包: packages <- c("ggplot2", "tidyr"…

1.简单工厂模式

UML类图 代码 main.cpp #include <iostream> #include "OperationFactory.h" using namespace std;int main(void) {float num1;float num2;char operate;cin >> num1 >> num2 >> operate;Operation* oper OperationFactory::createOpera…

算法综合篇专题四:前缀和

"回忆里的我&#xff0c;比国王富有。奢侈的快乐~" 1、前缀和【模板】 (1) 题目解析 (2) 算法原理 #include <iostream> using namespace std;const int N 100010; // 可能出现溢出 long long arr[N],dp[N]; int n,q;int main() {cin >> n …

长胜证券:突破五日线什么意思?

随着股市的快速开展&#xff0c;越来越多的人开端了解和参与股票投资&#xff0c;但或许会遇到一些术语和概念&#xff0c;例如“打破五日线”&#xff0c;这是新手们需求了解的。本文将介绍“打破五日线”的概念及其意义&#xff0c;同时从不同视点剖析其意义和影响因素。 什…

记录wisemodel上传失败

参考&#xff1a;https://wisemodel.cn/docs/%E6%A8%A1%E5%9E%8B%E4%B8%8A%E4%BC%A0 第一种方法&#xff1a; git lfs install git clone https://oauth2:your_git_tokenwww.wisemodel.cn/username/my_test_model.git也就是用oauth2&#xff0c;然后再按照一般的方法传文件&a…

(三十三)大数据实战——Canal安装部署及其应用案例实战

前言 Canal 是一个开源的MySQL数据库binlog监听和解析框架&#xff0c;用于实时捕获 MySQL数据库的binlog 变更事件&#xff0c;并将其解析成易于消费的数据格式。Canal 可以实时监听 MySQL 数据库的 binlog&#xff0c;并即时捕获数据库的数据变更事件。Canal可以将捕获到的b…

C++ function<>和bind()

一、可调用对象 介绍两个概念&#xff1a;调用运算符和可调用对象 调用运算符 调用运算符&#xff0c;即&#xff1a;() 。跟随在函数名之后的一对括号 “()”&#xff0c;起到调用函数的效果&#xff0c;传递给函数的参数放置在括号内。 可调用对象 对于一个对象或者一个表…

位图+布隆过滤器+海量数据问题(它们都是哈希的应用)

一)位图: 首先计算一下存储一下10亿个整形数据&#xff0c;需要多大内存呢&#xff0c;多少个G呢&#xff1f; 2^3010亿&#xff0c;10亿个字节 byte kb mb gb 100000000个字节/1024/1024/10241G 所以10亿个字节就是1G&#xff0c;所以40亿个字节就是4G&#xff0c;也就是10个整…

Swing基本组件的用法(一)

语雀笔记&#xff1a;https://www.yuque.com/huangzhanqi/rhwoir/paaoghdyv0tgksk1https://www.yuque.com/huangzhanqi/rhwoir/paaoghdyv0tgksk1Java图形化界面: Java图形化界面学习demo与资料 (gitee.com)https://gitee.com/zhanqi214/java-graphical-interface Swing组件层次…

机器学习笔记 - 视频分析和人类活动识别技术路线简述

一、理解人类活动识别 首先了解什么是人类活动识别,简而言之,是对某人正在执行的活动/动作进行分类或预测的任务称为活动识别。 我们可能会有一个问题:这与普通的分类任务有什么不同?这里的问题是,在人类活动识别中,您实际上需要一系列数据点来预测正确执行的动作。 看看…

Python 多进程异常

这里写目录标题 1、捕获异常2、退出程序3、进程共享变量4、multiprocessing的Pool所起的进程中再起进程 1、捕获异常 https://zhuanlan.zhihu.com/p/321408784 try:<语句> except Exception as e:print(异常说明,e)1 捕获所有异常 包括键盘中断和程序退出请求&#xff0…

一个Binder的前生今世 (一):Service的创建

一个Binder的前生今世 (一):Service的创建 一个Binder的前生今世Binder的历史 (字面意义的前生今世)Binder的生命周期(抽象意义的前生今世)Binder 应用及系统层关系图Binder应用层的架构设计Binder应用层实现Binder的创建服务端Binder的创建服务端Binder的传递Binder在客…

Trino HTTPS 与密码认证介绍与实战操作

文章目录 一、概述二、安装 Trino三、配置 HTTPS1&#xff09;生成证书2&#xff09;配置 Trino3&#xff09;修改 Trino docker-compose yaml 文件4&#xff09;开始部署 Trino5&#xff09;测试验证 四、密码认证1&#xff09;开启密码认证2&#xff09;创建密码认证配置文件…

AndroidStudio 安装与配置【安装教程】

1.下载软件 进入官网https://developer.android.google.cn/studio&#xff0c;直接点击下载 2.阅读并同意协议书 直接下滑至最底部 如果这里出现了无法访问 官方地址&#xff1a;https://redirector.gvt1.com/edgedl/android/studio/install/2022.3.1.19/android-studio-2022.…

java:杨辉三角形

public class YangHui {public static void main(String[] args){int yangHui[][] new int[10][];for (int i 0; i < yangHui.length;i){yangHui[i] new int[i 1];for (int j 0; j < yangHui[i].length; j){ // 最初和最后的数值都是1if (j 0 || j …