腾讯mini项目-【指标监控服务重构】2023-08-12

news2025/1/18 3:31:57

今日已办

Watermill

Handler

将 4 个阶段的逻辑处理定义为 Handler

image-20230812100758947

image-20230812100744775

测试发现,添加的 handler 会被覆盖掉,故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。

参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)

Middleware

ProfileCtx实现 context.Context 接口

// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumer

import (
	"context"
	"github.com/Shopify/sarama"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"go.uber.org/zap"
	"profile/internal/config"
	"profile/internal/log"
	"profile/internal/schema"
	"time"
)

// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {
	// Properties that can be called by inherited subclasses
	Status int
	Ctx    context.Context

	Router *message.Router
	Event  schema.Event

	AppID         string // API 上报
	FetchScenario string // API 上报
}

// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {
	profileCtx := &ProfileContext{
		Ctx: context.Background(),
	}
	profileCtx.init()
	return profileCtx
}

// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {
	logger := watermill.NewStdLogger(false, false)
	saramaSubscriberConfig := kafka.DefaultSaramaSubscriberConfig()
	saramaSubscriberConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
	subscriber, err := kafka.NewSubscriber(
		kafka.SubscriberConfig{
			Brokers:               []string{config.Profile.GetString("kafka.bootstrap")},
			Unmarshaler:           kafka.DefaultMarshaler{},
			OverwriteSaramaConfig: saramaSubscriberConfig,
			ConsumerGroup:         config.Profile.GetString("kafka.group"),
		},
		logger,
	)
	if err != nil {
		log.Logger.Error("creates a new Kafka Subscriber error", zap.Error(err))
	}

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Error("creates a new Router with given configuration error", zap.Error(err))
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.CorrelationID,
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	topic := "to_analyzer__0.PERF_CRASH"
	router.AddNoPublisherHandler("WriteKafka", topic, subscriber, profileCtx.WriteKafka).
		AddMiddleware(
			profileCtx.UnpackKafkaMessage,
			profileCtx.InitPerformanceEvent,
			profileCtx.AnalyzeEvent,
		)

	profileCtx.Router = router
}

// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {
	// router.Run contains defer cancel()
	if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
	}
}

func (profileCtx *ProfileContext) Done() <-chan struct{} {
	return profileCtx.Ctx.Done()
}

func (profileCtx *ProfileContext) Err() error {
	return profileCtx.Ctx.Err()
}

func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {
	return profileCtx.Ctx.Deadline()
}

func (profileCtx *ProfileContext) Value(key any) any {
	return profileCtx.Ctx.Value(key)
}

【测试】前 3 个阶段处理为 Middleware,最后一个阶段为 Handler

// Package consumer
// @Author xzx 2023/8/12 10:01:00
package consumer

import (
	"encoding/json"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/segmentio/kafka-go"
	"go.uber.org/zap"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema/performance"
	"profile/internal/state"
)

// UnpackKafkaMessage
// @Description
// @Author xzx 2023-08-12 12:27:30
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		// 反序列化,存入通用结构体
		if contextErr := json.Unmarshal(message.Payload, &profileCtx.Event); contextErr != nil {
			profileCtx.Status = state.StatusUnmarshalError
			return h(message)
		}
		log.Logger.Info("[UnpackKafkaItem] unpack kafka item success", zap.Any("event", profileCtx.Event))

		message.SetContext(profileCtx)
		return h(message)
	}
}

// InitPerformanceEvent
// @Description
// @Author xzx 2023-08-12 12:27:35
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		profileCtx = message.Context().(*ProfileContext)

		event, contextErr := performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)
		if contextErr != nil {
			profileCtx.Status = state.StatusEventFactoryError
			return h(message)
		}
		log.Logger.Info("[InitPerformanceEvent] init performance event success", zap.Any("event", profileCtx.Event))
		profileCtx.Event.ProfileData = event

		message.SetContext(profileCtx)
		return h(message)
	}
}

// AnalyzeEvent
// @Description
// @Author xzx 2023-08-12 12:27:38
// @Param h
// @Return message.HandlerFunc
func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {
	return func(message *message.Message) ([]*message.Message, error) {
		profileCtx = message.Context().(*ProfileContext)

		contextErr := profileCtx.Event.ProfileData.Analyze()
		if contextErr != nil {
			profileCtx.Status = state.StatusAnalyzeError
			return h(message)
		}
		log.Logger.Info("[AnalyzeEvent] analyze event success", zap.Any("event", profileCtx.Event))
		// clear dimensions and values
		profileCtx.Event.Dimensions = nil
		profileCtx.Event.Values = nil

		message.SetContext(profileCtx)
		return h(message)
	}
}

