channel
// cancelFn 数据通道关闭通知退出
func cancelFn(dataChan chan int) {
for {
select {
case val, ok := <-dataChan:
// 关闭data通道时,通知退出
// 一个可选是判断data=指定值时退出
if !ok {
fmt.Printf("Channel closed !!!")
return
}
fmt.Printf("Receive data from dataChan %d\n", val)
}
}
}
func main() {
channels := make([]chan int, 10)
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go cancelFn(channels[i])
channels[i] <- 1 // 向管道写数据
fmt.Println(i, "quit")
}
}
watitGroup
var wg sync.WaitGroup
func main() {
ch := make(chan int)
wg.Add(1) //设置计数器 表示goroutine个数加1
go func() {
v, ok := <-ch
if ok {
fmt.Println("value", v)
}
wg.Done() //执行结束之后 , goroutine个数减1
}()
wg.Add(1)
go func() {
ch <- 4
wg.Done()
}()
wg.Wait() //主goroutine阻塞,等待计数器变为0
}
WaitGroup原理
type WaitGroup struct {
statel [3]uint32
/*
长度为3的数组包含两个计数器和一个信号量
counter : 当前还未执行的结束的goroutine计数器
waiter count : 等待goroutine-group结束的goroutine数量
semaphore: 信号量
*/
}
WaitGroup对外提供了三个接口
- Add(delta int) : 将delta值加到counter中
- Wait(): waiter递增加1 , 并阻塞等待信号量semaphore
- Done(): counter递减1 , 按照waiter数值释放相应次数的信号量
Add(delta int)
Add() 做了两件事 , 一是把delta值累加到counter中,因为delta可以为负值.所以说当counter变为0时,根据waiter数值释放等量的信号量 , 把等待的goroutine全部唤醒,如果couner变为负值,则触发panic.
Wait()
Wait()方法一个是要累加waiter , 二是阻塞等待信号量.
Done()
Done 只做一件事,把counter减少1,其实Done里面调用的就是Add(-1)
context原理
Context实际上只定义了接口,凡是实现该接口的类都能称为Context.
type Context interface {
Deadline() (deadline time.Time , ok bool)
Done() <-chan struct{}
Err() error
value(key interface{}) interface{}
}
Deadline()
该方法返回一个deadline和标识是否已设置deadline的bool值,如果没有设置deadline , 则ok为false,此时deadline为一个初始值的time.Time值.
Done()
该方法返回一个用于探测context是否取消的channel,当context取消时,会自动将该channel关闭. 对于不支持取消的context(如:context.Backgroud) , 该方法可能会返回nil.
Err()
该方法描述context关闭的原因.关闭原因由context实现控制.
value()
有一种context,它不是用于控制呈树状分布的goroutine , 而是用于在树状分布的goroutine之间传递信息.Value()方法就是此种类型的context,根据key查询map集合中的value.
空context
context包中定义了一个公用的emptyCtx全局变量 , 名为backgroud,可以使用context.Backgroud()获取它.context包中提供了四个方法创建不同类型的context , 使用这四个方法如果没有父context,则都需要传入background , 即将background作为父节点:
- WithCancel();
- WithDeadline();
- WithTimeout();
- WithValue();
context包中实现Context接口的struct,除了emptyCtx , 还有cancelCtx , timerCtx 和 valueCtx三种.
cancelCtx
type cancelCtx struct {
Context
mu sync.Mutex
done chan struct{}
children map[canceler]struct{}
err error
}
children 中记录了由此context 派生的所有child , 此context被"cancel"时,会把其中所有的child都cancel掉.cancelCtx与deadline和value无关 , 所以只需要实现Done() 和 Err() 外露接口即可.
Cancel()接口的实现
cancel()内部方法时理解cancelCtx的关键cancelCtx.children的map中,其中key值即后代对象,value值并没有意义.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
c.mu.Lock()
c.err = err //设置一个error,说明关闭原因
close(c.done) //将channel关闭,以此通知派生的context
for child := range c.children { //遍历所有children,逐个调用cancel方法
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent { //正常情况下,需要将自己从parent删除
removeChild(c.Context, c)
}
}
WithCancel()方法的使用案例
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
go HandelRequest(ctx)
time.Sleep(5 * time.Second)
fmt.Println("It's time to stop all sub goroutines!")
cancel()
//Just for test whether sub goroutines exit or not
time.Sleep(5 * time.Second)
}
HandelRequest()用于处理某个请求 , 其又会创建两个协程 , main协程可以在适当时机cancel掉所有自子协程
timeCtx
type timerCtx struct {
cancelCtx
timer *time.Timer
deadline time.Time
}
timerCtx 在cancelCtx的基础上,增加了deadline用于标示自动cancel的最终时间,而timer就是一个触发自动cancel的定时器.由此衍生出了WithDeadline()和WithTimeout().
- deadline:指定最后期限.
- timeout: 指定最长存活时间.
package main
import (
"fmt"
"time"
"context"
)
func HandelRequest(ctx context.Context) {
go WriteRedis(ctx)
go WriteDatabase(ctx)
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running")
time.Sleep(2 * time.Second)
}
}
}
func WriteRedis(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteRedis Done.")
return
default:
fmt.Println("WriteRedis running")
time.Sleep(2 * time.Second)
}
}
}
func WriteDatabase(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("WriteDatabase Done.")
return
default:
fmt.Println("WriteDatabase running")
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
go HandelRequest(ctx)
time.Sleep(10 * time.Second)
}
valueCtx
type valueCtx struct {
Context
key, val interface{}
}
valueCtx 只是在Context基础上增加了一个key-value对,用于在各级协程之间传递数据.因此只需要实现Value()接口.
func HandelRequest(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("HandelRequest Done.")
return
default:
fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter"))
time.Sleep(2 * time.Second)
}
}
}
func main() {
ctx := context.WithValue(context.Background(), "parameter", "1")
go HandelRequest(ctx)
time.Sleep(10 * time.Second)
}
子协程可以读到context的key-value