您现在的位置是:首页 > 文章详情

sync.Pool实现原理

日期:2019-08-10点击:455

sync.Pool实现原理

对象的创建和销毁会消耗一定的系统资源(内存,gc等),过多的创建销毁对象会带来内存不稳定与更长的gc停顿,因为go的gc不存在分代,因而更加不擅长处理这种问题。因而go早早就推出Pool包用于缓解这种情况。Pool用于核心的功能就是Put和Get。当我们需要一个对象的时候通过Get获取一个,创建的对象也可以Put放进池子里,通过这种方式可以反复利用现有对象,这样gc就不用高频的促发内存gc了。

结构

 type Pool struct { noCopy noCopy local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. New func() interface{} } 

创建时候指定New方法用于创建默认对象,local,localSize会在随后用到的时候生成. local是一个poolLocalInternal的切片指针。

 type poolLocalInternal struct { private interface{} // Can be used only by the respective P. shared []interface{} // Can be used by any P. Mutex // Protects shared. } 

当不同的p调用Pool时,每个p都会在local上分配这样一个poolLocal,索引值就是p的id。 private存放的对象只能由创建的p读写,shared则会在多个p之间共享。

PUT

 // Put adds x to the pool. func (p *Pool) Put(x interface{}) { if x == nil { return } if race.Enabled { if fastrand()%4 == 0 { // Randomly drop x on floor. return } race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } l := p.pin() if l.private == nil { l.private = x x = nil } runtime_procUnpin() if x != nil { l.Lock() l.shared = append(l.shared, x) l.Unlock() } if race.Enabled { race.Enable() } } 

Put先要通过pin函数获取当前Pool对应的pid位置上的localPool,然后检查private是否存在,存在则设置到private上,如果不存在就追加到shared尾部。

 func (p *Pool) pin() *poolLocal { pid := runtime_procPin() // In pinSlow we store to localSize and then to local, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { // 这句话的意思是如果当前pool的localPool切片尚未创建,尚未创建这句话肯定是false的 return indexLocal(l, pid) } return p.pinSlow() } 

pin函数先通过自旋加锁(可以避免p自身发生并发),在检查本地local切片的size,size大于当前pid则使用pid去本地local切片上索引到localpool对象,否则就要走pinSlow对象创建本地localPool切片了.

func (p *Pool) pinSlow() *poolLocal { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() allPoolsMu.Lock() defer allPoolsMu.Unlock() pid := runtime_procPin() // poolCleanup won't be called while we are pinned. s := p.localSize l := p.local if uintptr(pid) < s { return indexLocal(l, pid) } if p.local == nil { allPools = append(allPools, p) } // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one. size := runtime.GOMAXPROCS(0) local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release return &local[pid] } 

pinShow先要取消自旋锁,因为后面的lock内部也会尝试自旋锁,下面可能会操作allpool因而这里需要使用互斥锁allPoolsMu,然后又加上自旋锁,(这里注释说不会发生poolCleanup,但是查看代码gcstart只是查看了当前m的lock状态,然而避免不了其他m触发的gc,尚存疑),这里会再次尝试之前的操作,因为可能在unpin,pin之间有并发产生了poolocal,确认本地local切片是空的才会生成一个新的pool。后面是创建Pool上的localPool切片,runtime.GOMAXPROCS这里的作用是返回p的数量,用于确定pool的localpool的数量.

GET

 func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } l := p.pin() x := l.private l.private = nil runtime_procUnpin() if x == nil { l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] } l.Unlock() if x == nil { x = p.getSlow() } } if race.Enabled { race.Enable() if x != nil { race.Acquire(poolRaceAddr(x)) } } if x == nil && p.New != nil { x = p.New() } return x } 

GET 先调用pin获取本地local,这个具体流程和上面一样了,如果当前private存在返回private上面的对象,如果不存在就从shared查找,存在返回尾部对象,反之就要从其他的p的localPool里面偷了。

 func (p *Pool) getSlow() (x interface{}) { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire local := p.local // load-consume // Try to steal one element from other procs. pid := runtime_procPin() runtime_procUnpin() for i := 0; i < int(size); i++ { l := indexLocal(local, (pid+i+1)%int(size)) l.Lock() last := len(l.shared) - 1 if last >= 0 { x = l.shared[last] l.shared = l.shared[:last] l.Unlock() break } l.Unlock() } return x } 

首先就要获取当前size,用于轮询p的local,这里的查询顺序不是从0开始,而是是从当前p的位置往后查一圈。查到依次检查每个p的shared上是否存在对象,如果存在就获取末尾的值。 如果所有p的poollocal都是空的,那么初始化的New函数就起作用了,调用这个New函数创建一个新的对象出来。

清理

func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. // Defensively zero out everything, 2 reasons: // 1. To prevent false retention of whole Pools. // 2. If GC happens while a goroutine works with l.shared in Put/Get, // it will retain whole Pool. So next cycle memory consumption would be doubled. for i, p := range allPools { allPools[i] = nil for i := 0; i < int(p.localSize); i++ { l := indexLocal(p.local, i) l.private = nil for j := range l.shared { l.shared[j] = nil } l.shared = nil } p.local = nil p.localSize = 0 } allPools = []*Pool{} } 

pool对象的清理是在每次gc之前清理,通过runtime_registerPoolCleanup函数注册一个上面的poolCleanup对象,内部会把这个函数设置到clearpool函数上面,然后每次gc之前会调用clearPool来取消所有pool的引用,重置所有的Pool。代码很简单就是轮询一边设置nil,然后取消所有poollocal,pool引用。方法简单粗暴。由于clearPool是在STW中调用的,如果Pool存在大量对象会拉长STW的时间,在已经有提案来修复这个问题了(CL 166961.)[https://go-review.googlesource.com/c/go/+/166961/]

原文链接:https://my.oschina.net/hunjixin/blog/3086168
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章