// WriteKafka
// @Description
// @Author xzx 2023-08-11 22:30:47
// @Param msg
// @Return contextErr
func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {
	profileCtx = msg.Context().(*ProfileContext)

	toWriteBytes, contextErr := json.Marshal(profileCtx.Event)
	if contextErr != nil {
		profileCtx.Status = state.StatusUnmarshalError
		return
	}
	topic := connector.GetTopic(profileCtx.Event.Category)
	contextErr = connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{
		Topic: topic,
		Key:   []byte(profileCtx.Event.ID),
		Value: toWriteBytes,
	})
	if contextErr != nil {
		profileCtx.Status = state.StatusWriteKafkaError
		return
	}
	log.Logger.Info("[WriteKafka] write kafka success", zap.String("topic", topic), zap.String("id", profileCtx.Event.ID), zap.String("msg", string(toWriteBytes)))
	return
}

可以实现正常的效果

image-20230812130912792

Router

  • 目前的 topic 是固定写死的,要考虑正则表达式,将不同 topic 的共有逻辑抽出为 Middleware,特定逻辑抽出为 Handler
  • 消息处理似乎不是并发的

pub/sub kafka-go

  • custom pub/sub

  • Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama

  • qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)

明日待办

  • 组内开会
  • 继续开发需求

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1016070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DS相关题目

DS相关题目 题目一&#xff1a;消失的数字 拿到这道题目之后&#xff0c;首先可以想到的一个解题方法就是&#xff0c;我们可以先排序&#xff0c;排完序之后&#xff0c;这个数组其实就是一个有序的数组了&#xff0c;那只用比较数组中的每一个元素和他对应的下标是不是相等的…

Day46-50:统计图表项目总结

建项 项目需求写法——可视化报表 可视化报表项目效果 开源表格样式库阿帕奇 绘制echarte图标的流程 在视图中放置一个容器&#xff0c;这个容器需要有一个固定的宽高获取容器&#xff0c;调用init方法&#xff0c;初始化echarts实例 let container document.querySelec…

stm32----ADC模数转换

一、ADC介绍 ADC&#xff0c;即模数转换器&#xff0c;它可以将模拟信号转化为数字信号。在stm32种一般有3个ADC&#xff0c;每个ADC有18个通道。 12位ADC是一种逐次逼近型模拟数字转换器&#xff0c;它有多达18个通道&#xff0c;可测量16个外部和两个内部信号源。各个通道的A…

jquery设置图片可手动拖拽

JQuery是一款流行的JavaScript框架&#xff0c;可以轻松实现网页交互效果。而其中一种常见效果是图片手动拖拽。以下是设置图片手动拖拽的JQuery代码。 $(document).ready(function() { var isDragging false; var mousePos { x: 0, y: 0 }; var elemPos { x: 0, y: 0 }; v…

Apache Spark 在爱奇艺的应用实践

01 Apache Spark 在爱奇艺的现状 Apache Spark 是爱奇艺大数据平台主要使用的离线计算框架&#xff0c;并支持部分流计算任务&#xff0c;用于数据处理、数据同步、数据查询分析等场景&#xff1a; 数据处理&#xff1a;在数据开发平台中支持开发者提交 Spark Jar 包任务或Spar…

elasticsearch2-es和kibana的安装

个人名片&#xff1a; 博主&#xff1a;酒徒ᝰ. 个人简介&#xff1a;沉醉在酒中&#xff0c;借着一股酒劲&#xff0c;去拼搏一个未来。 本篇励志&#xff1a;三人行&#xff0c;必有我师焉。 本项目基于B站黑马程序员Java《SpringCloud微服务技术栈》&#xff0c;SpringCloud…

第 113 场 LeetCode 双周赛题解

