本文所有实例代码运行go版本:go version go1.18.10 windows/amd64
1 并发编程介绍
1.1 串行、并发、并行
- 串行:所有任务一件一件做,按照事先的顺序依次执行,没有被执行到的任务只能等待。最终执行完的时间等于各个子任务之和。
- 并发:是以交替的方式利用
等待
某个任务的时间来处理其他任务计算逻辑,在计算机中,例如一个单核CPU,会通过时间片
算法,来高效合理的分配cpu计算资源。从用户角度来看似乎是多个任务在同时执行。
- 并行:在同一时刻处理计算多个任务,以多核CPU为例,可以实现同时处理计算多个任务,一个CPU负责一个任务的计算逻辑,大家做到同时进行,就像三个任务有三个工人同时干活一样。
1.2 进程、线程、协程
- 进程:是程序运行的基本单位,每个进程都有自己的独立内存空间,不同的进程可以通过
进程之间的互相通信
进行交流。比如:电脑上的 QQ、微信、WPS等都有各自的进程。在操作系统级别来看,进程是操作系统
对一个正在运行的程序的一种抽象
。一个系统里面可以同时运行多个进程,而每一个进程又好像是在独占的使用硬件资源,通过处理器在进程之间不停的切换来实现。 - 线程:线程是
处理器(CPU)资源分配和调度的基本单位
,在一个进程中可以有多个线程,每个线程都运行在进程的环境上下文中,不同线程之间可以通过线程之间通信
进行数据交换。比如:在360安全卫视进程中,你可以同时进行垃圾清理和病毒查杀,在微信中,你可以刷朋友圈同时接收消息。 - 进程和线程区别:
- 创建和开销方面,进程的创建需要系统分配内存和CPU,文件句柄等资源,销毁时也要进行相应的回收,所以进程的管理开销很大;但是线程的管理开销则很小。
- 进程之间不会相互影响;而一个线程崩溃可能会导致进程崩溃,从而影响同个进程里面的其他线程。
- 线程是进程的子任务,是处理器(CPU)分配和调度的基本单位,进程是对运行时程序的封装,是系统进行资源分配和调度的基本单元。
- 协程:在理解协程之前,需要明白线程的几个问题:
- 1、在执行过程中分为用户态和内核态,两个状态的切换会造成资源开销;
- 2、面线程创建的越多,CPU切换的就越频繁,因为操作系统的调度要保证相对公平
- 3、线程的创建、销毁都需要调用系统调用,每次请求都创建,高并发下开销就显得很大,而且线程的数量不能太多,占用内存是 MB 级别。
- 基于上面的问题,协程被提出,协程是用户态(用户空间)的一种抽象,对操作系统内核而言并没有这个概念,依然是以线程维度调度。协程的主要思想是在用户态实现调度算法,来达到用少量线程,处理大量任务的调度,因为是用户态调度切换,不涉及内核切换和不同线程之间的上下文切换,大大减少开销。
最后来一个图描述三者之间的关系:
2 并发核心-Goroutine
2.1 goroutine介绍
在Go中使用goroutine来实现并发,在Go中协程的概念最终落地到goroutine中,可以称为Go协程、协程Coroutine等。goroutine是由Go的运行时(runtime)调度和管理的,Go程序会将 goroutine 中的任务合理地分配给每个CPU。
在Go并发编程中无需关注:进程、线程、协程的概念,无需写创建和销毁的代码,只需要关注goroutine即可。而且goroutine的使用相当简单,Go在语言层面提供go
关键字去开启一个goroutine。例如你想让函数 fun1
使用goroutine执行:
go func1()
2.2 使用goroutine
为一个函数创建一个goroutine,只需要在调用函数的时候在前面加上go
关键字即可。
func func1() {
fmt.Println("Hello F1!")
}
func main() {
go func1()
fmt.Println("main goroutine done!")
// 这里睡一会,防止main结束后,func1 来不及运行
time.Sleep(time.Second)
}
两次运行结果可能不一样
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
Hello F1!
main done!
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
main done!
Hello F1!
go
关键字可以用于匿名函数,并且goroutine实现多个并发非常简单,如下启动是个goroutine:
func main() {
for i := 0; i < 10; i++ {
go func(n int) {
fmt.Println("执行了:", n)
}(i)
}
fmt.Println("main done!")
// 这里睡一会,防止main结束后,func1 来不及运行
time.Sleep(time.Second)
}
运行结果:
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
main done!
执行了: 4
执行了: 5
执行了: 1
执行了: 0
执行了: 7
执行了: 2
执行了: 3
执行了: 8
执行了: 9
执行了: 6
2.3 goroutine资源
在第一章中知道,线程是由操作系统内核进行调度的,涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,调度成本和开销比较大,而goroutine是由go runtime
进行管理调度,大量的goroutine映射到少量的线程中去,其调度和切换更加轻量,基本都在用户态完成。一个goroutine的栈只有几K
大小,非常轻量,轻松是实现10W
级别并发支持。
3 数据交换-Channel
3.1 Channel是什么
在第二章介绍知道,go天然有高并发的特性,并且实现简单,但在实际开发中,难免会遇到不同并发线程(协程)之间进行数据交换和通信
,在Java等编程语言中可以通过共享内存数据(即某个对象后者变量)实现不同线程之间数据传递和通信,同时为了保证数据的安全性需要合理的加锁。
在goroutine中,引入了一个新的概念 channel
通道,来实现数据传递,可以将channel看做是联通多个goroutine的数据桥梁
,可以让一个goroutine发送特定的数据到另一个goroutine中去,并且保证先进先出的顺序。
3.2 Channel的语法和使用
3.2.1 channel定义
在go中channel是一种类型,可以通过和变量一样的方式进行定义,如下:
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []string // 声明一个传递string切片的通道
var ch3 chan MyStruct // 声明一个结构体类型的通道
fmt.Printf("ch1:%#v\n", ch1)
fmt.Printf("ch2:%#v\n", ch2)
fmt.Printf("ch3:%#v\n", ch3)
fmt.Printf("ch4:%#v\n", ch4)
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
ch1:(chan int)(nil)
ch2:(chan bool)(nil)
ch3:(chan []string)(nil)
ch4:(chan main.MyStruct)(nil)
从程序执行结果可以看出channel是引用类型(nil
),通道声明后默认值nil值。
3.2.2 channel初始化
可以使用go内置make
函数进行初始化:
fmt.Println("开始初始化")
ch1 = make(chan int, 10) // 初始化一个int通道,通道缓冲大小10
ch2 = make(chan bool, 20) // 初始化一个bool通道,通道缓冲大小20
ch3 = make(chan []string) // 初始化一个[]string通道,无通道缓冲
ch4 = make(chan MyStruct, 5) // 初始化一个MyStruct通道,通道缓冲大小5
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
ch1:(chan int)(nil)
ch2:(chan bool)(nil)
ch3:(chan []string)(nil)
ch4:(chan main.MyStruct)(nil)
开始初始化
ch1:(chan int)(0xc0000180b0)
ch2:(chan bool)(0xc000112080)
ch3:(chan []string)(0xc00005c060)
ch4:(chan main.MyStruct)(0xc00005c0c0)
3.2.3 channel操作
channel发送接收数据使用 <-
符号
- 发送:向ch1通道中发送一个1-5五个数字
ch1 <- 1
ch1 <- 2
ch1 <- 3
ch1 <- 4
ch1 <- 5
- 接收:从ch1中接收数字,这里为了方便就for循环接收五次,注意:
通道在没有数据接收时,会进行阻塞
for i := 0; i < 5; i++ {
n := <-ch1
fmt.Println("从ch1中接收数据:", n)
}
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
从ch1中接收数据: 1
从ch1中接收数据: 2
从ch1中接收数据: 3
从ch1中接收数据: 4
从ch1中接收数据: 5
- 关闭:一个通道可以通过内置函数
close
进行关闭,关闭后的通道不能再发送数据,但还可以接收数据,如果管道中还有数据则正常接收,如果没有则返回0
close(ch1)
ch1 <- 6 // 这里会报错
fmt.Println("从ch1中接收数据:", <-ch1)
panic: send on closed channel
goroutine 1 [running]:
main.main()
D:/dev/go/workspace/go_demo_code/temp/t1.go:36 +0x32c
exit status 2
- 通道关闭后,如何感知?当通道被关闭时,往该通道发送值会引发
panic
,从该通道里接收的值一直都是类型零
值,那么在循环操作通道的时候如何感知通道被关闭了,可以通过如下方式。
// 通道关闭后手动break
for {
n, ok := <-ch1
if !ok {
break
}
fmt.Println("从ch1中接收数据:", n)
}
// 通道关闭后会自动退出for range循环
for i := range ch1 {
fmt.Println(i)
}
完整代码:
package main
import "fmt"
type MyStruct struct {
}
func main() {
var ch1 chan int // 声明一个传递整型的通道
var ch2 chan bool // 声明一个传递布尔型的通道
var ch3 chan []string // 声明一个传递string切片的通道
var ch4 chan MyStruct // 声明一个结构体类型的通道
fmt.Printf("ch1:%#v\n", ch1)
fmt.Printf("ch2:%#v\n", ch2)
fmt.Printf("ch3:%#v\n", ch3)
fmt.Printf("ch4:%#v\n", ch4)
fmt.Println("开始初始化")
ch1 = make(chan int, 10) // 初始化一个int通道,通道缓冲大小10
ch2 = make(chan bool, 20) // 初始化一个bool通道,通道缓冲大小20
ch3 = make(chan []string) // 初始化一个[]string通道,无通道缓冲
ch4 = make(chan MyStruct, 5) // 初始化一个MyStruct通道,通道缓冲大小5
ch1 <- 1
ch1 <- 2
ch1 <- 3
ch1 <- 4
ch1 <- 5
for i := 0; i < 5; i++ {
n := <-ch1
fmt.Println("从ch1中接收数据:", n)
}
close(ch1)
ch1 <- 6
fmt.Println("从ch1中接收数据:", <-ch1)
fmt.Println("从ch1中接收数据:", <-ch1)
}
3.3 channel实战
需求:定义两个int类型的 channel,开启三个goroutine,go1 发送数据到到通道 ch1,go2接收ch1通道中的数值进行平方操作,再将结果写入到ch2,go3接收ch2的数据进行打印输出。
package main
import (
"fmt"
"time"
)
func main() {
var ch1 = make(chan int, 5)
var ch2 = make(chan int, 5)
go func1(ch1)
go func2(ch1, ch2)
go func3(ch2)
// 主函数睡眠
time.Sleep(time.Second)
}
func func1(ch chan int) {
for i := 0; i < 10; i++ {
fmt.Println("发送一个数据:", i)
ch <- i
}
}
func func2(ch1, ch2 chan int) {
// 这里死循环无限接收
for {
n := <-ch1
fmt.Println("对数据进行平方处理:", n)
ch2 <- n * n
}
}
func func3(ch chan int) {
// 这里死循环无限接收
for {
n := <-ch
fmt.Println("接收到数据了直接打印:", n)
}
}
运行结果:
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
发送一个数据: 0
发送一个数据: 1
发送一个数据: 2
发送一个数据: 3
发送一个数据: 4
发送一个数据: 5
发送一个数据: 6
对数据进行平方处理: 0
对数据进行平方处理: 1
对数据进行平方处理: 2
对数据进行平方处理: 3
对数据进行平方处理: 4
对数据进行平方处理: 5
对数据进行平方处理: 6
发送一个数据: 7
发送一个数据: 8
发送一个数据: 9
接收到数据了直接打印: 0
接收到数据了直接打印: 1
接收到数据了直接打印: 4
接收到数据了直接打印: 9
接收到数据了直接打印: 16
接收到数据了直接打印: 25
接收到数据了直接打印: 36
对数据进行平方处理: 7
对数据进行平方处理: 8
对数据进行平方处理: 9
接收到数据了直接打印: 49
接收到数据了直接打印: 64
接收到数据了直接打印: 81
4 多路复用-Select
第三章我们知道了可以通过channel进行多个goroutine的数据交换,在使用通道时,如果没有数据接收会阻塞,处理监听多个通道,就无法通过一个goroutine很好的接收数据。这种情况go提供了select
,多路复用,可以同时监听多个channel,使用如下:
select {
case c1 := <- ch1:
fmt.Println("c1=", c1)
case c2 := <- ch1:
fmt.Println("c2=", c2)
}
- select语句是专为通道而设计的,所以每个case表达式中都只能包含操作通道的表达式
- select 默认阻塞,只有监听的channel中有发送或者接受数据时才运行
- 设置default则不阻塞,通道内没有待接受的数据则执行default
- 多个channel准备好时,会随机选一个执行
5 并发安全-Mutex锁
在使用多个goroutine操作临界资源(共享资源),就会发生竞争情况,出现数据安全问题,最总结果和期望的不一致,如下:
var num int
func main() {
go add()
go add()
time.Sleep(time.Second * 2)
fmt.Println(num)
}
func add() {
for i := 0; i < 10000; i++ {
num++
}
}
运行三次,两次结果都错误:
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
15737
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
20000
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
15825
5.1 互斥锁
互斥锁
顾名思义相互排斥,同一时间保证只有一个goroutine可以持有锁,进行共享资源的访问,在go中互斥锁使用sync.Mutex
实现。
- 声明锁对象:
var lock sync.Mutex
- 调用锁方法加锁或者解锁
- lock.Lock():会一直等待直到获取锁
- lock.TryLock():尝试获得锁,获取失败立即返回
- lock.Unlock():释放锁
- 多个goroutine同时等待一个锁时,唤醒的策略是随机的。
var num int
// 声明一把锁
var lock sync.Mutex
func main() {
go add()
go add()
time.Sleep(time.Second * 2)
fmt.Println(num)
}
func add() {
for i := 0; i < 10000; i++ {
// 共享资源访问前加锁
lock.Lock()
num++
// 操作完释放锁
lock.Unlock()
}
}
运行结果:
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
20000
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
20000
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
20000
5.2 读写锁
和其他编程语言类似,go也提供了读写锁
,提高读多写少场景的锁性能,分为读锁
和写锁
两部分具有一下特性:
- 并发读操作不加锁, R R 不阻塞
- 一个goroutine获取
读锁
后,其他goroutine,获取写锁
就会等待,R W 阻塞 - 一个goroutine获取
写锁
后,其他goroutine,获取读写锁
都会等待,W R、W W 阻塞
一句话:读读共享,读写互斥,写写互斥
var num int
//var lock sync.Mutex
var rwlock sync.RWMutex
func main() {
go add()
go add()
go read()
time.Sleep(time.Second * 2)
fmt.Println(num)
}
func read() {
// 测试效果读取5次
for i := 0; i < 5; i++ {
// 加读锁
rwlock.RLock()
fmt.Println("读取:", num)
// 释放读锁
rwlock.RUnlock()
// 让出CPU执行时间,后面会介绍
runtime.Gosched()
}
}
func add() {
for i := 0; i < 10000; i++ {
// 加写锁
rwlock.Lock()
num++
// 释放写锁
rwlock.Unlock()
}
}
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
读取: 0
读取: 999
读取: 1212
读取: 1334
读取: 1335
20000
6 并发控制-Sync
多个goroutine并发运行时,难免需要对并发程序进行控制,如第五章的锁控制并发安全访问,灾还有一些常见的情况:
- 一个goroutine,需要等待多个goroutine完成任务再执行业务代码。
- 某些特定代码在并发场景下只希望被执行一次。
- 多个等待中的goroutine,接收到一个goroutine通知后,开始处理一些业务(如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据)
- go默认的map是并发不安全的,实际开发中我们需要一些并发类的容器,例如map等
sync包提供了一些开箱即用的api和对象,帮助我们控制并发goroutine,解决实际需求。
6.1 sync.WaitGroup
WaitGroup类似Java的CountDownLatch,可以实现等待并发任务执行完, sync.WaitGroup有以下几个方法:
-
Add(delta int) 计数器+delta
-
Done() 计数器-1
-
Wait() 阻塞直到计数器变为0
var num int
//var lock sync.Mutex
var rwlock sync.RWMutex
//定义一个WaitGroup
var wg sync.WaitGroup
func main() {
// 三个 goroutine,这里直接加三
wg.Add(3)
go add()
go add()
go read()
// 等待所有goroutine任务结束
wg.Wait()
fmt.Println(num)
}
func read() {
// 测试效果读取5次
for i := 0; i < 5; i++ {
// 加读锁
rwlock.RLock()
fmt.Println("读取:", num)
// 释放读锁
rwlock.RUnlock()
// 让出CPU执行时间
runtime.Gosched()
}
// 结束一个减一
wg.Done()
}
func add() {
for i := 0; i < 10000; i++ {
// 加写锁
rwlock.Lock()
num++
// 释放写锁
rwlock.Unlock()
}
wg.Done()
}
6.2 sync.Once
sync.Once,用来保证某种行为只会被执行一次,高并发场景,可以用来处理一次性操作
核心函数:
func (o *Once) Do(f func())
例子:
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func main() {
wg.Add(3)
var once sync.Once
go load(&once)
go load(&once)
go load(&once)
wg.Wait()
fmt.Println("主方法结束")
}
// 注意这里需要传入指针类型
func load(once *sync.Once) {
fmt.Println("load方法被调用了")
once.Do(func() {
fmt.Println("无论多少次,once.Do只会调用一次")
})
wg.Done()
}
运行结果
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
load方法被调用了
无论多少次,once.Do只会调用一次
load方法被调用了
load方法被调用了
主方法结束
6.3 sync.Cond
sync.Cond
基于互斥锁/读写锁,经常用在多个 goroutine 等待,一个 goroutine 通知的场景,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
cond := sync.NewCond(&sync.Mutex{})
go fun1(cond)
go fun1(cond)
go fun1(cond)
go func() {
time.Sleep(time.Second)
fmt.Println("发出信号了")
cond.Broadcast()
//cond.Signal()
}()
time.Sleep(time.Second * 2)
fmt.Println("主方法结束")
}
func fun1(cond *sync.Cond) {
cond.L.Lock()
fmt.Println("func1被调用了...")
cond.Wait()
fmt.Println("func1结束了")
cond.L.Unlock()
}
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
func1被调用了...
func1被调用了...
func1被调用了...
发出信号了
func1结束了
func1结束了
func1结束了
主方法结束
6.4 sync.Map
看一个并发情况下操作map的代码,执行后会报错:fatal error: concurrent map writes
func main() {
m := make(map[int]int)
go put(m, 0)
go put(m, 10)
go put(m, 20)
time.Sleep(time.Second * 2)
fmt.Println("主方法结束")
}
func put(m map[int]int, start int) {
for i := start; i < start+10; i++ {
m[i] = i * i
}
}
当然解决这个问题可以通过对map
操作加锁,Go语言的sync包中还提供了一个开箱即用的并发安全版map sync.Map
func main() {
// 声明一个并发map
syncMap := &sync.Map{}
go syncPut(syncMap, 0)
go syncPut(syncMap, 10)
go syncPut(syncMap, 20)
time.Sleep(time.Second * 1)
fmt.Println("主方法结束")
// 遍历打印
syncMap.Range(func(key, value any) bool {
fmt.Printf("key=%v, value=%v\n", key, value)
return true
})
}
func syncPut(m *sync.Map, start int) {
for i := start; i < start+10; i++ {
// 安全的存储数据
m.Store(i, i*i)
}
}
7 原子操作 Atomic
在go中我们使用加锁来保证并发场景下数据安全访问,但加锁的代价比较大,涉及到内核态的上下文切换会比较耗时,go中针对基本数据类型的操作提供了atomic
包,可以实现原子的操作基本数据类型。
// 原子性的获取*addr的值。
func LoadInt64(addr *int64) (val int64)
func LoadUint32(addr *uint32) (val uint32)
func LoadUint64(addr *uint64) (val uint64)
func LoadUintptr(addr *uintptr) (val uintptr)
func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
//原子性的将val的值保存到*addr。
func StoreInt32(addr *int32, val int32)
func StoreInt64(addr *int64, val int64)
func StoreUint32(addr *uint32, val uint32)
func StoreUint64(addr *uint64, val uint64)
func StoreUintptr(addr *uintptr, val uintptr)
func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
//原子性的将val的值添加到*addr并返回新值。
func AddInt32(addr *int32, delta int32) (new int32)
func AddInt64(addr *int64, delta int64) (new int64)
func AddUint32(addr *uint32, delta uint32) (new uint32)
func AddUint64(addr *uint64, delta uint64) (new uint64)
func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
//原子性的将新值保存到*addr并返回旧值。
func SwapInt32(addr *int32, new int32) (old int32)
func SwapInt64(addr *int64, new int64) (old int64)
func SwapUint32(addr *uint32, new uint32) (old uint32)
func SwapUint64(addr *uint64, new uint64) (old uint64)
func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
//原子性的比较*addr和old,如果相同则将new赋值给*addr并返回真。
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
举一个简答的例子:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var wg sync.WaitGroup
var num int32
wg.Add(3)
go func() {
for i := 0; i < 1000; i++ {
atomic.AddInt32(&num, 1)
}
wg.Done()
}()
go func() {
for i := 0; i < 1000; i++ {
atomic.AddInt32(&num, 1)
}
wg.Done()
}()
go func() {
for i := 0; i < 1000; i++ {
atomic.AddInt32(&num, 1)
}
wg.Done()
}()
wg.Wait()
fmt.Println(num)
}
PS D:\dev\go\workspace\go_demo_code> go run .\temp\t1.go
3000
8 底层控制-Runtime
Go Runtime 后续专门写一篇文章深入介绍,这里只介绍几个简单的方法,初步了解Go Runtime。
- runtime.GOMAXPROCS 设置多少个OS线程来同时执行Go代码,默认值是机器上的CPU核心数
- runtime.Gosched() 让出CPU时间片,重新等待安排任务
- runtime.Goexit() 退出当前协程
- runtime.NumGoroutine() 查看当前Goroutine数量
- runtime.NumCPU() 返回cpu数量
- runtime.GC() 让运行时系统进行一次强制性的垃圾收集