go源码解读-sync.pool

news2024/10/5 17:27:58

go version 1.19.7
sync.pool 是go 内置的对象池技术, 管理临时对象,这些对象可以单独保存和检索, 减少GC次数
特点:1、 池不可以指定大小
2、 Get 没有的话会新生成一个对象
3、对象的周期取决于GC的周期
从go doc可以看到sync.pool 主要暴露Get 和 Put 两个方法, 以及一个New。
使用:用New初始化pool一个实例,获取的调用Get , 释放资源的时候调用Put

C:\Users\Josslynn>go doc sync.pool
package sync // import "sync"

type Pool struct {

        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        New func() any
        // Has unexported fields.
}
 ...... (描述忽略了)
func (p *Pool) Get() any
func (p *Pool) Put(x any)

使用实例:gin 中使用

// 初始化的时候, 调用New ,来初始化pool 实例, 这边的engine.allocateContext 是新建返回一个*Context 对象,
	engine.pool.New = func() any {
		return engine.allocateContext()
	}
// put get 的时候
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 	// 从pool 中拿出一个context
	c := engine.pool.Get().(*Context)
	c.writermem.reset(w)
	//把http 的req 放到这个context 中 
	c.Request = req
	// 但是为了防止这个context 是没有被GC 过的, 所以需要reset(除request的内容, 其他内容清空
	c.reset()
	// context 处理过了, 可以交给下一步处理了()
	engine.handleHTTPRequest(c)
	// 当handle 处理完了, context 这个临时对象 就需要释放,
	// 如果不调用put, GC 也会去释放这个对象,但是Get 的时候就一直是一个新的对象, 就没有使用pool的意义的了
	engine.pool.Put(c)
}

结构

type Pool struct {
	noCopy noCopy

	// 本地的 Processor(p) 的指针,实际指向 []poolLocal, 
	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	// []poolLocal 的大小
	localSize uintptr        // size of the local array

	// GC 时,victim 和 victimSize 会分别接管 local 和 localSize;
   	// victim 的目的是为了减少 GC 后冷启动导致的性能抖动,让分配对象更平滑;
	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// 暴露外部使用
	New func() any
}
type poolLocal struct {
	poolLocalInternal
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
type poolLocalInternal struct {
	private any       // Can be used only by the respective P.
	shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolChain struct {

	head *poolChainElt
	tail *poolChainElt
}
type poolChainElt struct {
	poolDequeue
    next, prev *poolChainElt

在这里插入图片描述

GET 流程

上图绿色标签 就是要获取的对象, 由上图的结构所示, 我们可以从这几个地方获取到元素
1、 new ,( 这个应该放在最后, pool 没有获取到对象的时候, 用new)
2、从当前p 中的private 中获取
3、从当前的 p 的share 中获取
4、从其他p 的share 中获取, (为啥不能从其他p 的private 中获取, private 私有的, 只能自己调用)
5、在gc 的时候, victim 接管了 local , 所以还可以从victim 中的cache中获取
6、从其他p 的cache 获取
在这里插入图片描述
有了流程图, 可以初步知道整个get的流程, 然后看代码

func (p *Pool) Get() any {
	// 这边不深入了, 只要注意, poo 不能复制, 不然会有问题
	if race.Enabled {
		race.Disable()
	}
	// p.pin 会返回一个poolLocal和pid , ( 是GMP模型中 每个goroutine 绑定一个 p 才能执行), 这边的p.pin 就是返回当前goroutine 绑定的p, 
	// pin() 中对当前p 加了锁
	l, pid := p.pin()
	// 从private 中获取元素, 是不需要加锁的, 
	// 把p中的private 拿到之后清空该字段,我们要GET一个对象, 这个对象现在在goroutine中并且是私有的
	x := l.private
	l.private = nil
	// 如果x 为空, 就是可能其他goroutine 已经把这个值取走了, 同一个对象x, p队里下的 g1 、g2 都来获取, g1获取到x, 并把这个值置为空, 那g2 来取的时候,就没有了
	if x == nil {
		// private 中没有对象,查看本地p.share 中是否有
		// 当前p 调自己的shared 用popHead 从头部获取
		x, _ = l.shared.popHead()
		if x == nil {
		// 本地p.share  尝试从 其他p 的share, 或者尝试从 victim cache 里取元素
			x = p.getSlow(pid)
		}
	}
	// G-M 锁定解除;上锁的逻辑在 pin() 中
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	//pool 池中没有 , 找不到复用的对象了, 就NEW 一个返回去
	if x == nil && p.New != nil {
		x = p.New()
	}
	//返回对象
	return x
} 

pprivate 中没有, p.share 中也没有, 去其他p 以及victim 中看
补充:

所谓受害者缓存(Victim Cache),是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时,在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。 – 维基百科

pool 清空机制 看最后的poolCleanup

func (p *Pool) getSlow(pid int) any {
	// 首先重新获取pool的localSize大小并且是原子操作
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// 遍历size 的大小, 找到别的p
	for i := 0; i < int(size); i++ {
		// 尝试从其他p 关联的 的poollocal 中取一个
		// popTail  是双向链表的尾部开始偷一个p
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 从其他p 中, 没有获取到p , 尝试从victim cache 中获取
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	// gc 中的p.victim private 有这个值, 就返回
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	// 从其他p 的victim cache 中获取 
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}
	atomic.StoreUintptr(&p.victimSize, 0)
	return nil
}
//首次 Pool 需要把自己注册进 allPools 数组;
// Pool.local 数组按照 runtime.GOMAXPROCS(0) 的大小进行分配,如果是默认的,那么这个就是 P 的个数,也就是 CPU 的个数;
func (p *Pool) pinSlow() (*poolLocal, int) {
	// G-M 先解锁
	runtime_procUnpin()
	// 以下逻辑在全局锁 allPoolsMu 内 , 所有的对象池都在这个里面
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	// 重新获取, 因为刚刚解锁了, p 很有可能被抢占了 , 获取P 的 id
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	// 判断一下, 
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	// 如果指针是空, 说名第一次get , 
	if p.local == nil {
	// 首次,Pool 需要把自己注册进 allPools 数组
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	// local 数组的大小就等于 runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	// 把local数组 的首地址, 放入 p 的local 中
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	// 返回pid 的poolLocal 
	return &local[pid], pid
}


func (c *poolChain) popTail() (any, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		return nil, false
	}

	for {
	    // 从双端队列尾部出队
		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			return nil, false
		}

		// 从链表中删除当前节点
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}
func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
			// Queue is empty.
			return nil, false
		}
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// Success.
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}

	// We now own slot.
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}


