0%

一个go协程池的实现

GO 协程池(任务池)的实现

实现主要来源于:

servicecomb-service-center

  1. 首先创建一个协程池的结构体,考虑我们所需要的东西
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 任务接口,需要有一个 Run 的方法
type Task interface{
Run()
}

// 检查接口是否实现
var _ Task =(*sumtask)(nil)

// 一个求和任务,实现了 Task 接口里面的方法
type sumtask struct {
a int
b int
}

func(t *sumtask) Run() {
fmt.Println(a + b)
}


type Pool struct {
// 我们给协程设置一个空闲时间,如果协程一直没有负载,直接退出就行
idletime time.Duration
// pending 用于给运行中的协程添加任务
pending chan Task
// worker 用于限制协程数量
workers chan struct{}
}
  1. 创建构造函数
1
2
3
4
5
6
7
func NewPool(sz int) *Pool {
return &Pool{
pending: make(chan Task),
workers: make(chan struct{}, sz),
idleTime: 3 * time.Second,
}
}
  1. 任务分发函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func (p *Pool) Do(task Task) {
select {
// pending 是将任务放入到已经启动的协程中
case p.pending <- task:
// worker 是将任务放入新开的协程中
case p.worker <- struct{}{}:
go p.loop(task)
}
}


func (p *Pool) loop(task Task) {
timer := time.NewTimer(p.idleTime)
for {
task.Run()
select{
case <- timer.C:
fmt.Println("goroutine idle,and return")
return
// 如果任务时从 pending 中取出,则直接运行任务
case task = <- p.pending:
timer.Stop()
timer.Reset(p.idleTime)
}
}
}

  1. 测试一下
1
2
3
4
5
6
7
8
9
10
11
12
13
func main() {
p := NewPool(10)
task = sumtask{
a : 1,
b : 2,
}

for i:=0; i< 20; i++ {
p.Do(&task)
}

time.Sleep(20 * time.Second)
}