go高性能单机缓存项目

news2024/12/17 6:10:23

代码

// Copyright 2021 ByteDance Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package asynccache

import (
	"fmt"
	"log"
	"sync"
	"sync/atomic"
	"time"

	sf "golang.org/x/sync/singleflight"
)

// Options controls the behavior of AsyncCache.
type Options struct {
	RefreshDuration time.Duration
	Fetcher         func(key string) (interface{}, error)

	// If EnableExpire is true, ExpireDuration MUST be set.
	EnableExpire   bool
	ExpireDuration time.Duration

	ErrorHandler  func(key string, err error)
	ChangeHandler func(key string, oldData, newData interface{})
	DeleteHandler func(key string, oldData interface{})

	IsSame     func(key string, oldData, newData interface{}) bool
	ErrLogFunc func(str string)
}

// AsyncCache .
type AsyncCache interface {
	// SetDefault sets the default value of given key if it is new to the cache.
	// It is useful for cache warming up.
	// Param val should not be nil.
	SetDefault(key string, val interface{}) (exist bool)

	// Get tries to fetch a value corresponding to the given key from the cache.
	// If error occurs during the first time fetching, it will be cached until the
	// sequential fetching triggered by the refresh goroutine succeed.
	Get(key string) (val interface{}, err error)

	// GetOrSet tries to fetch a value corresponding to the given key from the cache.
	// If the key is not yet cached or error occurs, the default value will be set.
	GetOrSet(key string, defaultVal interface{}) (val interface{})

	// Dump dumps all cache entries.
	// This will not cause expire to refresh.
	Dump() map[string]interface{}

	// DeleteIf deletes cached entries that match the `shouldDelete` predicate.
	DeleteIf(shouldDelete func(key string) bool)

	// Close closes the async cache.
	// This should be called when the cache is no longer needed, or may lead to resource leak.
	Close()
}

// asyncCache .
type asyncCache struct {
	sfg  sf.Group
	opt  Options
	data sync.Map
}

type tickerType int

const (
	refreshTicker tickerType = iota
	expireTicker
)

type sharedTicker struct {
	sync.Mutex
	started  bool
	stopChan chan bool
	ticker   *time.Ticker
	caches   map[*asyncCache]struct{}
}

var (
	// 共用 ticker
	refreshTickerMap, expireTickerMap sync.Map
)

type entry struct {
	val    atomic.Value
	expire int32 // 0 means useful, 1 will expire
	err    Error
}

func (e *entry) Store(x interface{}, err error) {
	if x != nil {
		e.val.Store(x)
	} else {
		e.val = atomic.Value{}
	}
	e.err.Store(err)
}

func (e *entry) Touch() {
	atomic.StoreInt32(&e.expire, 0)
}

// NewAsyncCache creates an AsyncCache.
func NewAsyncCache(opt Options) AsyncCache {
	c := &asyncCache{
		sfg: sf.Group{},
		opt: opt,
	}
	if c.opt.ErrLogFunc == nil {
		c.opt.ErrLogFunc = func(str string) {
			log.Println(str)
		}
	}
	if c.opt.EnableExpire {
		if c.opt.ExpireDuration == 0 {
			panic("asynccache: invalid ExpireDuration")
		}
		ti, _ := expireTickerMap.LoadOrStore(c.opt.ExpireDuration,
			&sharedTicker{caches: make(map[*asyncCache]struct{}), stopChan: make(chan bool, 1)})
		et := ti.(*sharedTicker)
		et.Lock()
		et.caches[c] = struct{}{}
		if !et.started {
			et.started = true
			et.ticker = time.NewTicker(c.opt.ExpireDuration)
			go et.tick(et.ticker, expireTicker)
		}
		et.Unlock()
	}

	ti, _ := refreshTickerMap.LoadOrStore(c.opt.RefreshDuration,
		&sharedTicker{caches: make(map[*asyncCache]struct{}), stopChan: make(chan bool, 1)})
	rt := ti.(*sharedTicker)
	rt.Lock()
	rt.caches[c] = struct{}{}
	if !rt.started {
		rt.started = true
		rt.ticker = time.NewTicker(c.opt.RefreshDuration)
		go rt.tick(rt.ticker, refreshTicker)
	}
	rt.Unlock()
	return c
}

// SetDefault sets the default value of given key if it is new to the cache.
func (c *asyncCache) SetDefault(key string, val interface{}) bool {
	ety := &entry{}
	ety.Store(val, nil)
	actual, exist := c.data.LoadOrStore(key, ety)
	if exist {
		actual.(*entry).Touch()
	}
	return exist
}