PUT 流程

根据结构图中绿色的部分, 现在需要把对象放入池中, 可以这样放
1、优先放在p.private
2、private 已经有值了, 那就放在p.share 中
3、 放东西就没必要放其他p 了,

// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	// 初始化, 禁止p 被抢占, 并获取对应的poollocal 的内容
	l, _ := p.pin()
	// 尝试放到 p.private 的位置
	if l.private == nil {
		l.private = x
	} else {
		// private 放不了, 放p.share 去
		l.shared.pushHead(x)
	}
	// 解锁, p处理完了, 可以自由活动了
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// Initialize the chain.
		const initSize = 8 // Must be a power of 2
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		// 当head 节点是空的时候, 创建一个新的节点, 这边主要要用原子操作, poptail 的时候回收缩链表, 清除节点
		storePoolChainElt(&c.tail, d)
	}
	// 然后往head 中push 对象
	if d.pushHead(val) {
		return
	}

	// 如果没有成功, 该节点的ring 满了, 继续新增节点, ring 是之前的2倍, 且有最大长度限制
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// Can't make it any bigger.
		newSize = dequeueLimit
	}

	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	c.head = d2
	storePoolChainElt(&d.next, d2)
	d2.pushHead(val)
}
func (d *poolDequeue) pushHead(val any) bool {
	// 先获取 headtail 然后解析出head 和tail
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	// 看一下 是否满了, 满了返回false
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
		// Queue is full.
		return false
	}
	
	slot := &d.vals[head&uint32(len(d.vals)-1)]
	typ := atomic.LoadPointer(&slot.typ)
	// popTail 会把value 置为nil, 所以这边需要判断一下, popTail 和 pushHead 有竞争关系 popHead 和pushHead 则没有竞争关系
	// popTail 和 pushHead 有竞争关系 pushHead 是对当前p的localcache 操作, 而 popTail 会抢其他p的localcache 操作
	// 而 popHead 和PushHead 都是当前p 的处理, 同一个p只有有一个goroutine 在执行, 所以popHead 和PushHead不可能同时发生
	if typ != nil {
		// 不为空, 对象不能放这里, 返回false
		return false
	}

	// 为空了, 把对象放进去
	if val == nil {
		val = dequeueNil(nil)
	}
	*(*any)(unsafe.Pointer(slot)) = val
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}


