腾讯mini项目-【指标监控服务重构】2023-08-16

news2025/2/24 22:18:57

今日已办

v1

验证 StageHandler 在处理消息时是否为单例,【错误尝试】

type StageHandler struct {
}

func (s StageHandler) Middleware1(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		log.Logger.Info("StageHandler Middleware 1")
		fmt.Printf("%p\n", &s)
		return h(msg)
	}
}

func (s StageHandler) Middleware2(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		log.Logger.Info("StageHandler Middleware 2")
		fmt.Printf("%p\n", &s)
		return h(msg)
	}
}

func (s StageHandler) Middleware3(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		log.Logger.Info("StageHandler Middleware 3")
		fmt.Printf("%p\n", &s)
		return h(msg)
	}
}

func (s StageHandler) Handler1(msg *message.Message) error {
	log.Logger.Info("StageHandler Handler 1")
	fmt.Printf("%p\n", &s)
	return nil
}

image-20230816161140984

v2

  • 定义不同 Handler
type CrashHandler struct {
	Topic string
}

func (s CrashHandler) Handler1(msg *message.Message) error {
	log.Logger.Info(s.Topic + ": CrashHandler Handler 1 start")
	fmt.Printf("%p\n", &s)
	time.Sleep(1 * time.Second)
	log.Logger.Info(s.Topic + ": CrashHandler Handler 1 end")
	return nil
}

type LagHandler struct {
	Topic string
}

func (s LagHandler) Handler1(msg *message.Message) error {
	log.Logger.Info(s.Topic + ": LagHandler Handler 1 start")
	fmt.Printf("%p\n", &s)
	time.Sleep(1 * time.Second)
	log.Logger.Info(s.Topic + ": LagHandler Handler 1 end")
	return nil
}
  • 添加到router中
	for _, topic := range topics {
		var category string
		var handlerFunc message.NoPublishHandlerFunc
		if strings.Contains(topic, performance.CategoryCrash) {
			category = performance.CategoryCrash
			handlerFunc = CrashHandler{Topic: category}.Handler1
		} else if strings.Contains(topic, performance.CategoryLag) {
			category = performance.CategoryLag
			handlerFunc = LagHandler{Topic: category}.Handler1
		} else {
			continue
		}

		handler := router.AddNoPublisherHandler(topic+"test-handler", topic, subscriber, handlerFunc)
	}

image-20230816171659632

  • 结论
    • handler 实例会不断创建
    • 不同的 handler 可以并行处理不同主题的消息
    • 相同的 handler 在处理该主题的消息时是顺序的

官方文档: Message Router (watermill.io)

订阅者可以一次消费一条消息,也可以并行消费多条消息

  • Single stream of messages 是最简单的方法,这意味着在调用 msg.Ack() 之前,订阅者将不会收到任何新消息
  • Multiple message streams 仅部分订阅者支持。通过一次订阅多个主题分区,可以并行消费多条消息,甚至是之前未确认的消息(例如,Kafka 订阅者就是这样工作的) Router 通过运行并发 HandlerFuncs(每个分区一个)来处理此模型

v3

存在并发安全问题

  1. 公用一个上下文
  2. 频繁的修改上下文中的字段值
  3. 不同Handler和MiddleWare存在并发

解决思路

  • 将一次消息处理会使用到的数据集合定义为一个结构体
type ContextData struct {
	Status int
	Event  schema.Event

	AppID         string // API 上报
	FetchScenario string // API 上报
}
  • 使用message的Context来传递这个数据

image-20230816222038683

  • 移除掉 ProfileCtx 的相关设计
  • 使用watermillzap.Logger来替换本身的 LoggerAdapter,更加直观且与原项目适配image-20230816225840032

完整代码

profile/internal/watermill/consumer/consumer_context.go

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumer

import (
	"context"
	kc "github.com/Kevinello/kafka-client"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/garsue/watermillzap"
	"github.com/qiulin/watermill-kafkago/pkg/kafkago"
	"go.uber.org/zap"
	"profile/internal/config"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema/performance"
	"strings"
	"time"
)

