使用Channel的一些业务场景
首先需要明确的就是,发送方才知道什么时候关闭 channel
,这个是比较符合逻辑的。
我们需要知道哪些情况会使 channel
发生 panic
- 关闭一个 nil 值会引发
- 关闭一个已经关闭的 channel 会引发
- 向一个已经关闭的 channel 发送数据会引发
常见应用场景
- 信号通知
- 超时控制
- 生产消费模型
- 数据传递
- 控制并发数
- 互斥锁
- one million……
信号通知
如果只是单纯的使用通知操作,那么类型就使用 struct{}
。因为空结构体在 go
中是不占用内存空间的
package main
import (
"fmt"
"time"
)
func main() {
isOver := make(chan struct{})
go func() {
collectMsg(isOver)
}()
<-isOver
calculateMsg()
}
// 采集
func collectMsg(isOver chan struct{}) {
time.Sleep(500 * time.Millisecond)
fmt.Println("完成采集工具")
isOver <- struct{}{}
}
// 计算
func calculateMsg() {
fmt.Println("开始进行数据分析")
}
超时控制
<-time.After(1 * time.Second)
是过了指定时间后返回一个 channel
,select case
是哪一个 channel
先有反应就先处理,所以可以做到一个超时控制的作用
func main() {
select {
case <-doWork():
fmt.Println("任务结束")
case <-time.After(1 * time.Second):
fmt.Println("任务处理超时")
}
}
func doWork() <-chan struct{} {
ch := make(chan struct{})
go func() {
// 任务处理耗时
time.Sleep(2 * time.Second)
close(ch)
}()
return ch
}
消费者模型
这个就不多说了,一个生产,一个消费
数据传递
type token struct{}
func main() {
num := 4
var chs []chan token
// 4 个work
for i := 0; i < num; i++ {
chs = append(chs, make(chan token))
}
for j := 0; j < num; j++ {
go worker(j, chs[j], chs[(j+1)%num])
}
// 先把令牌交给第一个
chs[0] <- struct{}{}
select {}
}
func worker(id int, ch chan token, next chan token) {
for {
// 对应work 取得令牌
token := <-ch
fmt.Println(id + 1)
time.Sleep(1 * time.Second)
// 传递给下一个
next <- token
}
}
控制并发数
func main() {
limit := make(chan struct{}, 10)
jobCount := 100
for i := 0; i < jobCount; i++ {
go func(index int) {
limit <- struct{}{}
job(index)
<-limit
}(i)
}
time.Sleep(20 * time.Second)
}
func job(index int) {
// 耗时任务
time.Sleep(1 * time.Second)
fmt.Printf("任务:%d已完成n", index)
}
互斥锁
我们也可以通过 channel
实现一个小小的互斥锁。通过设置一个缓冲区为1的通道,如果成功地往通道发送数据,说明拿到锁,否则锁被别人拿了,等待他人解锁。
type ticket struct{}
type Mutex struct {
ch chan ticket
}
// 创建一个缓冲区为1的通道作
func newMutex() *Mutex {
return &Mutex{ch: make(chan ticket, 1)}
}
// 谁能往缓冲区为1的通道放入数据,谁就获取了锁
func (m *Mutex) Lock() {
m.ch <- struct{}{}
}
// 解锁就把数据取出
func (m *Mutex) unLock() {
select {
case <-m.ch:
default:
panic("已经解锁了")
}
}
func main() {
mutex := newMutex()
go func() {
// 如果是1先拿到锁,那么2就要等1秒才能拿到锁
mutex.Lock()
fmt.Println("任务1拿到锁了")
time.Sleep(1 * time.Second)
mutex.unLock()
}()
go func() {
mutex.Lock()
// 如果是2拿先到锁,那么1就要等2秒才能拿到锁
fmt.Println("任务2拿到锁了")
time.Sleep(2 * time.Second)
mutex.unLock()
}()
time.Sleep(500 * time.Millisecond)
// 用了一点小手段这里最后才能拿到锁
mutex.Lock()
mutex.unLock()
close(mutex.ch)
}