go-高效处理应用程序数据

news2025/1/23 3:54:11

一、背景

大型的应用程序为了后期的排障、运营等,会将一些请求、日志、性能指标等数据保存到存储系统中。为了满足这些需求,我们需要进行数据采集,将数据高效的传输到存储系统

二、问题

  • 采集服务仅仅针对某个需求开发,需要修改业务代码逻辑,会给业务带来比较大的负担,并且耦合度太高
  • 数据采集导致已有的服务请求延时变高
  • 采集性能差,需要较长的时间才能采集完一批数据
  • 服务关闭时会丢失数据

三、解决方案

  • 针对问题1,我们可以将数据采集从业务服务中解耦出来,专门创建一个数据采集服务。业务程序只需要将数据传输到指定的中间件,至于数据的处理、采样、过滤、传出等逻辑都在采集服务中完成。
  • 针对问题2,将数据的导出由同步改为异步,异步开启多个协程消费通道中的数据,这样对程序的性能几乎微乎其微
  • 针对问题3,设置好采集最长间隔、批量采集大小以及使用高性能的数据中间件作为中转,比如redis、kafka
  • 针对问题4,为导出器和采集服务增加关闭监听,程序关闭时将数据清空完再退出

四、采集服务实现

4.1 架构设计

在这里插入图片描述

4.2 storage

负责从数据中间件中拿到数据,接口定义如下:

type AnalyticsStorage interface {
	Init(config interface{}) error
	GetName() string
	Connect() bool
	GetAndDeleteSet(string) []interface{}
}

redis的storage实现

import (
	"crypto/tls"
	"strconv"
	"time"

	redis "github.com/go-redis/redis/v7"
	"github.com/marmotedu/errors"
	"github.com/mitchellh/mapstructure"

	genericoptions "github.com/marmotedu/iam/internal/pkg/options"
	"github.com/marmotedu/iam/pkg/log"
)

// ------------------- REDIS CLUSTER STORAGE MANAGER -------------------------------

// RedisKeyPrefix defines prefix for iam analytics key.
const (
	RedisKeyPrefix      = "analytics-"
	defaultRedisAddress = "127.0.0.1:6379"
)

var redisClusterSingleton redis.UniversalClient

// RedisClusterStorageManager is a storage manager that uses the redis database.
type RedisClusterStorageManager struct {
	db        redis.UniversalClient
	KeyPrefix string
	HashKeys  bool
	Config    genericoptions.RedisOptions
}

// NewRedisClusterPool returns a redis cluster client.
func NewRedisClusterPool(forceReconnect bool, config genericoptions.RedisOptions) redis.UniversalClient {
	if !forceReconnect {
		if redisClusterSingleton != nil {
			log.Debug("Redis pool already INITIALIZED")

			return redisClusterSingleton
		}
	} else {
		if redisClusterSingleton != nil {
			redisClusterSingleton.Close()
		}
	}

	log.Debug("Creating new Redis connection pool")

	maxActive := 500
	if config.MaxActive > 0 {
		maxActive = config.MaxActive
	}

	timeout := 5 * time.Second

	if config.Timeout > 0 {
		timeout = time.Duration(config.Timeout) * time.Second
	}

	var tlsConfig *tls.Config
	if config.UseSSL {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: config.SSLInsecureSkipVerify,
		}
	}

	var client redis.UniversalClient
	opts := &RedisOpts{
		MasterName:   config.MasterName,
		Addrs:        getRedisAddrs(config),
		DB:           config.Database,
		Password:     config.Password,
		PoolSize:     maxActive,
		IdleTimeout:  240 * time.Second,
		ReadTimeout:  timeout,
		WriteTimeout: timeout,
		DialTimeout:  timeout,
		TLSConfig:    tlsConfig,
	}

	if opts.MasterName != "" {
		log.Info("--> [REDIS] Creating sentinel-backed failover client")
		client = redis.NewFailoverClient(opts.failover())
	} else if config.EnableCluster {
		log.Info("--> [REDIS] Creating cluster client")
		client = redis.NewClusterClient(opts.cluster())
	} else {
		log.Info("--> [REDIS] Creating single-node client")
		client = redis.NewClient(opts.simple())
	}

	redisClusterSingleton = client

	return client
}

func getRedisAddrs(config genericoptions.RedisOptions) (addrs []string) {
	if len(config.Addrs) != 0 {
		addrs = config.Addrs
	}

	if len(addrs) == 0 && config.Port != 0 {
		addr := config.Host + ":" + strconv.Itoa(config.Port)
		addrs = append(addrs, addr)
	}

	return addrs
}

// RedisOpts is the overridden type of redis.UniversalOptions. simple() and cluster() functions are not public
// in redis library. Therefore, they are redefined in here to use in creation of new redis cluster logic.
// We don't want to use redis.NewUniversalClient() logic.
type RedisOpts redis.UniversalOptions