// Get tries to fetch a value corresponding to the given key from the cache.
// If error occurs during in the first time fetching, it will be cached until the
// sequential fetchings triggered by the refresh goroutine succeed.
func (c *asyncCache) Get(key string) (val interface{}, err error) {
	var ok bool
	val, ok = c.data.Load(key)
	if ok {
		e := val.(*entry)
		e.Touch()
		return e.val.Load(), e.err.Load()
	}

	val, err, _ = c.sfg.Do(key, func() (v interface{}, e error) {
		v, e = c.opt.Fetcher(key)
		ety := &entry{}
		ety.Store(v, e)
		c.data.Store(key, ety)
		return
	})
	return
}

// GetOrSet tries to fetch a value corresponding to the given key from the cache.
// If the key is not yet cached or fetching failed, the default value will be set.
func (c *asyncCache) GetOrSet(key string, def interface{}) (val interface{}) {
	if v, ok := c.data.Load(key); ok {
		e := v.(*entry)
		if e.err.Load() != nil {
			ety := &entry{}
			ety.Store(def, nil)
			c.data.Store(key, ety)
			return def
		}
		e.Touch()
		return e.val.Load()
	}

	val, _, _ = c.sfg.Do(key, func() (interface{}, error) {
		v, e := c.opt.Fetcher(key)
		if e != nil {
			v = def
		}
		ety := &entry{}
		ety.Store(v, nil)
		c.data.Store(key, ety)
		return v, nil
	})
	return
}

// Dump dumps all cached entries.
func (c *asyncCache) Dump() map[string]interface{} {
	data := make(map[string]interface{})
	c.data.Range(func(key, val interface{}) bool {
		k, ok := key.(string)
		if !ok {
			c.opt.ErrLogFunc(fmt.Sprintf("invalid key: %v, type: %T is not string", k, k))
			c.data.Delete(key)
			return true
		}
		data[k] = val.(*entry).val.Load()
		return true
	})
	return data
}

// DeleteIf deletes cached entries that match the `shouldDelete` predicate.
func (c *asyncCache) DeleteIf(shouldDelete func(key string) bool) {
	c.data.Range(func(key, value interface{}) bool {
		s := key.(string)
		if shouldDelete(s) {
			if c.opt.DeleteHandler != nil {
				go c.opt.DeleteHandler(s, value)
			}
			c.data.Delete(key)
		}
		return true
	})
}

// Close stops the background goroutine.
func (c *asyncCache) Close() {
	// close refresh ticker
	ti, _ := refreshTickerMap.Load(c.opt.RefreshDuration)
	rt := ti.(*sharedTicker)
	rt.Lock()
	delete(rt.caches, c)
	if len(rt.caches) == 0 {
		rt.stopChan <- true
		rt.started = false
	}
	rt.Unlock()

	if c.opt.EnableExpire {
		// close expire ticker
		ti, _ := expireTickerMap.Load(c.opt.ExpireDuration)
		et := ti.(*sharedTicker)
		et.Lock()
		delete(et.caches, c)
		if len(et.caches) == 0 {
			et.stopChan <- true
			et.started = false
		}
		et.Unlock()
	}
}

// tick .
// pass ticker but not use t.ticker directly is to ignore race.
func (t *sharedTicker) tick(ticker *time.Ticker, tt tickerType) {
	var wg sync.WaitGroup
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			t.Lock()
			for c := range t.caches {
				wg.Add(1)
				go func(c *asyncCache) {
					defer wg.Done()
					if tt == expireTicker {
						c.expire()
					} else {
						c.refresh()
					}
				}(c)
			}
			wg.Wait()
			t.Unlock()
		case stop := <-t.stopChan:
			if stop {
				return
			}
		}
	}
}

func (c *asyncCache) expire() {
	c.data.Range(func(key, value interface{}) bool {
		k, ok := key.(string)
		if !ok {
			c.opt.ErrLogFunc(fmt.Sprintf("invalid key: %v, type: %T is not string", k, k))
			c.data.Delete(key)
			return true
		}
		e, ok := value.(*entry)
		if !ok {
			c.opt.ErrLogFunc(fmt.Sprintf("invalid key: %v, type: %T is not entry", k, value))
			c.data.Delete(key)
			return true
		}
		if !atomic.CompareAndSwapInt32(&e.expire, 0, 1) {
			if c.opt.DeleteHandler != nil {
				go c.opt.DeleteHandler(k, value)
			}
			c.data.Delete(key)
		}
		return true
	})
}