// Consume
// @Description
// @Author xzx 2023-08-16 22:52:52
func Consume() {
	logger := watermillzap.NewLogger(log.Logger)

	publisher, subscriber := newPubSub(logger)
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))
	topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))
	if err != nil {
		log.Logger.Fatal("get topics failed", zap.Error(err))
		return
	}

	for _, topic := range topics {
		var category string
		var handlerFunc message.HandlerFunc
		if strings.Contains(topic, performance.CategoryCrash) {
			category = performance.CategoryCrash
			handlerFunc = CrashWriteKafka
		} else if strings.Contains(topic, performance.CategoryLag) {
			category = performance.CategoryLag
			handlerFunc = LagWriteKafka
		} else {
			continue
		}
		router.AddHandler(category, topic, subscriber, connector.GetTopic(category), publisher, handlerFunc).
			AddMiddleware(
				UnpackKafkaMessage,
				InitPerformanceEvent,
				AnalyzeEvent)
	}

	if err = router.Run(context.Background()); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
	}
}

// newPubSub
// @Description
// @Author xzx 2023-08-16 22:52:45
// @Param logger
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub(logger watermill.LoggerAdapter) (message.Publisher, message.Subscriber) {
	marshaler := kafkago.DefaultMarshaler{}
	publisher := kafkago.NewPublisher(kafkago.PublisherConfig{
		Brokers:     []string{config.Profile.GetString("kafka.bootstrap")},
		Async:       false,
		Marshaler:   marshaler,
		OTELEnabled: false,
		Ipv4Only:    true,
		Timeout:     100 * time.Second,
	}, logger)

	subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{
		Brokers:       []string{config.Profile.GetString("kafka.bootstrap")},
		Unmarshaler:   marshaler,
		ConsumerGroup: config.Profile.GetString("kafka.group"),
		OTELEnabled:   false,
	}, logger)
	if err != nil {
		log.Logger.Fatal("Unable to create subscriber", zap.Error(err))
	}
	return publisher, subscriber
}

profile/internal/watermill/consumer/consumer_stage.go

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumer

import (
	"context"
	"encoding/json"
	"github.com/ThreeDotsLabs/watermill/message"
	"go.uber.org/zap"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema"
	"profile/internal/schema/performance"
	"profile/internal/state"
)

type ContextData struct {
	Status int
	Event  schema.Event

	AppID         string // API 上报
	FetchScenario string // API 上报
}

// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		var data ContextData
		// 反序列化,存入通用结构体
		if contextErr := json.Unmarshal(msg.Payload, &data.Event); contextErr != nil {
			data.Status = state.StatusUnmarshalError
			return nil, contextErr
		}
		log.Logger.Info("[1-UnpackKafkaItem] unpack kafka item success", zap.Any("event", data.Event))

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		return h(msg)
	}
}

// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(ContextData)
		event, contextErr := performance.EventFactory(data.Event.Category, data.Event.Dimensions, data.Event.Values)
		if contextErr != nil {
			data.Status = state.StatusEventFactoryError
			return nil, contextErr
		}
		log.Logger.Info("[2-InitPerformanceEvent] Consume performance event success", zap.Any("event", data.Event))
		data.Event.ProfileData = event

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		return h(msg)
	}
}

// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(msg *message.Message) ([]*message.Message, error) {
		data := msg.Context().Value("data").(ContextData)

		contextErr := data.Event.ProfileData.Analyze()
		if contextErr != nil {
			data.Status = state.StatusAnalyzeError
			return nil, contextErr
		}
		log.Logger.Info("[3-AnalyzeEvent] analyze event success", zap.Any("event", data.Event))
		// clear dimensions and values
		data.Event.Dimensions = nil
		data.Event.Values = nil

		msg.SetContext(context.WithValue(msg.Context(), "data", data))
		return h(msg)
	}
}

// CrashWriteKafka
// @Description
// @Author xzx 2023-08-12 15:09:15
// @Param msg
// @Return []*message.Message
// @Return error
func CrashWriteKafka(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(ContextData)

	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		return nil, contextErr
	}

	msg = message.NewMessage(data.Event.ID, toWriteBytes)

	log.Logger.Info("[4-CrashWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))
	return message.Messages{msg}, nil
}