func (o *RedisOpts) cluster() *redis.ClusterOptions {
	if len(o.Addrs) == 0 {
		o.Addrs = []string{defaultRedisAddress}
	}

	return &redis.ClusterOptions{
		Addrs:     o.Addrs,
		OnConnect: o.OnConnect,

		Password: o.Password,

		MaxRedirects:   o.MaxRedirects,
		ReadOnly:       o.ReadOnly,
		RouteByLatency: o.RouteByLatency,
		RouteRandomly:  o.RouteRandomly,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:        o.DialTimeout,
		ReadTimeout:        o.ReadTimeout,
		WriteTimeout:       o.WriteTimeout,
		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

func (o *RedisOpts) simple() *redis.Options {
	addr := defaultRedisAddress
	if len(o.Addrs) > 0 {
		addr = o.Addrs[0]
	}

	return &redis.Options{
		Addr:      addr,
		OnConnect: o.OnConnect,

		DB:       o.DB,
		Password: o.Password,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:  o.DialTimeout,
		ReadTimeout:  o.ReadTimeout,
		WriteTimeout: o.WriteTimeout,

		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

func (o *RedisOpts) failover() *redis.FailoverOptions {
	if len(o.Addrs) == 0 {
		o.Addrs = []string{"127.0.0.1:26379"}
	}

	return &redis.FailoverOptions{
		SentinelAddrs: o.Addrs,
		MasterName:    o.MasterName,
		OnConnect:     o.OnConnect,

		DB:       o.DB,
		Password: o.Password,

		MaxRetries:      o.MaxRetries,
		MinRetryBackoff: o.MinRetryBackoff,
		MaxRetryBackoff: o.MaxRetryBackoff,

		DialTimeout:  o.DialTimeout,
		ReadTimeout:  o.ReadTimeout,
		WriteTimeout: o.WriteTimeout,

		PoolSize:           o.PoolSize,
		MinIdleConns:       o.MinIdleConns,
		MaxConnAge:         o.MaxConnAge,
		PoolTimeout:        o.PoolTimeout,
		IdleTimeout:        o.IdleTimeout,
		IdleCheckFrequency: o.IdleCheckFrequency,

		TLSConfig: o.TLSConfig,
	}
}

// GetName returns the redis cluster storage manager name.
func (r *RedisClusterStorageManager) GetName() string {
	return "redis"
}

// Init initialize the redis cluster storage manager.
func (r *RedisClusterStorageManager) Init(config interface{}) error {
	r.Config = genericoptions.RedisOptions{}
	err := mapstructure.Decode(config, &r.Config)
	if err != nil {
		log.Fatalf("Failed to decode configuration: %s", err.Error())
	}

	r.KeyPrefix = RedisKeyPrefix

	return nil
}

// Connect will establish a connection to the r.db.
func (r *RedisClusterStorageManager) Connect() bool {
	if r.db == nil {
		log.Debug("Connecting to redis cluster")
		r.db = NewRedisClusterPool(false, r.Config)

		return true
	}

	log.Debug("Storage Engine already initialized...")

	// Reset it just in case
	r.db = redisClusterSingleton

	return true
}

func (r *RedisClusterStorageManager) hashKey(in string) string {
	return in
}

func (r *RedisClusterStorageManager) fixKey(keyName string) string {
	setKeyName := r.KeyPrefix + r.hashKey(keyName)

	log.Debugf("Input key was: %s", setKeyName)

	return setKeyName
}

// GetAndDeleteSet get and delete key from redis.
func (r *RedisClusterStorageManager) GetAndDeleteSet(keyName string) []interface{} {
	log.Debugf("Getting raw key set: %s", keyName)

	if r.db == nil {
		log.Warn("Connection dropped, connecting..")
		r.Connect()

		return r.GetAndDeleteSet(keyName)
	}

	log.Debugf("keyName is: %s", keyName)

	fixedKey := r.fixKey(keyName)

	log.Debugf("Fixed keyname is: %s", fixedKey)

	var lrange *redis.StringSliceCmd
	_, err := r.db.TxPipelined(func(pipe redis.Pipeliner) error {
		lrange = pipe.LRange(fixedKey, 0, -1)
		pipe.Del(fixedKey)

		return nil
	})
	if err != nil {
		log.Errorf("Multi command failed: %s", err)
		r.Connect()
	}

	vals := lrange.Val()

	result := make([]interface{}, len(vals))
	for i, v := range vals {
		result[i] = v
	}

	log.Debugf("Unpacked vals: %d", len(result))

	return result
}

// SetKey will create (or update) a key value in the store.
func (r *RedisClusterStorageManager) SetKey(keyName, session string, timeout int64) error {
	log.Debugf("[STORE] SET Raw key is: %s", keyName)
	log.Debugf("[STORE] Setting key: %s", r.fixKey(keyName))

	r.ensureConnection()
	err := r.db.Set(r.fixKey(keyName), session, 0).Err()
	if timeout > 0 {
		if expErr := r.SetExp(keyName, timeout); expErr != nil {
			return expErr
		}
	}
	if err != nil {
		log.Errorf("Error trying to set value: %s", err.Error())

		return errors.Wrap(err, "failed to set key")
	}

	return nil
}

// SetExp is used to set the expiry of a key.
func (r *RedisClusterStorageManager) SetExp(keyName string, timeout int64) error {
	err := r.db.Expire(r.fixKey(keyName), time.Duration(timeout)*time.Second).Err()
	if err != nil {
		log.Errorf("Could not EXPIRE key: %s", err.Error())
	}

	return errors.Wrap(err, "failed to set expire time for key")
}

func (r *RedisClusterStorageManager) ensureConnection() {
	if r.db != nil {
		// already connected
		return
	}
	log.Info("Connection dropped, reconnecting...")
	for {
		r.Connect()
		if r.db != nil {
			// reconnection worked
			return
		}
		log.Info("Reconnecting again...")
	}
}

4.3 pump

我们首先会针对某种业务创建对应的数据结构,比如

type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"`
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

pump负责将数据导出到指定的数据存储系统,比如promethus、mongo、ES等,它的接口定义如下:

type Pump interface {
	GetName() string
	New() Pump
	Init(interface{}) error
	WriteData(context.Context, []interface{}) error
	SetTimeout(timeout int)
	GetTimeout() int
}

4.4 exporter

exporter依赖storage和pump,每个exporter负责一种业务,一般对应一种数据结构

type AnalyticsRecord struct {
	TimeStamp  int64     `json:"timestamp"`
	Username   string    `json:"username"`
	Effect     string    `json:"effect"`
	Conclusion string    `json:"conclusion"`
	Request    string    `json:"request"`
	Policies   string    `json:"policies"`
	Deciders   string    `json:"deciders"`
	ExpireAt   time.Time `json:"expireAt"   bson:"expireAt"`
}

exporter从storage中取出数据转成对应的数据结构,并进行过滤和去掉冗余字段内容。最后将数据通过pump导出到数据系统中,exporter大概如下:

type AnalyticsExporter struct {
	storage storage.Storage
	pump    pump.Pump
	filter      []filter.Filters  //对数据进行过滤
	timeout               int
	OmitDetailedRecording bool  //将冗余字段置为空
}

func (e *AnalyticsExporter) Export() {
	//1.从storage中拉取数据
	//2. 转换为对应的数据结构
	//3. 过滤数据
	//4. 置空冗余字段
	//5. 导出数据
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1923649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker容器的生命周期

引言 Docker 容器作为一种轻量级虚拟化技术,在现代应用开发和部署中扮演着重要角色。理解容器的生命周期对于有效地管理和运维容器化应用至关重要。本文将深入探讨 Docker 容器的生命周期,从创建到销毁的各个阶段,帮助读者更好地掌握容器管理…

分手后如何走出夜晚的抑郁,告别失眠困扰?

在这个快速变化的世界里,分手成为了许多人生活中不得不面对的现实。而每当夜幕降临,那种难以言表的孤独感和深深的抑郁往往让人倍感煎熬,甚至陷入失眠的漩涡。那么,分手后我们该如何应对这种情绪困扰,重新找回自己的宁…

防火墙NAT和智能选路实验详解(华为)

目录 实验概述实验拓扑实验要求要求一要求二要求三要求四要求五 实验概述 从我上面一个博客能够了解到NAT和防火墙选路原理 ——>防火墙nat和智能选路,这一章我通过实验来详解防火墙关于nat和智能选路从而能熟练使用和配置防火墙,这里使用的是华为US…

lvs集群、NAT模式和DR模式、keepalive

目录 lvs集群概念 集群的类型:三种类型 系统可靠性指标 lvs集群中的术语 lvs的工作方式 NAT模式 lvs的工具 算法 实验 数据流向 步骤 一 、调度器配置(test1 192.168.233.10) 二、RS配置(nginx1和nginx2)…

Android:如何绘制View

点击查看Android 如何绘制视图官网 一、简介 Android 框架会在 Activity 获得焦点时请求 Activity 绘制其布局。Android 框架会处理绘制流程,但该 Activity 必须提供其布局层次结构的根节点。 Android 框架会绘制布局的根节点,并测量和绘制布局树。它会…

【每日一练】python类和对象现实举例详细讲解

""" 本节课程目的: 1.掌握类描述现实世界实物思想 2.掌握类和对象的关系 3.理解什么事面向对象 """ #比如设计一个闹钟,在这里就新建一个类 class Clock:idNone #闹钟的序列号,也就是类的属性priceNone #闹…

Redis学习笔记(个人向)

Redis学习笔记(个人向) 1. 概述 是一个高性能的 key-value 数据库;其具有以下三个特点: Redis支持数据的持久化,可以将内存中的数据保存在磁盘中,重启的时候可以再次加载进行使用。Redis不仅仅支持简单的key-value类型的数据&…

Nginx+Keepalive调度的高可用

nginxkeepalive: 调度器的高可用 vip地址主备之间的切换,主在工作时,p地址只在主上,主停止工作,p飘移到备服务器。 在主备的优先级不变的情况下,主恢复工作,vp会飘回到主服务器。 1、配优先级 2、配置…

EventBus学习

视频:05_尚硅谷_EventBus_粘性事件案例_哔哩哔哩_bilibili 1.整体框架 2.demo下载地址:https://github.com/greenrobot/EventBus 3.实现非粘性时间流程: 3.1导入架包eventbus-3.0.0.jar和eventbus-3.0.0-sources.jar 3.2在接受数据页面注…

k8s(五)---名称空间

五、名称空间 名称空间是k8s划分不同工作空间的逻辑单位,是k8s资源逻辑隔离的机,。可以给不同的租户,不同的环境、不同的项目创建对应的命名空间。 1、查看名称空间 kubectl get ns kubectl get namespaces 此处展示了四个命名空间 2、管理名称空间 1…

更新商品前端接口编写

文章目录 新增页面书写写表单价格符号的显示然后状态的书写后端枚举书写时间书写使用组件 新增页面书写 书写直接复制页面 写表单的绑定信息 然后绑定表单 表单绑定还有表单数据的绑定 标签中ref的作用就是将 该组件注册到vue对象的ref属性中 那么在vue运行的时候,会加载所…

IOC、DI<4> Unity、AOP、MVCAOP、UnityAOP 区别

IOC():控制反转,把程序上层对下层的依赖,转移到第三方的容器来装配 是程序设计的目标,实现方式包含了依赖注入和依赖查找(.net里面只有依赖注入) DI:依赖注入&#xff0c…

【网络文明】关注网络安全

在这个数字化时代,互联网已成为我们生活中不可或缺的一部分,它极大地便利了我们的学习、工作、娱乐乃至日常生活。然而,随着网络空间的日益扩大,网络安全问题也日益凸显,成为了一个不可忽视的全球性挑战。认识到网络安…

Gitee简易使用流程(后期优化)

目录 1.修改用户名 2.文件管理 新建文件/文件夹流程如下: 上传文件流程如下: 以主页界面为起点 1.修改用户名 点解右上角的头像--> 点击“账号设置” 点击左边栏里的“个人资料“ 直接修改用户名即可 2.文件管理 选择一个有修改权限仓库&#…

【轻松拿捏】Java-final关键字(面试)

目录 1. 定义和基本用法 回答要点: 示例回答: 2. final 变量 回答要点: 示例回答: 3. final 方法 回答要点: 示例回答: 4. final 类 回答要点: 示例回答: 5. final 关键…

yolov8预测

yoloV8 官方地址 预测 -Ultralytics YOLO 文档 1.图片预测 from ultralytics import YOLO #### 图片预测1 ### https://www.youtube.com/watch?vneBZ6huolkg ### https://github.com/ultralytics/ultralytics ### https://github.com/abdullahtarek/football_analysis…

Linux C语言基础 day10

目录 学习目标: 学习内容: 1.指针指向数组 1.1 指针与数组的关系 1.2 指针与一维数组关系实现 1.2.1 指针与一维数组的关系 1.2.2 指针指向一维整型数组作为函数参数传递 课外作业: 学习目标: 一周掌握 C基础知识 学习内…

专业条码二维码扫描设备和手机二维码扫描软件的区别?

条码二维码技术已广泛应用于我们的日常生活中,从超市结账到公交出行,再到各类活动的入场验证,条码二维码的便捷性不言而喻,而在条码二维码的扫描识别读取过程中,专业扫描读取设备和手机二维码扫描软件成为了两大主要工…

uniapp使用多列布局显示图片,一行两列

完整代码&#xff1a; <script setup>const src "https://qiniu-web-assets.dcloud.net.cn/unidoc/zh/shuijiao.jpg" </script><template><view class"content"><view class"img-list"><image :src"src…

日志自动分析-操作系统-GscanLogonTracerf8x

&#x1f3bc;个人主页&#xff1a;金灰 &#x1f60e;作者简介:一名简单的大一学生;易编橙终身成长社群的嘉宾.✨ 专注网络空间安全服务,期待与您的交流分享~ 感谢您的点赞、关注、评论、收藏、是对我最大的认可和支持&#xff01;❤️ &#x1f34a;易编橙终身成长社群&#…