临时对象池是啥?

查看sync.Pool中的注释来了解pool的基本用法。主要有以下几点:

  1. pool是一个可以存储和取出临时对象的临时对象池。
  2. 池中的对象会在没有通知的情况下自动删除,如果一个对象池持有某个对象的唯一引用,那么该对象很有可能被回收。
  3. pool是多goroutine并发安全的。
  4. pool的目的是缓存已经分配但之后才回用到的对象,来减轻GC的压力。使用pool可以高效的构建线程安全的free list,但是其并不适用于所有场景的free list
  5. 一个适当的适用场景是:管理一个包内的多个独立运行的线程之间静默共享一组临时元素。pool提供了在多个client之间共享临时元素的机制。
  6. 在fmt包中有一个适用Pool的例子,其维持了一个动态大小的临时输出buffer。
  7. Pool不适合存储在短生命周期的对象,这种场景下的缓存效果不大。
  8. 一旦一个pool使用之后,便不能再被复制。

pool的结构体定义如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}

使用pool对象的时候,需要传入New方法,来创建自己需要的对象(Get方法拿不到对象的时候,会调用New方法)。

Pool 中有两个定义的公共方法,分别是 Put - 向池中添加元素;Get - 从池中获取元素,如果没有,则调用 New 生成元素,如果 New 未设置,则返回 nil。

使用的基本原理

Get

Pool会为每个P维护一个本地池,P的本地池分为私有对象private 和 共享池 shared。私有对象的元数据只能本地P使用,共享池中的shared中的元素可能会被其他P偷走

1
2
3
4
5
6
7
8
// Local per-P Pool appendix.
type poolLocalInternal struct {
  //P私有元素,获取private不需要加锁
	private interface{}   // Can be used only by the respective P.
  //各个P共享元素池,获取shared需要加锁
	shared  []interface{} // Can be used by any P.
	Mutex                 // Protects shared.
}

Get会优先查找本地private,再查找本地shared,最后再去其他P的shared去查找。如果都查不到则会调用New函数来获取新元素。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37


func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
	l := p.pin()
  //先取private
	x := l.private
	l.private = nil
	runtime_procUnpin()
	if x == nil {
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
      //加锁取shared
			x = l.shared[last]
			l.shared = l.shared[:last]
		}
		l.Unlock()
		if x == nil {
      //去取其他P的对象池数据
			x = p.getSlow()
		}
	}
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
  //都不存在则创建新对象,此时pool持有的对象未必为空,其他p可能是有私有数据的
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

getSlow

getSlow 会从其他的P中的shared获取可用元素:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (p *Pool) getSlow() (x interface{}) {
	// See the comment in pin regarding ordering of the loads.
	size := atomic.LoadUintptr(&p.localSize) // load-acquire
	local := p.local                         // load-consume
	// Try to steal one element from other procs.
	pid := runtime_procPin()
	runtime_procUnpin()
	for i := 0; i < int(size); i++ {
		l := indexLocal(local, (pid+i+1)%int(size))
    //对应的pool需要加上锁
		l.Lock()
		last := len(l.shared) - 1
		if last >= 0 {
			x = l.shared[last]
			l.shared = l.shared[:last]
			l.Unlock()
			break
		}
		l.Unlock()
	}
	return x
}

Put

Put优先把数据放到private对象中,如果private对象不为空,则放入shared池中。但是在放入池中时,会有1/4的几率把池数据丢掉。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
	if race.Enabled {
    //随机丢掉数据
		if fastrand()%4 == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l := p.pin()
  //优先放入private对象中
	if l.private == nil {
		l.private = x
		x = nil
	}
	runtime_procUnpin()
	if x != nil {
		l.Lock()
    //其次放入shared池中
		l.shared = append(l.shared, x)
		l.Unlock()
	}
	if race.Enabled {
		race.Enable()
	}
}

poolCleanup

在STW阶段,垃圾回收时,poolCleanup就会被调用。如果此时pool.shared被一个goroutine占用,那么整个Pool都会被保留下来,下次清理时内存占用差不多会翻一倍。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.
	// Defensively zero out everything, 2 reasons:
	// 1. To prevent false retention of whole Pools.
	// 2. If GC happens while a goroutine works with l.shared in Put/Get,
	//    it will retain whole Pool. So next cycle memory consumption would be doubled.
	for i, p := range allPools {
		allPools[i] = nil
		for i := 0; i < int(p.localSize); i++ {
			l := indexLocal(p.local, i)
			l.private = nil
			for j := range l.shared {
				l.shared[j] = nil
			}
			l.shared = nil
		}
		p.local = nil
		p.localSize = 0
	}
	allPools = []*Pool{}
}

使用示例

fmt中的printer pool

printer与其的临时对象池,newPrinter从对象池中获取printer,free将对象放入对象池中,这样就避免了每次都重新生成打印对象,节省了对象生成时间。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// pp is used to store a printer's state and is reused with sync.Pool to avoid allocations.
type pp struct {
	buf buffer

	// arg holds the current item, as an interface{}.
	arg interface{}

	// value is used instead of arg for reflect values.
	value reflect.Value

	// fmt is used to format basic items such as integers or strings.
	fmt fmt

	// reordered records whether the format string used argument reordering.
	reordered bool
	// goodArgNum records whether the most recent reordering directive was valid.
	goodArgNum bool
	// panicking is set by catchPanic to avoid infinite panic, recover, panic, ... recursion.
	panicking bool
	// erroring is set when printing an error string to guard against calling handleMethods.
	erroring bool
}

var ppFree = sync.Pool{
	New: func() interface{} { return new(pp) },
}

// newPrinter allocates a new pp struct or grabs a cached one.
func newPrinter() *pp {
	p := ppFree.Get().(*pp)
	p.panicking = false
	p.erroring = false
	p.fmt.init(&p.buf)
	return p
}

// free saves used pp structs in ppFree; avoids an allocation per invocation.
func (p *pp) free() {
	// Proper usage of a sync.Pool requires each entry to have approximately
	// the same memory cost. To obtain this property when the stored type
	// contains a variably-sized buffer, we add a hard limit on the maximum buffer
	// to place back in the pool.
	//
	// See https://golang.org/issue/23199
	if cap(p.buf) > 64<<10 {
		return
	}

	p.buf = p.buf[:0]
	p.arg = nil
	p.value = reflect.Value{}
	ppFree.Put(p)
}

源码解析