func LagWriteKafka(msg *message.Message) ([]*message.Message, error) {
	data := msg.Context().Value("data").(ContextData)

	toWriteBytes, contextErr := json.Marshal(data.Event)
	if contextErr != nil {
		data.Status = state.StatusUnmarshalError
		return nil, contextErr
	}

	msg = message.NewMessage(data.Event.ID, toWriteBytes)

	log.Logger.Info("[4-LagWriteKafka] write kafka success", zap.String("topic", connector.GetTopic(data.Event.Category)), zap.String("id", data.Event.ID), zap.String("msg", string(toWriteBytes)))
	return message.Messages{msg}, nil
}

测试

上报PERF_LAG Event可以并发处理 2 条消息,不必等待上一条消息处理完

image-20230816222712201

image-20230816222820505

多次测试发现是由于两条消息走了不同的 Handler

image-20230816230158777

暂未修复,明明是同一主题的两条消息却都走了两条不同的链路,而且 publisher 最后写回的主题也是写到了不同的主题上,并且上报另一个类型的事件,即另一个主题的消息却无法触发消费者消费!

暂定先写死两个主题名称测试是否正常

明日待办

  1. 开会讨论项目规划和任务分工
  2. 继续完成需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1016875.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

安全线程的集合

1. CopyOnWriteArrayList package com.kuang.unsafe;import java.util.*; import java.util.concurrent.CopyOnWriteArrayList;//java.util.ConcurrentModificationException 并发修改异常! 因为List集合线程不安全! public class ListTest {public st…

Linux —— 线程

一,线程概念 在一程序内,一个执行路线称为线程thread,即线程是一个进程内部的控制序列; 一切进程至少都有一个执行线程;线程在进程内部运行,本质是在进程地址空间内运行;在Linux系统中&#xf…

许可分析 license分析 第十七章

许可分析是指对软件许可证进行详细的分析和评估,以了解组织内部对软件许可的需求和使用情况。通过许可分析,可以帮助组织更好地管理和优化软件许可证的使用。以下是一些可能的许可分析方法和步骤: 软件许可证的云化管理:将许可证管…

如何删除清理Mac“其他”文件并删除它

当我们通过「关于本机」>「存储空间」查看硬盘的空间占用情况时。系统会将存储空间根据不同文件类别所占的空间大小显示在条状图上,大部分类型看文字都比较好理解,但对于“其他”这一类很多小伙伴都感觉很困惑,会产生一些问题如&#xff1…

Mac FoneLab for Mac:轻松恢复iOS数据,专业工具助力生活

如果你曾经不小心删除了重要的iOS数据,或者因为各种原因丢失了这些数据,那么你一定知道这种痛苦。现在,有一个名为Mac FoneLab的Mac应用程序,它专门设计用于恢复iOS数据,这可能是你的救星。 Mac FoneLab for Mac是一种…

MySQL数据库详解 二:数据库的高级语言和操作

文章目录 1. 克隆表 ---- 将数据表的数据记录生成到新的表中1.1 方式一:先创建新表,再导入数据1.2方式二:创建的时候同时导入 2. 清空表 ---- 删除表内的所有数据2.1 delete删除2.2 truncate删除(重新记录)2.3 创建临时…

基于Java+SpringBoot+Vue的大学生线上心理咨询系统(可随意更改项目主题如医院预约、店铺预约、专家挂号、在线咨询等)

大学生线上心理咨询室系统 一、前言二、我的优势2.1 自己的网站2.2 自己的小程序(小蔡coding)2.3 有保障的售后2.4 福利 三、开发环境与技术3.1 MySQL数据库3.2 Vue前端技术3.3 Spring Boot框架3.4 微信小程序 四、功能设计4.1 主要功能描述 五、系统实现…

确认过眼神,你就是我心中的【理想型】API!

API作为开发者友好的Friend凭借信息直达、灵活便捷、简单高效的特点,成为了商户绝佳的“资源连接利器”,也是跨境支付过程的“基石堡垒”,通过以上全面的释义,你清晰了解API的作用了吗? 但API与全球电子商户的相遇过程…

《ADS2011射频电路设计与仿真实例》第一章—第六章用ads2017跟做的不同操作

我用的是ads2017,可能是因为版本原因,有些操作和书上的不一样 1.P69 Smith chart utility中,若要调节各曲线圆系的线条颜色,书上写的“执行菜单命令【circles】→【colors】”应该是【view】→【colors】 2.P83 要用微带线&…