func (c *asyncCache) refresh() {
	c.data.Range(func(key, value interface{}) bool {
		k, ok := key.(string)
		if !ok {
			c.opt.ErrLogFunc(fmt.Sprintf("invalid key: %v, type: %T is not string", k, k))
			c.data.Delete(key)
			return true
		}
		e, ok := value.(*entry)
		if !ok {
			c.opt.ErrLogFunc(fmt.Sprintf("invalid key: %v, type: %T is not entry", k, value))
			c.data.Delete(key)
			return true
		}

		newVal, err := c.opt.Fetcher(k)
		if err != nil {
			if c.opt.ErrorHandler != nil {
				go c.opt.ErrorHandler(k, err)
			}
			if e.err.Load() != nil {
				e.err.Store(err)
			}
			return true
		}

		if c.opt.IsSame != nil && !c.opt.IsSame(k, e.val.Load(), newVal) {
			if c.opt.ChangeHandler != nil {
				go c.opt.ChangeHandler(k, e.val.Load(), newVal)
			}
		}

		e.Store(newVal, err)
		return true
	})
}

流程图

设置过期定时器
设置刷新定时器
有错误
无错误
开始
NewAsyncCache 初始化 AsyncCache 实例
设置过期定时器
设置刷新定时器
返回 AsyncCache 实例
SetDefault 设置默认值
为给定的键值对中的值设定默认值
如果在缓存池中就刷新键值对过期时间
Get 获取缓存
GetOrSet 获取或设置缓存
Dump 转储缓存
DeleteIf 删除缓存
关闭 AsyncCache
缓存存在?
加载缓存值
触发 Fetcher 获取值
singleflight 处理请求
存储或更新缓存
缓存存在?
检查缓存错误
设置默认值
加载缓存值
设置默认值
Range 遍历缓存
将缓存存到另一个图中
Range 遍历缓存
满足删除条件?
删除缓存项
EnableExpire?
停止过期定时器
停止刷新定时器
资源清理

其中的refreshTickerMap, expireTickerMap存放的是每个特定的刷新时间/过期时间对应的sharedTicker
每个sharedTicker负责多个相同刷新时间/过期时间的缓存池的更新/过期操作

测试代码

package main

import (
	"asynccache/asynccache"
	"fmt"
	"log"
	"time"
)

// 模拟一个简单的数据获取函数
func simpleFetcher(key string) (interface{}, error) {
	log.Printf("Fetching data for key: %s\n", key)
	time.Sleep(500 * time.Millisecond) // 模拟数据获取的延迟
	return fmt.Sprintf("value_for_%s", key), nil
}

// 打印缓存中所有的数据观察
func showAllCacheData(cache asynccache.AsyncCache) {
	cacheData := cache.Dump() // 导出cache数据
	// cacheData map[string]interface{} 类型为interface{},代表任意类型
	for k, v := range cacheData {
		// %s代表匹配字符串,%+v代表构造任意类型
		log.Printf("Fetching data for key: %s, value: %+v", k, v)
	}
}

