微服务网关(十二)redis流量统计

news2024/9/23 9:33:52

微服务网关(十二)redis流量统计中间件

redis流量统计中间件

redis详细:

redis.go:

// RedisConfPipline redis连接的设置方法,例如在流量统计中间件中设置数据和超时时间
func RedisConfPipline(pip ...func(c redis.Conn)) error {
	// 读取配置文件,创建连接
	c, err := lib.RedisConnFactory("default")
	if err != nil {
		return err
	}
	defer c.Close()
	for _, f := range pip {
		f(c)
	}
	c.Flush()
	return nil
}

// RedisConfDo redis执行操作的方法,例如在流量统计中间件中使用get方法获取redis中储存的流量数据
func RedisConfDo(commandName string, args ...interface{}) (interface{}, error) {
	c, err := lib.RedisConnFactory("default")
	if err != nil {
		return nil, err
	}
	defer c.Close()
	return c.Do(commandName, args...)
}

统计Handler:

flow_count_handler.go

单例模式设计,共用一个,避免重复创建,也为了使一个服务中的数据可以重复地累计

var FlowCounterHandler *FlowCounter

type FlowCounter struct {
   RedisFlowCountMap   map[string]*RedisFlowCountService
   RedisFlowCountSlice []*RedisFlowCountService
   Locker              sync.RWMutex
}

func NewFlowCounter() *FlowCounter {
   return &FlowCounter{
      RedisFlowCountMap:   map[string]*RedisFlowCountService{},
      RedisFlowCountSlice: []*RedisFlowCountService{},
      Locker:              sync.RWMutex{},
   }
}

func init() {
   FlowCounterHandler = NewFlowCounter()
}

func (counter *FlowCounter) GetCounter(serverName string) (*RedisFlowCountService, error) {
   for _, item := range counter.RedisFlowCountSlice {
      if item.AppID == serverName {
         return item, nil
      }
   }
   //NewRedisFlowCountService 创建统计器
   newCounter := NewRedisFlowCountService(serverName, 1*time.Second)
   counter.RedisFlowCountSlice = append(counter.RedisFlowCountSlice, newCounter)
   counter.Locker.Lock()
   defer counter.Locker.Unlock()
   counter.RedisFlowCountMap[serverName] = newCounter
   return newCounter, nil
}

业务核心:

redis_flow_count.go

// RedisFlowCountService 流量统计器结构体
type RedisFlowCountService struct {
	AppID       string
	Interval    time.Duration
	QPS         int64
	Unix        int64
	TickerCount int64
	TotalCount  int64
}


// NewRedisFlowCountService 参数:设置APPID和统计结果的刷新时间频率
func NewRedisFlowCountService(appID string, interval time.Duration) *RedisFlowCountService {
	reqCounter := &RedisFlowCountService{
		AppID:    appID,
		Interval: interval,
		QPS:      0,
		Unix:     0,
	}
	go func() {
		defer func() {
			if err := recover(); err != nil {
				fmt.Println(err)
			}
		}()
		ticker := time.NewTicker(interval)
		for {
			<-ticker.C
			//获取请求次数reqCounter.TickerCount-->0,1,2,3,4...次
			tickerCount := atomic.LoadInt64(&reqCounter.TickerCount)
			//重置请求次数reqCounter.TickerCount-->0
			atomic.StoreInt64(&reqCounter.TickerCount, 0)       

			currentTime := time.Now()
			dayKey := reqCounter.GetDayKey(currentTime)
			hourKey := reqCounter.GetHourKey(currentTime)
			if err := RedisConfPipline(func(c redis.Conn) {
				//数据增加并写入tickerCount--->+0\+1\+2...
				c.Send("INCRBY", dayKey, tickerCount)
				//超时时间设置
				c.Send("EXPIRE", dayKey, 86400*2)
                
				c.Send("INCRBY", hourKey, tickerCount)
				c.Send("EXPIRE", hourKey, 86400*2)
			}); err != nil {
				fmt.Println("RedisConfPipline err", err)
				continue
			}

			totalCount, err := reqCounter.GetDayData(currentTime)
			if err != nil {
				fmt.Println("reqCounter.GetDayData err", err)
				continue
			}
			nowUnix := time.Now().Unix()
			if reqCounter.Unix == 0 {
				reqCounter.Unix = time.Now().Unix()
				continue
			}
			tickerCount = totalCount - reqCounter.TotalCount
			if nowUnix > reqCounter.Unix {
				reqCounter.TotalCount = totalCount
				reqCounter.QPS = tickerCount / (nowUnix - reqCounter.Unix)
				reqCounter.Unix = time.Now().Unix()
			}
		}
	}()
	return reqCounter
}

