目录
- 一、并发模型
- 二、MPG并发模型
- 三、Goroutine的使用
- 1 - 协程使用
- 2 - panic与defer
- 四、channel的同步与异步‘’
- 1 - 同步与异步channel
- 2 - 关闭channel
- 五、并发安全性
- 1 - 资源竞争
- 2 - 原子操作
- 3 - 读写锁
- 4 - 容器的并发安全
- 六、多路复用
- 1 - 阻塞I/O
- 2 - 非阻塞I/O
- 3 - 多路复用I/O
一、并发模型
- 进程与线程:无论开辟多少个线程,并发是由内核数量决定的
- 任何语言的并行,到操作系统层面,都是内核线程的并行
- 同一个进程内的多个线程共享系统资源,进程的创建、销毁、切换比线程大很多
- 从进程到线程再到协程, 其实是一个不断共享, 不断减少切换成本的过程
- python:内核线程与进程一一对应
- C++、Java:内核线程与线程一一对应
- golang:动态变换,没有对应关系
- 协程与线程
协程 | 线程 | |
---|---|---|
创建数量 | 轻松创建上百万个协程而不会导致系统资源衰竭 | 通常最多不能超过1万个 |
内存占用 | 初始分配4k堆栈,随着程序的执行自动增长删除 | 创建线程时必须指定堆栈且是固定的,通常以M为单位 |
切换成本 | 协程切换只需保存三个寄存器,耗时约200纳秒 | 线程切换需要保存几十个寄存器,耗时约1000纳秒 |
调度方式 | 非抢占式,由Go runtime主动交出控制权(对于开发者而言是抢占式) | 在时间片用完后,由 CPU 中断任务强行将其调度走,这时必须保存很多信息 |
创建销毁 | goroutine因为是由Go runtime负责管理的,创建和销毁的消耗非常小,是用户级的 | 创建和销毁开销巨大,因为要和操作系统打交道,是内核级的,通常解决的办法就是线程池 |
- 查看逻辑核心数
func main() {
fmt.Println(runtime.NumCPU()) //16
}
二、MPG并发模型
- MPG概念
- M(Machine)对应一个内核线程
- P(Processor)虚拟处理器,代表M所需的上下文环境,是处理用户级代码逻辑的处理器
- P的数量由环境变量中的GOMAXPROCS决定,默认情况下就是核数
- G(Goroutine)本质上是轻量级的线程,G0正在执行,其他G在等待
- MPG模型
- M和内核线程的对应关系是确定的
- G0阻塞(如系统调用)时,P与G0、M0解绑,P被挂到其他M上,然后继续执行G队列
- G0解除阻塞后,如果有空闲的P,就绑定M0并执行G0;否则G0进入全局可运行队列(runqueue),P会周期性扫描全局runqueue,使上面的G得到执行;如果全局runqueue为空,就从其他P的等待队列里偷一半G过来
三、Goroutine的使用
1 - 协程使用
- 两种启动协程的常见方式:
- 以下2份代码都无法打印出Add,因为main函数已经结束了
- 如果想要打印出Add,可以添加time.Sleep来让协程运行完,才结束主进程
func Add(a, b int) int {
fmt.Println("Add")
return a + b
}
func main() {
go Add(2, 4)
}
func main() {
go func(a, b int) int {
fmt.Println("Add")
return a + b
}(2, 4)
}
- 优雅地等子协程结束:虽然上述情况我们使用time.Sleep来让协程运行完,才结束主进程,但是实际情况中我们并不知道协程需要多久才能执行完,此时就可以使用sync.WaitGroup来优雅的等待子协程结束
var wg = sync.WaitGroup{}
func Add() {
defer wg.Done() //减1
time.Sleep(100 * time.Millisecond)
fmt.Println("Add")
}
func main() {
wg.Add(2) //加2
go Add()
go Add()
wg.Wait() //等待为0
}
- 协程之间互不影响:除了main协程退出后,所有协程都会退出;其他的协程之间互不影响
var wg = sync.WaitGroup{}
func Add() {
defer wg.Done() //减1
go Sub()
fmt.Println("over Add")
}
func Sub() {
time.Sleep(2000 * time.Millisecond)
fmt.Println("over Sub")
}
func main() {
wg.Add(2) //加2
go Add()
go Add()
wg.Wait() //等待为0
time.Sleep(5000 * time.Millisecond) //等待Sub协程
// over Add
// over Add
// over Sub
// over Sub
}
- ele使用的是全局的ele
- fmt.Printf使用的是IO操作
- for range中使用的协程是无延迟的
- 也就是说当第一个协程执行fmt.Printf的时候,for循环可能已经结束了,此时的ele已经变成了4
func main() {
arr := []int{1, 2, 3, 4}
for _, ele := range arr {
go func() {
fmt.Printf("%d ", ele) //用的是协程外面的全局变量ele。输出4 4 4 4
}()
}
time.Sleep(1 * time.Second)
}
- 闭包:向协程内部传递变量来解决上述问题
func main() {
arr := []int{1, 2, 3, 4}
for _, ele := range arr {
go func(value int) {
fmt.Printf("%d ", value) //1 4 2 3,这里打印是乱序的没有问题
}(ele)
}
time.Sleep(1 * time.Second)
}
- sync.Once:有时候需要确保在高并发的场景下有些事情只执行一次,比如加载配置文件、关闭管道等
var oc sync.Once
var a int = 5
func main() {
go func() {
oc.Do(func() {
a++
})
}()
go func() {
oc.Do(func() {
a++
})
}()
time.Sleep(time.Second)
fmt.Println(a) //6
}
2 - panic与defer
-
何时会发生panic
- 运行时错误会导致panic,比如数组越界、除0
- 程序主动调用panic(error)
-
panic会执行什么
- 逆序执行当前goroutine的defer链(recover从这里介入)
- 打印错误信息和调用堆栈
- 调用exit(2)结束整个进程
-
**defer **
- defer在函数退出前被调用,注意不是在代码的return语句之前执行,因为return语句不是原子操作
- 如果发生panic,则之后注册的defer不会执行
- defer服从先进后出原则,即一个函数里如果注册了多个defer,则按注册的逆序执行
- defer后面可以跟一个匿名函数
func F() {
defer fmt.Printf("11111 ")
defer fmt.Printf("22222 ")
fmt.Printf("GGGGG ")
defer fmt.Printf("33333 ")
defer fmt.Printf("44444 ")
panic("papapa")
defer fmt.Printf("55555 ")
fmt.Printf("FFFFF ")
}
func main() {
go F()
time.Sleep(time.Second)
//无panic:GGGGG FFFFF 55555 44444 33333 22222 11111
//panic:GGGGG 44444 33333 22222 11111
}
- recover
func F() {
defer fmt.Printf("11111 ")
defer fmt.Printf("22222 ")
fmt.Printf("GGGGG ")
defer fmt.Printf("33333 ")
defer func() {
//从panic发生的地方中途结束本协程
//协程有中recover,即使panic也不会结束整个进程
recover()
}()
defer fmt.Printf("44444 ")
panic("papapa")
defer fmt.Printf("55555 ")
fmt.Printf("FFFFF ")
}
func main() {
go F()
time.Sleep(time.Second)
fmt.Println("this is main")
//无panic:GGGGG FFFFF 55555 44444 33333 22222 11111
//panic:GGGGG 44444 33333 22222 11111
//无recover有panic:GGGGG 44444 33333 22222 11111
//有revocer有panic:GGGGG 44444 33333 22222 11111 this is main
}
四、channel的同步与异步‘’
1 - 同步与异步channel
- 共享内存:很多语言通过共享内存来实现线程间的通信,通过加锁来访问共享数据,如数组、map或结构体。go语言也实现了这种并发模型
- CSP:CSP(communicating sequential processes)讲究的是“以通信的方式来共享内存”,在go语言里channel是这种模式的具体实现
- 异步channel:
asynChann := make(chan int, 8)
- channel底层维护一个环形队列(先进先出),make初始化时指定队列的长度
- 队列满时,写阻塞;队列空时,读阻塞
- sendx指向下一次写入的位置, recvx指向下一次读取的位置
- recvq(等待读的goroutine队列)维护因读管道而被阻塞的协程,sendq(等待写的goroutine队列)维护因写管道而被阻塞的协程
- 同步channel:同步管道可以认为队列容量为0,当读协程和写协程同时就绪时它们才会彼此帮对方解除阻塞
- 创建同步管道:
syncChann := make(chan int)
- 往管理里放数据:
syncChann < -1
-> 生产者 - 从管道取出数据:
v := <- syncChann
-> 消费者
- 创建同步管道:
- 关于channel的死锁与阻塞
- Channel满了,就阻塞写;Channel空了,就阻塞读
- 阻塞之后会交出cpu,去执行其他协程,希望其他协程能帮自己解除阻塞
- 如果阻塞发生在main协程里,并且没有其他子协程可以执行,那就可以确定“希望永远等不来”,自已把自己杀掉,报一个fatal error:deadlock出来
- 如果阻塞发生在子协程里,就不会发生死锁,因为至少main协程是一个值得等待的“希望”,会一直等(阻塞)下去
func main() {
//channel仅作为协程间同步的工具,不需要传递具体的数据,管道类型可以用struct{}
//空结构体变量的内存占用为0,因此struct{}类型的管道比bool类型的管道还要省内存
ch := make(chan struct{}, 1)
ch <- struct{}{} //有1个缓冲可以用,无需阻塞,可以立即执行
go func() { //子协程1
time.Sleep(5 * time.Second) //sleep一个很长的时间
<-ch //如果把本行代码注释掉,main协程5秒钟后会报fatal error
fmt.Println("sub routine 1 over")
}()
//由于子协程1已经启动,寄希望于子协程1帮自己解除阻塞,所以会一直等子协程1执行结束
//如果子协程1执行结束后没帮自己解除阻塞,则希望完全破灭,报出deadlock
ch <- struct{}{}
fmt.Println("send to channel in main routine")
go func() { //子协程2
time.Sleep(2 * time.Second)
ch <- struct{}{} //channel已满,子协程2会一直阻塞在这一行
fmt.Println("sub routine 2 over")
}()
time.Sleep(3 * time.Second)
fmt.Println("main routine exit")
// sub routine 1 over
// send to channel in main routine
// main routine exit
}
2 - 关闭channel
- 关闭channel的注意点
- 只有当管道关闭时,才能通过range遍历管道里的数据,否则会发生fatal error
- 管道关闭后读操作会立即返回,如果缓冲已空会返回“0值”
- ele, ok := <-ch ok==true代表ele是管道里的真实数据
- 向已关闭的管道里send数据会发生panic
- 不能重复关闭管道,不能关闭值为nil的管道,否则都会panic
func main() {
c := make(chan int, 2)
c <- 1
c <- 2
close(c) //如果不先close会报 -> fatal error: all goroutines are asleep - deadlock!
// c <- 3 //关闭管道后继续向管道写入发生panic: send on closed channel
for ele := range c {
fmt.Printf("%d ", ele) //1 2
}
//close channel之后,读操作总是立即返回
//如果channel里没有元素,则返回对应类型的默认值
v := <-c
fmt.Println(v) //0
}
- channel的应用
func upstream(ch chan struct{}) {
time.Sleep(15 * time.Millisecond)
fmt.Println("一个上游协程执行结束")
ch <- struct{}{} //写
}
func downstream(ch chan struct{}) {
<-ch //读
fmt.Println("下游协程开始执行")
}
func main() {
upNum := 4 //上游协程的数量
downNum := 5 //下游协程的数量
upCh := make(chan struct{}, upNum)
downCh := make(chan struct{}, downNum)
//启动上游协程和下游协程,实际下游协程会先阻塞
for i := 0; i < upNum; i++ {
go upstream(upCh) //time.Sleep(15 * time.Millisecond) 延迟执行
}
for i := 0; i < downNum; i++ {
go downstream(downCh) //读阻塞
}
//同步点
for i := 0; i < upNum; i++ {
<-upCh //主线程读阻塞
}
//通过管道让下游协程开始执行
for i := 0; i < downNum; i++ {
downCh <- struct{}{}
}
time.Sleep(10 * time.Millisecond) //等下游协程执行结束
}
- 结果分析
一个上游协程执行结束
一个上游协程执行结束
一个上游协程执行结束
一个上游协程执行结束
下游协程开始执行
下游协程开始执行
下游协程开始执行
下游协程开始执行
下游协程开始执行
五、并发安全性
1 - 资源竞争
- 资源竞争:多协程并发修改同一块内存,产生资源竞争
- n++不是原子操作,并发执行时会存在脏写。n++分为3步:取出n,加1,结果赋给n。测试时需要开1000个并发协程才能观察到脏写
- n++不是原子操作,并发执行时会存在脏写。n++分为3步:取出n,加1,结果赋给n。测试时需要开1000个并发协程才能观察到脏写
var n int
func main() {
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
n++
}()
}
wg.Wait()
fmt.Println(n) //无论运行多少次都不会到达1000
}
- -rece:go run或go build时添加-race参数检查资源竞争情况
go run -race .\main.go
2 - 原子操作
- 原子操作
func atomic.AddInt32(addr *int32, delta int32) (new int32)
func atomic.LoadInt32(addr *int32) (val int32)
var n int32
func main() {
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
atomic.AddInt32(&n, 1)
}()
}
wg.Wait()
fmt.Println(n)
}
3 - 读写锁
- 读写锁:
var lock sync.RWMutex //声明读写锁,无需初始化
lock.Lock() lock.Unlock() //加写锁和释放写锁
lock.RLock() lock.RUnlock() //加读锁和释放读锁
var n int32
var lock sync.RWMutex
func main() {
wg := sync.WaitGroup{}
wg.Add(1000)
for i := 0; i < 1000; i++ {
go func() {
defer wg.Done()
lock.Lock()
n++
lock.Unlock()
}()
}
wg.Wait()
fmt.Println(n)
}
- 写锁可以排斥其他写锁:任意时刻只可以加一把写锁,且不能加读锁
var lock sync.RWMutex
func main() {
go func() {
lock.Lock()
fmt.Println("A lock success")
}()
fmt.Println()
go func() {
lock.RLock() //阻塞,排斥读锁
fmt.Println("B lock success")
}()
fmt.Println()
go func() {
lock.Lock() //阻塞,排斥写锁
fmt.Println("C lock success")
}()
time.Sleep(time.Second)
}
- 没加写锁时,可以同时加多把读锁,读锁加上之后不能再加写锁
var lock sync.RWMutex
func main() {
go func() {
lock.RLock()
fmt.Println("A lock success")
}()
fmt.Println()
go func() {
lock.RLock()
fmt.Println("B lock success")
}()
fmt.Println()
go func() {
lock.Lock() //阻塞,排斥写锁
fmt.Println("C lock success")
}()
time.Sleep(time.Second)
}
4 - 容器的并发安全
- 数组、slice、struct:允许并发修改(可能会脏写)
//切片和数组一样
func main() {
lst := make([]int, 5)
go func() {
for i := 0; i < len(lst); i += 1 {
time.Sleep(10 * time.Millisecond)
lst[i] = 888
}
}()
go func() {
for i := 0; i < len(lst); i += 1 {
time.Sleep(10 * time.Millisecond)
lst[i] = 555
}
}()
time.Sleep(time.Second)
fmt.Println(lst)
}
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [888 888 888 888 888]
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [888 555 555 555 555]
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [555 555 555 555 555]
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [555 555 555 555 555]
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [888 555 555 555 555]
// PS C:\develop_project\go_project\proj1> go run .\main.go
// [555 888 888 888 888]
//struct
func main() {
type Student struct {
Age int
Name string
}
stu := new(Student)
go func() {
for i := 0; i < 10; i += 1 {
time.Sleep(10 * time.Millisecond)
stu.Age = 18
time.Sleep(10 * time.Millisecond)
stu.Name = "Jack"
}
}()
go func() {
for i := 0; i < 10; i += 1 {
time.Sleep(10 * time.Millisecond)
stu.Age = 11
time.Sleep(10 * time.Millisecond)
stu.Name = "Tom"
}
}()
time.Sleep(time.Second)
fmt.Println(stu)
}
// PS C:\develop_project\go_project\proj1> go run .\main.go
// &{18 Jack}
// PS C:\develop_project\go_project\proj1> go run .\main.go
// &{11 Jack}
// PS C:\develop_project\go_project\proj1> go run .\main.go
// &{11 Tom}
- map:并发修改map有时会发生panic
fatal error: concurrent map writes
func main() {
mp := make(map[int]int, 10)
go func() {
for i := 0; i < 100; i += 1 {
mp[i] = i
}
}()
go func() {
for i := 0; i < 100; i += 1 {
mp[i] = i
}
}()
time.Sleep(time.Second)
fmt.Println(mp)
}
- sync.Map:如果需要并发修改map请使用sync.Map
func main() {
var mp sync.Map
go func() {
for i := 0; i < 100; i += 1 {
mp.Store(i, i)
}
}()
go func() {
for i := 0; i < 100; i += 1 {
mp.Store(i, i)
}
}()
time.Sleep(time.Second)
fmt.Println(mp.Load(0))
}
六、多路复用
- 操作系统级的I/O模型有
- 阻塞I/O
- 非阻塞I/O
- 信号驱动I/O
- 异步I/O
- 多路复用I/O:
- Linux下,一切皆文件
- 包括普通文件、目录文件、字符设备文件(键盘、鼠标)、块设备文件(硬盘、光驱)、套接字socket等等
- 文件描述符(File descriptor,FD)是访问文件资源的抽象句柄,读写文件都要通过它
- 文件描述符就是个非负整数,每个进程默认都会打开3个文件描述符:0标准输入、1标准输出、2标准错误
- 由于内存限制,文件描述符是有上限的,可通过ulimit –n查看,文件描述符用完后应及时关闭
1 - 阻塞I/O
2 - 非阻塞I/O
- 阻塞I/O与非阻塞I/O的区别
- 阻塞I/O在阻塞的时候是不会处理其他事务一直阻塞等待
- 非阻塞I/O在阻塞的时候可以先处理其他事务,提高CPU的利用率
- read和write默认是阻塞模式
ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t nbytes);
- 通过系统调用fcntl可将文件描述符设置成非阻塞模式
int flags = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, flags | O_NONBLOCK);
3 - 多路复用I/O
- **多路复用I/O **
- select系统调用可同时监听1024个文件描述符的可读或可写状态
- poll用链表存储文件描述符,摆脱了1024的上限
- 各操作系统实现了自己的I/O多路复用函数,如epoll、 evport 和kqueue等
- go的多路复用I/O:
- go多路复用函数以netpoll为前缀,针对不同的操作系统做了不同的封装,以达到最优的性能
- 在编译go语言时会根据目标平台选择特定的分支进行编译
LOOP:
for {
select { //同时监听多个channel,谁准备好就执行谁
case n := <-countCh:
fmt.Println(n)
case <-finishCh:
fmt.Println("finish")
break LOOP //break LOOP 退出for循环,在使用for select的时,单独一个break无法退出
case <-abortCh:
fmt.Println("abort")
break LOOP //break LOOP 退出for循环
}
}