func main() {
	// 创建一个 AsyncCache 实例
	cache := asynccache.NewAsyncCache(asynccache.Options{
		RefreshDuration: 2 * time.Second, // 每2秒刷新一次
		Fetcher:         simpleFetcher,
		EnableExpire:    true,
		ExpireDuration:  5 * time.Second, // 每5秒过期一次
		ErrorHandler: func(key string, err error) {
			log.Printf("Error fetching key %s: %v\n", key, err)
		},
		ChangeHandler: func(key string, oldData, newData interface{}) {
			log.Printf("Key %s changed from %v to %v\n", key, oldData, newData)
		},
		DeleteHandler: func(key string, oldData interface{}) {
			log.Printf("Key %s expired with value %v\n", key, oldData)
		},
	})

	// 设置默认值
	cache.SetDefault("key1", "default_value_for_key1")

	// 观察缓存数据
	showAllCacheData(cache)

	// 获取值
	val, err := cache.Get("key1")
	if err != nil {
		log.Printf("Error getting key1: %v\n", err)
	} else {
		log.Printf("Got value for key1: %v\n", val)
	}

	// 使用 GetOrSet
	val = cache.GetOrSet("key2", "default_value_for_key2")
	log.Printf("Got value for key2: %v\n", val)

	// 等待刷新和过期
	time.Sleep(6 * time.Second)

	// 再次获取值
	val, err = cache.Get("key1")
	if err != nil {
		log.Printf("Error getting key1 after refresh: %v\n", err)
	} else {
		log.Printf("Got value for key1 after refresh: %v\n", val)
	}

	// 删除特定的缓存项
	cache.DeleteIf(func(key string) bool {
		return key == "key2"
	})

	// 关闭缓存
	cache.Close()

	// 尝试获取值,应该会失败
	val, err = cache.Get("key1")
	if err != nil {
		log.Printf("Error getting key1 after close: %v\n", err)
	} else {
		log.Printf("Got value for key1 after close: %v\n", val)
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2260897.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3576 Android14,内存大于4G时UVC应用无法申请内存

最近有个项目需要将Linux虚拟成UVC摄像头&#xff0c;开发过程中遇到一个奇怪的事情&#xff0c;通过V4l2框架接口申请内存时&#xff0c;相同的板子&#xff0c;只是内存一个4G一个8G。4G的内存可以申请成功&#xff0c;8G就不行。提示“内存不足” 内存更大反而内存不足&…

TimesFM(Time Series Foundation Model)时间序列预测股市价格的数据研究(4)

TimesFM&#xff08;Time Series Foundation Model&#xff09;时间序列预测的数据研究(3)-CSDN博客文章浏览阅读846次&#xff0c;点赞19次&#xff0c;收藏12次。1. **表示预测区间**&#xff1a;在很多预测任务中&#xff0c;模型给出的不只是一个单一的预测值&#xff08;比…

opencv所有常见函数

一、opencv图像操作 二、opencv图像的数值运算 三、opencv图像的放射变换 四、opencv空间域图像滤波 五、图像灰度化与直方图 六、形态学图像处理 七、阈值处理与边缘检测 八、轮廓和模式匹配

常见漏洞—SSRF_FastCGI

FastCGI协议 简介 Fast CGI源自旧版本的CGI 路由/结构图 # 访问url --> 浏览器生成HTTP请求报文 --> web server解析请求&#xff08;例如nginx&#xff09; web server 是内容的分发者 当访问静态页面时&#xff0c;web server 会直接返回资源&#xff0c;例如index.htm…

【游戏设计原理】10 - 科斯特的游戏理论

科斯特的游戏理论强调了游戏与学习之间的关系&#xff0c;认为“玩得开心”与“学习”是紧密相连的。换句话说&#xff0c;游戏的核心魅力在于通过适当的挑战和不断的学习进程激发玩家的内啡肽循环&#xff0c;这让玩家在不断的探索和进步中找到乐趣。 科斯特的理论通过游戏是…

ES-IndexTemplate和DynamicTemplate

IndexTemplate 什么是IndexTemplate 索引模板&#xff0c;帮助你设定Mappings和Settings&#xff0c;并按照一定的规则&#xff0c;自动匹配到新创建的索引之上 模板仅在一个索引被新建的时候&#xff0c;才会产生应用&#xff0c;索引被修改不会影响已创建的索引可以设定多…

【大语言模型】ACL2024论文-27 Mementos:一个全面的多模态大型语言模型在图像序列推理上的基准测试

【大语言模型】ACL2024论文-27 Mementos&#xff1a;一个全面的多模态大型语言模型在图像序列推理上的基准测试 目录 文章目录 【大语言模型】ACL2024论文-27 Mementos&#xff1a;一个全面的多模态大型语言模型在图像序列推理上的基准测试目录文章摘要研究背景问题与挑战如何…

CSS基础与应用详解

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Css篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来Css篇专栏内容:CSS基础与应用详解 前言 CSS&#xff08;层叠样式表&#xff09;是网页设计中不可或缺的一部分&am…

C/S软件授权注册系统(Winform+WebApi+.NET8+EFCore版)

适用软件&#xff1a;C/S系统、Winform桌面应用软件。 运行平台&#xff1a;Windows .NETCore&#xff0c;.NET8 开发工具&#xff1a;Visual Studio 2022&#xff0c;C#语言 数据库&#xff1a;Microsoft SQLServer 2012&#xff0c;Oracle 21c&#xff0c;MySQL8&#xf…

国标GB28181网页直播平台EasyGBS国标EasyGBD对讲音频demo

近年来&#xff0c;随着信息技术的飞速发展&#xff0c;视频监控领域正经历从传统安防向智能化、网络化安防的深刻转变。在此过程中&#xff0c;GB28181标准凭借其强大的功能和灵活性&#xff0c;成为了推动视频监控系统互联互通和高效管理的重要一环。通过支持GB28181协议&…

session 共享服务器

1.安装 kryo-3.0.3.jar asm-5.2.jar objenesis-2.6.jar reflectasm-1.11.9.jar minlog-1.3.1.jar kryo-serializers-0.45.jar msm-kryo-serializer-2.3.2.jar memcached-session-manager-tc9-2.3.2.jar spymemcached-2.12.3.jar memcached-session-manager-2.3.2.jar …

【蓝桥杯国赛真题15】python质因数个数 蓝桥杯青少年组python编程国赛真题详细解析

目录 python质因数个数 一、题目要求 1、编程实现 2、输入输出 二、算法分析 三、程序编写 四、程序说明 五、运行结果 六、考点分析 七、 推荐资料 1、蓝桥杯比赛 2、考级资料 3、其它资料 python质因数个数 第十二届蓝桥杯青少年组python比赛国赛真题详细解析 …

智能硬件「百团大战」:AI驱动的周期来了吗?

要想在竞争激烈的市场中打造出真正的AI硬件“爆款”&#xff0c;并非简单地在现有硬件上堆砌AI功能就能实现&#xff0c;而是需要深刻理解AI的本质&#xff0c;用AI技术从底层逻辑出发&#xff0c;彻底重塑硬件产品的设计、功能与用户体验。 作者|斗斗 编辑|皮爷 出品|产…

Linux核心概念与常用命令

文章目录 一、Linux概述1、常见的操作系统2、Linux发展史3、Linux目录结构 二、文件和目录操作1、pwd - 显示当前目录2、cd - 切换目录3、ls - 列出目录内容4、mkdir - 创建目录5、touch - 创建空文件6、cp - 复制文件或目录7、mv - 移动或重命名文件8、rm - 删除文件或目录9、…

uniappp配置导航栏自定义按钮(解决首次加载图标失败问题)

1.引入iconfont的图标&#xff0c;只保留这两个文件 2.App.vue引入到全局中 import "./static/fonts/iconfont.css"3.pages.json中配置text为图标对应的unicode {"path": "pages/invite/invite","style": {"h5": {"…

vue组件开发:构建响应式快捷导航

前言 快捷导航不仅能够显著提升系统的灵活性和用户交互性&#xff0c;还极大地增强了用户的操作体验。本文将展示如何在 vue 中实现一个既可自定义又具备响应式特性的快捷导航菜单。 一、实现思路 列表页 结构设计 定义页面结构&#xff0c;包含一个导航卡片和一个对话框组件&a…

基于 Spring Boot 实现图片的服务器本地存储及前端回显

??导读&#xff1a;本文探讨了在网站开发中图片存储的各种方法&#xff0c;包括本地文件系统存储、对象存储服务&#xff08;如阿里云OSS&#xff09;、数据库存储、分布式文件系统及内容分发网络&#xff08;CDN&#xff09;。文中详细对比了这些方法的优缺点&#xff0c;并…

深入了解IPv6——光猫相关设定:DNS来源、DHCPv6服务、前缀来源等

光猫IPv6设置后的效果对比图&#xff1a; 修改前&#xff1a; 修改后&#xff1a; 一、DNS来源 1. 网络连接 来源&#xff1a; 从上游网络&#xff08;如运营商&#xff09;获取 IPv6 DNS 信息&#xff0c;通过 PPPoE 或 DHCPv6 下发。 特点&#xff1a; DNS 服务器地址直…

欧科云链研究院:AI时代,如何证明“我是我”?

OKG Research&#xff5c;编辑 近日&#xff0c;OpenAI 发布了新模型 Sora。这是一款高性能的文本到多模态生成工具&#xff0c;支持从文本生成精细的图像和动态视频。 相较早先发布的视频样例&#xff0c;该功能目前已经可以由用户真实上手体验&#xff0c;目前由于服务过载…

Cesium进阶教程——自定义图形、外观、绘图基础、现有着色器移植至Cesium、ShadowMapping、视频GIS、模型压平、卷帘

基础必看 WEBGL基础&#xff08;从渲染管线角度解读&#xff09; 参考路线 http://www.xt3d.online/tutorial/further/article.html 自定义图形 https://blog.csdn.net/m0_55049655/article/details/138908327 https://blog.csdn.net/m0_55049655/article/details/140306837 …