func (o *RedisFlowCountService) GetDayKey(t time.Time) string {
	dayStr := t.In(lib.TimeLocation).Format("20060102")
	//设置到redis的key中
	return fmt.Sprintf("%s_%s_%s", RedisFlowDayKey, dayStr, o.AppID)
}

func (o *RedisFlowCountService) GetHourKey(t time.Time) string {
	hourStr := t.In(lib.TimeLocation).Format("2006010215")
	return fmt.Sprintf("%s_%s_%s", RedisFlowHourKey, hourStr, o.AppID)
}

// GetHourData 封装的获取方法 redis的get获取数据
func (o *RedisFlowCountService) GetHourData(t time.Time) (int64, error) {
	return redis.Int64(RedisConfDo("GET", o.GetHourKey(t)))
}

func (o *RedisFlowCountService) GetDayData(t time.Time) (int64, error) {
	return redis.Int64(RedisConfDo("GET", o.GetDayKey(t)))
}

中间件详细:

http_flow_count.go

func HTTPFlowCountMiddleware() gin.HandlerFunc {
   return func(c *gin.Context) {
      serverInterface, ok := c.Get("service")
      if !ok {
         middleware.ResponseError(c, 2001, errors.New("service not found"))
         c.Abort()
         return
      }
      //从上下文中获取后转换
      serviceDetail := serverInterface.(*dao.ServiceDetail)

      //统计项 1 全站 2 服务
      //1、全站
      totalCounter, err := public.FlowCounterHandler.GetCounter(public.FlowTotal)
      if err != nil {
         middleware.ResponseError(c, 4001, err)
         c.Abort()
         return
      }
      totalCounter.Increase()
      //_, _ = totalCounter.GetDayData(time.Now())
      //fmt.Printf("totalCounter qps:%v,dayCount:%v\n", totalCounter.QPS, dayCount)
      //2、服务
      serviceCounter, err := public.FlowCounterHandler.GetCounter(public.FlowServicePrefix + serviceDetail.Info.ServiceName)
      if err != nil {
         middleware.ResponseError(c, 4001, err)
         c.Abort()
         return
      }
      serviceCounter.Increase()
      //_, _ = serviceCounter.GetDayData(time.Now())
      //fmt.Printf("serviceCounter qps:%v,dayCount:%v", serviceCounter.QPS, dayServiceCount)
      c.Next()
   }
}

中间件代码解释 | 一次流量统计功能完成的流程:

本段代码中,由public.FlowCounterHandler.GetCounter方法获取服务的流量统计,返回目标服务的流量统计器。从目标服务的流量统计器中,即可获取到从Redis中读取到的请求次数

同时,调用totalCounter.Increase()方法,原子增加流量统计器中的TickerCount值,由0增到1,

image-20230110213245903

此时,在redis_flow_count.go的NewRedisFlowCountService方法的无限for循环中,就会在提取出tickerCount的值后,将其重新置为0。接着,将提取出的tickerCount通过INCRBY命令语句,写入Redis中,接着flush刷新,该数据便写入Redis中了。完成一次流量统计。

image-20230110213440432

补充:SETNX分布式锁

写入redis,试用SETNX分布式锁实现实现写入,可以避免读取和修改数据造成结果混乱

使用场景:

通常在分布式系统中,我们经常会从数据库中读取数据和修改数据,然而这不是一个原子操作,在并发时就会导致数据的不正确,例如一会下面的电商秒杀,库存数量的更新就会遇到。

redis是单线程的,单纯的使用同步锁只能保证单体系统下正常运行,但是在微服务架构下没法保证,所以要使用setnx分布式锁实现写入

基本语法:

Redis Setnx(SET if Not eXists) 命令在指定的 key 不存在时,为 key 设置指定的值。

redis> EXISTS job                # job 不存在
(integer) 0

redis> SETNX job "programmer"    # job 设置成功
(integer) 1

redis> SETNX job "code-farmer"   # 尝试覆盖 job ,失败
(integer) 0

redis> GET job                   # 没有被覆盖
"programmer"