A 使数组成为递增数组的最少右移次数 数据范围小直接模拟… class Solution { public:int minimumRightShifts(vector<int> &nums) {for (int op 0; op < nums.size(); op) {if (is_sorted(nums.begin(), nums.end()))//nums是否已经有序return op;rotate(nums.b…

zerotier-client

title: “zerotier-client” createTime: 2022-10-10T11:50:5108:00 updateTime: 2022-10-10T11:50:5108:00 draft: false author: “zcb” tags: [“zerotier-plant”,“zerotier-client”,“zerotier”] categories: [“zerotier”] description: “测试的” 1.windows 1.1…

【深度学习】Pytorch 系列教程(十二):PyTorch数据结构:4、数据集(Dataset)

目录 一、前言 二、实验环境 三、PyTorch数据结构 0、分类 1、张量&#xff08;Tensor&#xff09; 2、张量操作&#xff08;Tensor Operations&#xff09; 3、变量&#xff08;Variable&#xff09; 4、数据集&#xff08;Dataset&#xff09; 随机洗牌 一、前言 Ch…

【Windows】搭建 FTP 服务器

如果需要开发ftp文件上传下载等功能&#xff0c;就需要搭建个ftp服务器&#xff0c;方便调试。 FTP服务 FTP是文件传输协议&#xff08;File Transfer Protocol&#xff09;的简称&#xff0c;该协议属于应用层协议&#xff08;端口号通常为21&#xff09;&#xff0c;用于In…

【入门篇】ClickHouse最优秀的开源列式存储数据库

文章目录 一、什么是ClickHouse&#xff1f;OLAP场景的关键特征列式数据库更适合OLAP场景的原因输入/输出CPU 1.1 ClickHouse的定义与发展历程1.2 ClickHouse的版本介绍 二、ClickHouse的主要特性2.1 高性能的列式存储2.2 实时的分析查询2.3 高度可扩展性2.4 数据压缩2.5 SQL支…

骨传导耳机有害处吗、骨传导耳机真的不好用吗?

骨传导耳机没有害处。 骨传导耳机是通过将声音传递到颅骨&#xff0c;再由颅骨传递到内耳&#xff0c;从而达到听声音的效果&#xff0c;与传统的耳机不同。 因此&#xff0c;骨传导耳机不会直接对人的身体健康、耳朵产生压力和损伤&#xff0c;也不会影响耳道和中耳的正常功能…

在ios系统上实现更改IP地址

在当今的互联网环境中&#xff0c;我们经常需要更改手机的IP地址来避免一些限制或保护我们的隐私。然而&#xff0c;在iOS系统上&#xff0c;更改IP地址并不像在其他平台上那么容易。因此&#xff0c;本文将分享一种简单的方法&#xff0c;帮助您在iOS系统上免费更改手机的IP地…

浅谈C++|构造.析构函数篇

一对象的初始化和处理 1.1构造函数和析构函数 C拥有构造函数和析构函数&#xff0c;这两个函数将会被编译器自动调用&#xff0c;完成对象初始化和清理工作。对象的初始化和清理工作是编译器强制要我们做的事情&#xff0c;因此如果我们不提供构造和析构&#xff0c;编译器提供…

vue学习之组件化开发

1. vue创建 基于vue2的项目 vue create vue-cli-learning 选择 “Manually select features” 取消勾选“Linter / Formatter” 选择“2.x” 选择“In package.json” 输入“N” 回车

elasticsearch索引同步

通常项目中使用elasticsearch需要完成索引同步&#xff0c;索引同步的方法很多&#xff1a; #1、针对实时性非常高的场景需要满足数据的及时同步&#xff0c;可以同步调用&#xff0c;或使用Canal去实现。 1&#xff09;同步调用即在向MySQL写数据后远程调用搜索服务的接口写…

Springboot -- DOCX转PDF(二)

之前记录了按照模板生成 DOCX 文件、并转换为 PDF 文件的方法 https://blog.csdn.net/qq_40096897/article/details/131979177?spm1001.2014.3001.5501 但是使用效果并不是很理想&#xff0c;转换完的 PDF 格式和原本的文档格式不匹配。所以在此重新找了一个文件转 PDF 的方法…

SpringMVC中的请求重定向和转发

一.概述 当处理器对请求处理完毕后&#xff0c;向其它资源进行跳转时&#xff0c;有两种跳转方式&#xff1a;请求转发与重 定向。而根据所要跳转的资源类型&#xff0c;又可分为两类&#xff1a;跳转到页面与跳转到其它处理器。注意&#xff0c;对于请求转发的页面&#xff0c…

算法通过村第八关-树(深度优先)青铜笔记|经典算法题目

文章目录 前言1. 二叉树里面的双指针1.1 判断两棵树是否相同1.2 对称二叉树1.3 合并二叉树 2. 路径专题2.1 二叉树的所有路径2.2 路径总和 3. 翻转的妙用总结 前言 提示&#xff1a;人类的底里是悲伤&#xff0c;我们都在用厚重的颜料&#xff0c;覆盖那些粗糙的线稿。--张皓宸…

Vulnhub实战-prime1

前言 VulnHub 是一个面向信息安全爱好者和专业人士的虚拟机&#xff08;VM&#xff09;漏洞测试平台。它提供了一系列特制的漏洞测试虚拟机镜像&#xff0c;供用户通过攻击和漏洞利用的练习来提升自己的安全技能。本次&#xff0c;我们本次测试的是prime1。 一、主机发现和端…