Pycharm 2023 年下载、安装教程,好用的插件,附详细图文

文章目录 一、pycharm安装教程二、常用插件推荐安装方法插件介绍1、Material Theme UI Lite2、Chinese (Simplified) Language Pack / 中文语言包3、Statistic4、Json Parser5、Tabnine(强烈推荐)6、Rainbow Brackets(推荐)7、Ind…

友善Nona Pi开发板ubuntu22.04系统用Python3.8.17的pip安装PyQt5.15.2时报错“Q_PID”这个宏未定义的一种解决办法

安装命令: pip install PyQt55.15.2 --config-settings --confirm-license --verbose -i https://mirrors.aliyun.com/pypi/simple/ 遇到出错: 如图: 分析具体错误内容: These bindings will be built: Qt, QtCore, QtNetwo…

Draw.io for Mac:强大流程图绘制工具,让你的想法迅速可视化

对于需要经常处理复杂概念和流程的专业人士和爱好者来说,一个优秀的图形设计工具是必不可少的。今天,我们将为您介绍一款流程图绘制神器——Draw.io for Mac。这款应用具备易于使用的界面和强大的功能,可以帮助您快速创建各种精美的流程图。 …

2023CSP游寄

初赛 DAY -2 才刚考开学测就来初赛。 复赛之后就是月测,这就是初三吗。 初中最后一次 CSP,如果 S 没一等就得摆烂了。希望别因为各种原因爆炸。 中午下午借着刷初赛题的名义摆烂,半道题都没写。 CSP2023RP 初赛 DAY -1 看我发现了什么。…

项目实战-day1.0

软件开发整体介绍 软件开发流程 需求分析--需求规格说明书、产品原型 设计--UI设计、数据库设计、接口设计 编码--项目代码、单元测试 测试--测试用例、测试报告 上线运维--软件环境安装、配置 角色分工 软件环境 开发环境:开发人员在开发阶段使用的环境&am…

FactoryTalk View Studio

由于项目需要,学习了FactoryTalk View Studio的一些操作,这里记录一下,方便以后查阅,并且随着项目的学习,随时更新。 FactoryTalk View Studio FactoryTalk View Studio 安装新建一个View Site Edition工程在工程中新建…

Bash脚本自学 - 输入输出重定向

1. 输入输出重定向 首先,我们有一个文件 hello.txt, Hello World! Good day to you 在指令行中输入: wc -w hello.txt输出为: 6 hello.txt wc -w 是用于统计命令行参数中指定文件的字数(单词数)。 如果…

2023年奢侈品行业研究报告

第一章 行业概况 1.1 定义和分类 奢侈品行业是一个专门生产和销售高价值、高品质、具有独特性和稀缺性商品的行业。这些商品往往超出了人们的基本生活需求,更多地与特定的社会地位、身份认同和审美价值有关。奢侈品不仅仅是物质的,它们往往承载着品牌的…

KubeSphere:登录错误,token failed, reason: getaddrinfo EAI_AGAIN ks-apiserver

1.问题现象: 2.问题解决: [rootk8s-node1 ~]# kubectl get pods --all-namespaces [rootk8s-node1 ~]# kubectl get pods --all-namespaces NAMESPACE NAME READY STATUS …

代码随想录 --- day21 --- 530.二叉搜索树的最小绝对差、501.二叉搜索树中的众数 、236. 二叉树的最近公共祖先

530.二叉搜索树的最小绝对差 题目中要求在二叉搜索树上任意两节点的差的绝对值的最小值。 注意是二叉搜索树,二叉搜索树可是有序的。 遇到在二叉搜索树上求什么最值啊,差值之类的,就把它想成在一个有序数组上求最值,求差值&…

高速DSP系统设计参考指南(一)高速DSP设计面临的挑战

(一)高速DSP设计面临的挑战 1. 概述2. 一般挑战3. DSP音频系统的挑战4. 视频系统的挑战5. DSP通信系统面临的挑战 资料参考来自TI官网和网络。 1. 概述 DSP芯片,也称数字信号处理器,是一种具有特殊结构的微处理器。DSP芯片的内部…