说人话就是:使用setnx命令设置了一个key,之后再次设置覆盖就会报错,除非将这个key删除了,才能重新设置

这样的一个特性就可以用于加锁使得数据同步的功能上

  • 使用setnx命令设置key相当于加锁;
  • 执行业务代码
  • 删除key就相当于解锁

图片

但是单纯这么使用还有缺陷去,一旦中间的业务代码操作出现了异常,就会导致程序无法解锁,而其他请求也会一直无法拿到key,造成程序逻辑死锁

这时可以采取捕获异常的方式解决,保证即使上述逻辑出问题,也能del掉

问题:

  1. 在执行上锁后有一台服务器出现宕机、断电,导致异常无法抛出,key一直存在,仍会导致死锁

解决办法:

上锁的同时,利用原子性的操作设置key的时长,过期后就抛出异常

  1. 如果减库存的操作时间很长,超出的锁的过期时长,应该如何操作??????????

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/342837.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux 内核定时器实验

目录 一、内核时间管理简介 二、内核定时器简介 三、驱动编写 1、修改makefile 2、添加定义 3、初始化led函数 4、添加调用 5、初始化定时器与定时器处理函数 这部分代码如下 四、ioctl函数 五、内核添加unlocked_ioctl 函数 1、添加设备操作集unlocked_ioctl成员 2…

【Android 后台持续定位】

最近工作中遇到了一个涉及后台持续性定位的问题。这里做一下总结&#xff1a;随着Android版本的条件&#xff0c;Google对后台服务管控的也是越来越严格。 这时有的小伙伴会说了&#xff0c;开启一个service然后把服务和通知关联一下变成前台服务&#xff0c;不就行了&#xff…

steam/csgo搬砖,2023年最暴利的项目

这个项目赚钱主要来源于两个地方&#xff1a; 1.比如说今天美元的汇率是1美元6.8人民币&#xff0c;那我们有特定的渠道能拿到1美元5.0-5.5左右人民币的价格&#xff0c;100美元的汇率差利润就有180元左右的利润&#xff0c;当然这个价格是根据国际的汇率上下会有浮动的。 2.…

什么是极速文件传输,极速文件传输如何进行大文件传输

当谈到大文件传输时&#xff0c;人们总是担心大数据文件的大小以及将它们从一个位置交换到另一个位置需要多长时间。由于数据捕获高分辨率视频和图像的日益复杂&#xff0c;文件的大小不断增加。数据工作流在地理上变得越来越分散。在一个位置生成的文件在其他位置处理或使用。…

晚上适合做什么副业?靠自己的劳动赚钱最光荣

对于大多数普通人来说&#xff0c;晚上的空闲时间是很多的&#xff0c;但是却总是在手机上打游戏、刷视频而白白度过了一晚上。其实最近几年来很多朋友都想利用晚上的时间做一些副业&#xff0c;因为当下的工资已经满足不了自己的需求&#xff0c;再加上生活方方面面的压力&…

【swagger2】开发api文档

文章目录一、swagger2 简介背景Open API ???swagger2的作用swagger2常用工具组件&#xff1a;二、Springfox三、springBoot使用swagger2&#xff08;简单示例&#xff09;四、Swagger-UI使用五、配置文件1、配置类&#xff1a;给docket上下文配置api描述信息2、配置类&#…

净现值、投资回收期例题讲解

净现值概念净现值&#xff08;NPV&#xff09;&#xff1a;指今后某年的Y元相当于今年的X元。需要关注两个概念&#xff1a;利率&#xff1a;利率是指借款、存入或借入金额&#xff08;称为本金总额&#xff09;中每个期间到期的利息金额与票面价值的比率。贴现率&#xff08;D…

微软Bing的AI人工只能对话体验名额申请教程

微软Bing 免费体验名额申请教程流程ChatGPT这东西可太过火了。国外国内&#xff0c;圈里圈外都是人声鼎沸。微软&#xff0c;谷歌&#xff0c;百度这些大佬纷纷出手。连看个同花顺都有GPT概念了&#xff0c;搞技术&#xff0c;做生意的看来都盯上了 流程 下面就讲一下如何申…

Python3遍历文件夹提取关键字及其附近字符

要求&#xff1a; 1&#xff0c;遍历文件夹下所有的.xml文件 2&#xff0c;从.xml文件中提取关键字以及左右十个字符 3&#xff0c;输出到excel 一&#xff1a;遍历文件夹找到所有xml文件及其路径 for root, dirs, files in os.walk(self.inputFilePath):for file in files:…

