在Golang中实现协程池(Goroutine Pool)可以提高并发任务的执行效率,特别是在需要限制并发数量或管理资源的情况下。协程池允许你控制同时运行的协程数量,从而避免创建过多的协程导致系统资源耗尽。
以下是一个简单的协程池实现示例:
-
定义协程池结构体:
协程池结构体需要包含任务队列、工作协程数量、等待组等。 -
实现任务提交和协程管理:
使用通道(channel)来管理任务队列,并使用等待组(sync.WaitGroup)来等待所有任务完成。
package main
import (
"fmt"
"sync"
"time"
)
// Task represents a unit of work that the goroutine pool will execute.
type Task func()
// GoroutinePool represents a pool of goroutines that can execute tasks.
type GoroutinePool struct {
tasks chan Task
workerPool chan struct{}
wg sync.WaitGroup
maxWorkers int
}
// NewGoroutinePool creates a new goroutine pool with a specified maximum number of workers.
func NewGoroutinePool(maxWorkers int) *GoroutinePool {
pool := &GoroutinePool{
tasks: make(chan Task),
workerPool: make(chan struct{}, maxWorkers),
maxWorkers: maxWorkers,
}
// Start worker goroutines
for i := 0; i < maxWorkers; i++ {
pool.workerPool <- struct{}{}
go pool.worker()
}
return pool
}
// worker is the goroutine that executes tasks from the tasks channel.
func (p *GoroutinePool) worker() {
for task := range p.tasks {
task()
<-p.workerPool // Signal that a worker is available again
}
}
// Submit adds a task to the goroutine pool.
func (p *GoroutinePool) Submit(task Task) {
p.wg.Add(1)
go func() {
defer p.wg.Done()
<-p.workerPool // Wait for a worker to be available
p.tasks <- task
}()
}
// Wait waits for all submitted tasks to complete.
func (p *GoroutinePool) Wait() {
p.wg.Wait()
close(p.tasks) // Close the tasks channel to signal workers to exit
for range p.workerPool { // Drain the workerPool to ensure all workers have exited
}
}
func main() {
// Create a goroutine pool with 3 workers
pool := NewGoroutinePool(3)
// Submit some tasks to the pool
for i := 0; i < 10; i++ {
taskNum := i
pool.Submit(func() {
fmt.Printf("Executing task %d\n", taskNum)
time.Sleep(time.Second) // Simulate work
})
}
// Wait for all tasks to complete
pool.Wait()
fmt.Println("All tasks completed")
}
解释
- 结构体定义:
Task
:表示一个任务,是一个无参数的函数。GoroutinePool
:包含任务通道tasks
、工作协程控制通道workerPool
、等待组wg
和最大工作协程数量maxWorkers
。
- 创建协程池:
NewGoroutinePool
函数初始化协程池,并启动指定数量的工作协程。
- 工作协程:
worker
方法从tasks
通道中接收任务并执行,执行完成后将工作协程标记为可用(通过向workerPool
发送一个空结构体)。
- 提交任务:
Submit
方法将任务添加到任务通道,并等待一个工作协程变为可用。
- 等待任务完成:
Wait
方法等待所有任务完成,并关闭任务通道,确保所有工作协程退出。
使用示例
在 main
函数中,我们创建了一个包含3个工作协程的协程池,并提交了10个任务。每个任务打印一个消息并模拟1秒的工作。最后,我们等待所有任务完成并打印完成消息。
这个简单的协程池实现可以根据需要进行扩展,例如添加错误处理、任务超时、动态调整工作协程数量等功能。