go Pool
Pool用于展示如何使用有缓冲的通道实现资源池,来管理可以在任意数量的goroutine之间共享及独立使用的资源。这种模式在需要共享一组静态资源的情况(如共享数据库连接或者内存缓冲区)下非 常有用。如果goroutine需要从池里得到这些资源中的一个,它可以从池里申请,使用完后归还到资源池里。
按照上面的要求,整个资源池的要求和框架图梳理如下:
- 资源池能够根据工厂函数创建多个,也就是资源池要是个通用的不能只针对一种资源的资源池
- 资源池要有上限
- 资源池需要保证多线程安全
- 资源池在回收资源时,如果资源已经超过资源池上限不能放到资源池里面,需要直接释放Close掉
提到池子,channel天然就是一个池子,而且channel是多线程安全的,所以可以直接使用管道来安全的共享资源。文章按照如下四个方面对Pool资源池的设计过程以及使用进行剖析。
数据结构
既然确定了使用管道进行资源的共享,那么Pool里面就不能少了管道。
resources chan io.Closer
这里我们定义一个类型为io.Closer的chan,因为管道中的资源都需要再不使用时清理资源,因此这里所有传入到管道的资源都需要实现io.Closer接口。
向管道中放入资源和从管道中取出资源是多线程安全的,但是打开管道和关闭管道本身又是线程不安全的,因此这里需要对管道的创建和关闭进行保护, 这里采用sync.Mutex互斥锁进行保护。
mutex sync.Mutex
除了管道,Pool还需要一个资源创建函数,用于在资源池里创建资源。
factory func() (io.Closer, error)
factory用于创建资源,返回一个实现了io.Closer的对象,表示创建的资源。
最后,还需要一个原子变量用来表示Pool的状态。
closed bool
atomic包提供了一些原子操作,包括对bool变量的设置。
错误处理
当向一个已经关闭的Pool请求资源时,需要返回一个错误提醒用户该Pool已经关闭。
var ErrPoolClosed = errors.New("Pool has been closed.")
方法实现
创建Pool
调用New方法创建Pool,需要提供一个资源创建函数,以及资源池中初始化的资源数量。
// New 创建一个新的池子,这个池子会调用factory创建资源,并可设置资源上限
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
获取资源
既然是资源池,那么就要提供获取资源的方法,这里采用Acquire作为获取资源的方法。资源获取时有两种情况:
- 如果有空闲资源,就直接返回该资源。
- 如果没有空闲资源,就调用factory创建一个新资源。
// Acquire 从池子中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查是否有空闲资源
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
return r, nil
// 因为没有空闲资源,所以创建一个新资源
default:
if p.factory == nil {
return nil, ErrPoolClosed
}
return p.factory()
}
}
这里用来select的一个技巧将阻塞的资源变成非阻塞的分支。当向一个空管道中获取资源时,会阻塞住,但是通过select可以在通道阻塞的情况下执行default分支,从而将阻塞的操作转换为非阻塞。
释放资源
当释放资源时也存在两种情况:
- 如果资源池已经关闭,就直接关闭该资源。
- 如果资源池没有满,就直接将资源放回资源池。
// Release 将一个使用后的资源放回池子里面
func (p *Pool) Release(r io.Closer) {
// Secure this operation with the Close operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is closed, discard the resource.
if p.closed {
err := r.Close()
if err != nil {
return
}
return
}
// select能将所有阻塞的资源通过默认分支变成非阻塞的分支
select {
// 试图将资源放回资源池
case p.resources <- r:
log.Println("Release:", "In Queue")
// 如果资源池已经满了,就关闭这个资源
default:
log.Println("Release:", "Closing")
err := r.Close()
if err != nil {
return
}
}
}
关闭资源池Pool
当资源池使用完毕时需要及时关闭资源池,并释放掉所有的资源。
// Close 关闭资源池
func (p *Pool) Close() {
// 保证多线程安全
p.m.Lock()
defer p.m.Unlock()
// 如果资源池已经关闭,就不用再关闭了
if p.closed {
return
}
// 关闭资源池并释放所有资源
close(p.resources)
p.closed = true
// 通道即使关闭了,也能通过range将通道中的资源全部接收一遍
for r := range p.resources {
err := r.Close()
if err != nil {
return
}
}
}
按照上述步骤将资源池的视线示例汇总如下
// Package pool 管理用户资源的池子
package pool
import (
"errors"
"io"
"log"
"sync"
)
// 说明go 1.6之后,自带sync.Pool用来管理资源
// Pool 管理用户资源的池子,可以在多个goroutine之间共享资源
type Pool struct {
m sync.Mutex
resources chan io.Closer
factory func() (io.Closer, error)
closed bool
}
// ErrPoolClosed 表示(Acquire)了一个已经关闭的池子
var ErrPoolClosed = errors.New("pool has been closed")
// New 创建一个新的池子,这个池子会调用factory创建资源,并可设置资源上限
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New("size value too small")
}
return &Pool{
factory: fn,
resources: make(chan io.Closer, size),
}, nil
}
// Acquire 从池子中获取一个资源
func (p *Pool) Acquire() (io.Closer, error) {
select {
// 检查是否有空闲资源
case r, ok := <-p.resources:
if !ok {
return nil, ErrPoolClosed
}
return r, nil
// 因为没有空闲资源,所以创建一个新资源
default:
if p.factory == nil {
return nil, ErrPoolClosed
}
return p.factory()
}
}
// Release 将一个使用后的资源放回池子里面
func (p *Pool) Release(r io.Closer) {
// Secure this operation with the Close operation.
p.m.Lock()
defer p.m.Unlock()
// If the pool is closed, discard the resource.
if p.closed {
err := r.Close()
if err != nil {
return
}
return
}
// select能将所有阻塞的资源通过默认分支变成非阻塞的分支
select {
// 试图将资源放回资源池
case p.resources <- r:
log.Println("Release:", "In Queue")
// 如果资源池已经满了,就关闭这个资源
default:
log.Println("Release:", "Closing")
err := r.Close()
if err != nil {
return
}
}
}
// Close 关闭资源池
func (p *Pool) Close() {
// 保证多线程安全
p.m.Lock()
defer p.m.Unlock()
// 如果资源池已经关闭,就不用再关闭了
if p.closed {
return
}
// 关闭资源池并释放所有资源
close(p.resources)
p.closed = true
// 通道即使关闭了,也能通过range将通道中的资源全部接收一遍
for r := range p.resources {
err := r.Close()
if err != nil {
return
}
}
}
调用示例
// 这个示例展示了Pool的使用,其实新版本的go里面已经内置了资源池,这里只是为了演示如何使用自己实现的Pool
package main
import (
"io"
"log"
"math/rand"
"sync"
"sync/atomic"
"time"
"patterns/pool"
)
const (
maxGoroutines = 25 // 协程数量.
pooledResources = 2 // 资源池的大小
)
// 数据库连接池资源信息.
type dbConnection struct {
ID int32
}
// Close 实现 io.Closer 接口
func (dbConn *dbConnection) Close() error {
log.Println("Close: Connection", dbConn.ID)
return nil
}
// idCounter 为每个连接生成一个唯一的ID
var idCounter int32
// createConnection 连接创建工厂函数
func createConnection() (io.Closer, error) {
id := atomic.AddInt32(&idCounter, 1)
log.Println("Create: New Connection", id)
return &dbConnection{id}, nil
}
func main() {
// 协程同步
var wg sync.WaitGroup
wg.Add(maxGoroutines)
// 创建资源池.
p, err := pool.New(createConnection, pooledResources)
if err != nil {
log.Println(err)
}
// 资源池性能模拟测试.
for query := 0; query < maxGoroutines; query++ {
// 这里将query作为参数传入,是为了避免闭包时引用导致所有协程都拿到的是同一个query的值
go func(q int) {
performQueries(q, p)
wg.Done()
}(query)
}
// 等待所有协程结束
wg.Wait()
// 关闭资源池
log.Println("Shutdown Program.")
p.Close()
}
// performQueries 测试资源池的查询表现
func performQueries(query int, p *pool.Pool) {
// 获取一个连接池连接
conn, err := p.Acquire()
if err != nil {
log.Println(err)
return
}
// 将连接信息放回连接池
defer p.Release(conn)
// 使用延迟模拟查询事件
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
log.Printf("Query: QID[%d] CID[%d]\n", query, conn.(*dbConnection).ID)
}
附录
- 数据来源-《go语言实战》
- 代码仓库:gitee pool
如果感觉文章对你有用欢迎点赞,评论和关注,谢谢!