靓号管理-搜索

搜索手机号&#xff1a; 最后一条就是使用的关键mobile__contains 使用字典&#xff1a; 后端的逻辑&#xff1a; """靓号列表"""data_dict {}search_data request.GET.get(q, "")# 根据关键字进行搜索&#xff0c;如果关键字存在&…

综合项目 旅游网 【5.旅游线路收藏功能】

分析判断当前登录用户是否收藏过该线路当页面加载完成后&#xff0c;发送ajax请求&#xff0c;获取用户是否收藏的标记根据标记&#xff0c;展示不同的按钮样式编写代码后台代码RouteServlet/*** 判断当前登录用户是否收藏过该路线*/ public void isFavorite(HttpServletReques…

.md文件上传视频的踩坑经历小记

分别用QQ录制了前后两个视频&#xff0c;并利用video标签引用。这两个视频&#xff0c;明明代码一样&#xff0c;偏偏就一个成功&#xff0c;一个失败。 代码如下&#xff1a; <!-- 能够成功显示mp4视频 --> <video src"/images/video/2020110411.mp4" co…

海卡和海派有什么区别

一、海卡和海派有什么区别 海派和海卡实际上就是快船和慢船的区别。都是头程选用海运的方式&#xff0c;海派是到海港海关清关拆柜后&#xff0c;尾程配送是采用快递配送。而海卡则是到海港海关清关拆柜后&#xff0c;尾程选用货车配送。1、海派比较适用于小件货物 海派是海运抵…

OPenCV库移植到ARM开发板子上面配置过程

步骤一 1&#xff0c;环境准备去下载opencv官方的源码。 我这里用的是opencv-4.5.5版本的 2&#xff0c;还需要交叉编译工具一般&#xff0c;你交叉编译的工具板子厂家会提供工具&#xff0c;最好还是用板子厂家提供的交叉编译工具&#xff0c;因为我之前编译试过其他的交叉…

第一章:unity性能优化之内存优化

目录 前言 unity性能优化之内存的优化 一、unity Analysis工具的使用。 二、内存优化方法 1、设置和压缩图片 2、图片格式 3、动画文件 4、模型 5、RenderTexture&#xff08;RT&#xff09; 6、分辨率 7、资源的重复利用 8、shader优化 9、对bundle进行良好的管…

数字三角形

题目描述上图给出了一个数字三角形。从三角形的顶部到底部有很多条不同的路径。对于每条路径&#xff0c;把路径上面的数加起来可以得到一个和&#xff0c;你的任务就是找到最大的和。路径上的每一步只能从一个数走到下一层和它最近的左边的那个数或者右 边的那个数。此外&…

RK3399平台开发系列讲解(文件系统篇)虚拟文件系统的数据结构

🚀返回专栏总目录 文章目录 一、超级块二、挂载描述符三、文件系统类型四、索引节点五、目录项沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍虚拟文件系统的数据结构。 一、超级块 文件系统的第一块是超级块,用来描述文件系统的总体信息。当我们把文件系…

【论文速递】Arxiv2018 - 加州伯克利大学借助引导网络实现快速、准确的小样本分割

【论文速递】Arxiv2018 - 加州伯克利大学借助引导网络实现快速、准确的小样本分割 【论文原文】&#xff1a;Few-Shot Segmentation Propagation with Guided Networks 【作者信息】&#xff1a;Kate Rakelly∗ Evan Shelhamer∗ Trevor Darrell Alexei Efros Sergey Levine …

源码深度解析Spring Bean的加载

在应用spring 的过程中&#xff0c;就会涉及到bean的加载&#xff0c;bean的加载经历一个相当复杂的过程&#xff0c;bean的加载入口如下&#xff1a; 使用getBean&#xff08;&#xff09;方法进行加载Bean&#xff0c;最终调用的是AbstractBeanFactory.doGetBean() 进行Bean的…

Hudi-基本概念(时间轴、文件布局、索引、表类型、查询类型、数据写、数据读、Compaction)

文章目录基本概念时间轴(TimeLine)文件布局&#xff08;File Layout&#xff09;Hudi表的文件结构Hudi存储的两个部分Hudi的具体文件说明索引&#xff08;Index&#xff09;原理索引选项全局索引与非全局索引索引的选择策略对事实表的延迟更新对事件表的去重对维度表的随机更删…