func (p *Pool) pin() (*poolLocal, int) {
	// 加锁, 当前p 禁止抢占
	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large localSize.
	// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

GC

var (
   allPoolsMu Mutex  // 全局锁
   allPools []*Pool  // 存放程序运行期间所有的 Pool 实例地址
   oldPools []*Pool  // 配合 victim 机制用的
)

// 在 GC 开始的时候,gcStart() 函数中会调用 clearpools() -> poolCleanup() 函数。也就是说,每一轮 GC 都是对所有的 Pool 做一次清理
func poolCleanup() {
	// 将旧pool 中的 victim 清空, 
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 把现有的local cache置为victim cache
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}
	// 清理掉 allPools 和 oldPools 中所有的 Pool 实例
	oldPools, allPools = allPools, nil
}

两轮回收机制 , victim 将删除缓存的动作由一次操作变成了两次操作, 每次清理的是上上次的缓存内容。本次的先放在victim cache 中, 这样Get 的时候,其他获取不到, 可以从这边再次获取到。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/464322.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

13、MDK分散加载方式管理多块内存

MDK分散加载: 默认情况下是通过MDK的option选项设置Flash和RAM大小&#xff0c;这种情况下所有的管理工作都是编译来处理的&#xff0c; MDK自动生成的分散加载文件&#xff1a;H7_ProjectTest.sct ; ************************************************************* ; *…

Java_异常

Java_异常 1.什么是异常 ​ 生活中的异常&#xff1a;感冒发烧、电脑蓝屏、手机死机等。 ​ 程序中的异常&#xff1a;磁盘空间不足、网络连接中断、被加载的资源不存在等。 ​ 程序异常解决办法&#xff1a;针对程序中非正常情况&#xff0c;Java语言引入了异常&#xff0…

【C++】类和对象(1)

文章目录 前言浅浅了解一、面向过程和面向对象二、 类和对象的关系三、创建类和对象 逐步深入一、类的访问限定符二、 封装三、类的作用域四、类对象模型五、this指针 前言 浅浅了解 一、面向过程和面向对象 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解…

智能汽车开启中央计算革命,全场景智能“车芯”强势崛起

伴随着汽车跨域融合时代的到来&#xff0c;智能汽车芯片正处于快速迭代期&#xff0c;同时牌桌上的玩家也在加速挪换位置。 一方面&#xff0c;包括丰田、大众集团等在内的全球汽车制造商正在进入芯片平台的切换周期&#xff0c;加速推动汽车芯片市场格局的改变。 另一方面&a…

Ubuntu22.04部署eurekaserver集群

Ubuntu22.04部署eurekaserver集群 为了更好的浏览体验&#xff0c;欢迎光顾勤奋的凯尔森同学个人博客http://www.huerpu.cc:7000 每次都启动eureka的项目&#xff0c;太繁琐了&#xff0c;我们把eureka部署到Ubuntu&#xff0c;就可以愉快的玩耍了。 1 配置文件设置 准备了…

设计模式 -- 观察者模式

前言 月是一轮明镜,晶莹剔透,代表着一张白纸(啥也不懂) 央是一片海洋,海乃百川,代表着一块海绵(吸纳万物) 泽是一柄利剑,千锤百炼,代表着千百锤炼(输入输出) 月央泽,学习的一种过程,从白纸->吸收各种知识->不断输入输出变成自己的内容 希望大家一起坚持这个过程,也同…

淘宝天猫数据分析:2023年健康养生三大品类数据分析

随着人们健康意识的不断增强&#xff0c;越来越多的年轻人都开始加入养生大军的队伍中&#xff0c;我国的健康养生产业也迎来了发展机遇。 在天猫平台上&#xff0c;养生茶、养生壶和滋补养生原料是养生市场的几大重点类目&#xff0c;接下来&#xff0c;结合鲸参谋电商数据分析…

Docker 相关概念

1、Docker是什么&#xff1f; 如何确保应用能够在这些环境中运行和通过质量检测&#xff1f;并且在部署过程中不出现令人头疼的版本、配置问题&#xff0c;也无需重新编写代码和进行故障修复&#xff1f; 答案就是使用容器。Docker之所以发展如此迅速&#xff0c;也是因为它对…

电脑硬盘分区合并怎么操作?分享2个方法!

案例&#xff1a;电脑硬盘怎么分区&#xff1f; 【我把我的电脑硬盘分成了多个区域&#xff0c;这样可以方便存储和管理数据。现在我需要调整分区&#xff0c;对分区进行合并&#xff0c;但我不知道该如何操作&#xff0c;有没有小伙伴知道&#xff1f;】 在使用电脑的过程中…

4核8G云服务器4c8g或4h8g指的是什么?

4核8G云服务器什么意思&#xff1f;4c8g或4h8g代表CPU内存配置&#xff0c;4c8g是指4核CPU、8G内存&#xff0c;准确来讲由于是云服务器&#xff0c;4核指的是4核vCPU&#xff0c;4核8G就是指云服务器CPU内存配置。云服务器不只是CPU内存&#xff0c;还有公网带宽和系统盘&…

经典 Learned Index 结构设计及其应用

引言 学习索引是一种新型的索引结构&#xff0c;可以帮助数据库更快地查找数据。学习索引的诞生可以追溯到 2017 年&#xff0c;由 Google Brain 团队的 Kraska 等人在论文[1]中首次提出,探讨了使用神经网络替代传统数据结构&#xff08;如 B-Tree&#xff09;来构建索引的可行…

appuploader 常规使用登录方法

转载&#xff1a;登录appuploader 目录 登录appuploader 常规使用登录方法 双击appuploader.exe 启动appuploader 点击底部的未登录&#xff0c;弹出登录框 在登录框内输入apple开发者账号 如果没有apple开发者账号&#xff0c;只是普通的apple账号&#xff0c;请勾选上未…

题目 2056: 汉诺塔 ==理解递归

题目 2056: 汉诺塔 https://www.dotcpp.com/oj/problem2056.html 做题情况 参考代码&#xff1a; //package Dotcpp;import java.util.Scanner;public class Main {public static void main(String[] args) {Scanner sc new Scanner(System.in);int n sc.nextInt();// prin…

3.30 haas506 2.0开发教程-example - SD卡存储数据读写

SD卡存储数据读写 案例说明数据的写入与读取串口工具读取数据接收数据CSV格式 案例说明 部分设备使用过程中需要保存大量数据到TF卡中&#xff0c;大部分场景拔插TF卡有不太方便。 所以本案例介绍一种使用串口工具取出设备TF卡中的数据保存在电脑中的方法。 保存格式可以自己定…

详细安装使用教程】店侦探 - 跟踪店铺数据,学习运营技巧,引流关键词,电商人必备工具

简介 店侦探插件是一款电商网络浏览插件&#xff0c;能够帮助店主更好地运营自己的网店&#xff0c;这款插件功能十分全面强大&#xff0c;可以全面跟踪店铺的销量情况、引流关键词、直通车、营销活动、宝贝变更跟踪&#xff01;感兴趣的朋友快来体验吧&#xff0c;跟踪店铺数…

常见的用户密码加密及破解方法

用户密码安全是互联网行业需要保障的重要安全之一&#xff0c;由于黑客的入侵和内部的泄露&#xff0c;保证用户密码安全并不是件容易的事情&#xff0c;但如果采用合适的算法加密用户密码&#xff0c;即使信息泄露出去&#xff0c;黑客也无法还原出原始的密码(或者还原的代价非…

vue3 封装ECharts组件

一、前言 前端开发需要经常使用ECharts图表渲染数据信息&#xff0c;在一个项目中我们经常需要使用多个图表&#xff0c;选择封装ECharts组件复用的方式可以减少代码量&#xff0c;增加开发效率。 ECharts图表大家应该用的都比较多&#xff0c;基础的用法就不细说了&#xff…

如何成为企业急需的技术人才:掌握这些技能,提升你的实力和竞争力

在当前竞争激烈的互联网环境中&#xff0c;作为程序员等技术岗&#xff0c;必须不断的学习&#xff0c;才能不断提升自身实力&#xff0c;锻炼自身技能。想要成为一名企业急需的技术人才&#xff0c;需要学习哪些技能呢&#xff1f; 一、IT技术发展背景及历程 IT技术是当今社…

如何借助分布式存储 JuiceFS 加速 AI 模型训练

传统的机器学习模型&#xff0c;数据集比较小&#xff0c;模型的算法也比较简单&#xff0c;使用单机存储&#xff0c;或者本地硬盘就足够了&#xff0c;像 JuiceFS 这样的分布式存储并不是必需品。 随着近几年深度学习的蓬勃发展&#xff0c;越来越多的团队开始遇到了单机存储…

【22-23 春学期】人工智能基础--AI作业6-误差反向传播

老师发布作业链接&#xff1a;(429条消息) 【22-23 春学期】AI作业6-误差反向传播_HBU_David的博客-CSDN博客 目录 老师发布作业链接&#xff1a;(429条消息) 【22-23 春学期】AI作业6-误差反向传播_HBU_David的博客-CSDN博客 1.梯度下降 2.反向传播 3.计算图 4.使用Numpy…