并发编程
https://www.cnblogs.com/Survivalist/p/11527949.html
进程和线程、协程的区别_线程协程进程的区别-CSDN博客
Golang中的并发编程是一个重点,我们要了解Golang中的并发Goroutine因此需要先理解进程、线程、之后再理解协程。
进程:操作系统进行资源分配的最小单元,是程序在执行过程中的一次活动,包括程序,数据集合,程序控制块(PCB)等。每个进程都有独立的内存空间,包括代码、数据、堆栈,因此进程之间相互隔离。进程切换开销大(包括栈、寄存器、页表、文件句柄等切换)。尽管更安全,但也占据了较多系统资源
线程:操作系统进行调度的最小单元,是进程中执行的基本单元,由线程ID,当前指令指针PC,寄存器和堆栈组成。一个进程包含>=1个线程,其中一个为主线程,多个线程时通过共享内存,上下文切换较快,资源开销较小。共享内存尤其需要注意线程同步互斥问题。
协程:用户轻量级线程,由程序控制,不被操作系统内核管理
更轻量,独立栈空间(协程之间不共享内存,但是可以通信(channel)进行交互),更易并发(高效切换无需过多锁)
并行和并发区别
goroutine
参考文章:Goroutine · Golang 学习笔记
每个goroutine是官方实现的超级"线程池"
每个实例4-5KB栈内存和实现机制大幅减少创建和销毁使得go更易实现高并发
goroutine奉行通信(配合channel)实现共享内存。
在go语言层面内置调度和上下文切换机制,并且go程序会智能地将任务合理的分配给CPU
简单demo
package main
import (
"fmt"
"time"
)
func Hello() {
fmt.Println("Hello Function!")
}
func main() {
//在函数前加入关键字go
go Hello()
fmt.Println("Main done")
//休眠,等go Hello()执行完
time.Sleep(time.Second)
}
sync.WaitGroup demo
- WaitGroup用来启动一组goroutine,等待任务做完再结束goroutine。
- wg.Add(delta int):设置将要启动的Goroutine的数量,来设置WaitGroup内部计数器
- wg.Done():每个goroutine完成后,计数器-1 ;对于可能panic的可以使用defer wg.Done()
- wg.Wait():阻塞自己,等待所有goroutine完成任务,计数器减为0,返回
sync.WaitGroup中的Add和Done线程安全,可以从多个groutine中调用这两个方法,不用担心数据竞争和其他并发问题。
package main
import (
"fmt"
"sync"
)
//启动多个goroutine
func main() {
//协程同步
var wg sync.WaitGroup
wg.Add(9)
for i := 0; i < 9; i++ {
//当作参数传入会拷贝一份,因此可以保证输出0-8
go func(i int) {
defer wg.Done()
fmt.Printf("%d ",i)
}(i)
}
// 阻塞主程序,等待所有 Goroutine 完成
wg.Wait()
}
输出结构并发乱序。0-9的其中一个组合
sync.Map demo
-
- sync.Map并发安全的sync.Map,可以安全并发的读写操作,常见操作见代码
- 与之相对应的原生map,线程不安全,并发读写时需要加锁
package main
import (
"fmt"
"sync"
)
func main() {
//sync.Map的key和value都是interface{}
var m sync.Map
//写入
m.Store("1", 18)
m.Store("2", 20)
//读取
age, ok := m.Load("1")
if ok {
fmt.Println("读取成功", age, ok)
} else {
fmt.Println("读取失败!")
}
//遍历!!
m.Range(func(key, value interface{}) bool {
fmt.Println("遍历:key=", key, " value=", value)
return true
})
//根据key删除
m.Delete("2")
age, ok = m.Load("2")
if ok {
fmt.Println("删除后读取成功", age, ok)
} else {
fmt.Println("删除后读取失败!")
}
//存在则读取否则写入
//如果存在key=2,ok返回为true,否则false
age, ok = m.LoadOrStore("2", "100")
if ok {
fmt.Println("已存在的:", age)
} else {
fmt.Println("不存在,store后的:", age)
}
}
map并发 demo
-
- 原生map实现并发时一定需要加锁来保证安全,不然报错。
- sync.Map安全Map,不需要上锁解锁操作。
package main
import (
"fmt"
"sync"
)
func main() {
//没有加锁的并发写入,则会报错
m := make(map[int]int)
var wg sync.WaitGroup
var mu sync.Mutex
for i := 0; i < 9; i++ {
wg.Add(1)
go func(i int) {
for j := 0; j < 9; j++ {
//上锁
mu.Lock()
m[j] = i
mu.Unlock()
}
wg.Done()
}(i)
}
// 安全Map
var sm sync.Map
for i := 0; i < 9; i++ {
wg.Add(1)
go func(i int) {
for j := 0; j < 9; j++ {
sm.Store(j, i)
}
wg.Done()
}(i)
}
//完成前面并发任务后输出
wg.Wait()
fmt.Println("最终打印map值:", m)
fmt.Print("最终打印sync.Map值:")
sm.Range(func(key, value interface{}) bool {
fmt.Printf("%d:%d ", key, value)
return true
})
}
go的GMP调度原理
channel
go中不要通过共享内存来通信,而是通过通信来共享内存。
参考:Go Channel 详解
go高性能编程:GitHub - wuqinqiang/Go_Concurrency: go concurrency class code
go语言核心类型,管道,并发中可以进行发送或接收数据进行通信。
<-
使用make创建channel,chan底层是一个环形数组
类型:chan chan <- <-chan
使用场景:
-
- 消息传递、消息过滤
- 信号广播
- 事件的订阅和广播
- 任务分发
- 结果汇总
- 并发控制
- 同步和异步
简单demo
无缓冲channel(B要第一时间知道A是否完成)、有缓冲channel(生产者消费者模型)
package main
import (
"fmt"
"sync"
)
func main() {
c := make(chan int, 2)
defer close(c)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
c <- 3 + 4
c <- 1
fmt.Println("发送成功")
}()
wg.Add(1)
go func() {
defer wg.Done()
c <- 100
}()
//wg.Wait()
i := <-c
j := <-c
_ = <-c//忽略这个值
fmt.Println(i, j)
wg.Wait()
}
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}
channel的range
func main() {
go func() {
time.Sleep(1 * time.Hour)
}()
c := make(chan int)
go func() {
for i := 0; i < 10; i = i + 1 {
c <- i
}
close(c)
}()
for i := range c {
fmt.Println(i)
}
fmt.Println("Finished")
}
这个range会一直从c中获取,直到c关闭
select demo
类似与linux中io的select、poll、epoll。
select语句类似于switch,随机执行一个可执行的case,select只用于通信操作,如果没有case可运行那么将阻塞,直到有case可运行。默认的字句总是可以运行。
-
- 每个case都必须是一个通信
- 所有channel表达式都会被求值
- 所有被发送的表达式都会被求值
- 如果任意某个通信可以进行,它就执行;其他被忽略。
- 如果有多个case都可以运行,Select会随机地选出一个执行。其他不会执行。
- 否则:如果有default子句,则执行该语句。
- 如果没有default字句,select将阻塞,直到某个通信可以运行;Go不会重新对channel或值进行求值。
package main
import "fmt"
//select用于退出
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
timeout demo
package main
import "time"
import "fmt"
func main() {
c1 := make(chan string, 1)
go func() {
time.Sleep(time.Second * 2)
c1 <- "result 1"
}()
select {
case res := <-c1:
fmt.Println(res)
//超时退出
case <-time.After(time.Second * 1):
fmt.Println("timeout 1")
}
}
单向通道 demo
send chan <- string//只能发送给send
read <-chan string// 只能读取read
package main
import (
"fmt"
"time"
)
func Produce(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i * i
}
}
func Consumer(in <-chan int) {
for num := range in {
fmt.Println(num)
}
}
func main() {
c := make(chan int, 0)
go Produce(c)
go Consumer(c)
time.Sleep(time.Second)
}
package main
import "fmt"
// 只能发送给管道
func Counter(out chan<- int) {
for i := 0; i < 10; i++ {
out <- i
}
close(out)
}
// chan <- 只能发送给管道 <-chan 管道发送嘛,因此只能接收
func Squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func Printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go Counter(ch1)
go Squarer(ch2, ch1)
Printer(ch2)
}
输出0-9的平方。
协程池demo
Golang学习篇——协程池_golang 携程池-CSDN博客
package main
import (
"fmt"
"math/rand"
"sync"
)
// 当前task
type Task struct {
Id int
Random int
}
// 结果
type Result struct {
Task *Task
Sum int
}
// 创建Task
func CreateTask(taskChan chan<- *Task, wg *sync.WaitGroup) {
defer wg.Done()
for id := 0; id < 100000; id++ {
//创建Task
task := &Task{
Id: id,
Random: rand.Intn(200) + 1,
}
// 传递给taskChan管道
taskChan <- task
}
close(taskChan)
}
// 创建线程池来处理
func CreatePool(num int, taskChan <-chan *Task, resultChan chan<- *Result, wg *sync.WaitGroup) {
for i := 0; i < num; i++ {
wg.Add(1)
// 创建多个goroutine并发
go func() {
for task := range taskChan {
// 当前的Num
currentNum := task.Random
sum := 0
// 计算sum的值
for currentNum != 0 {
temp := currentNum % 10
sum += temp
currentNum /= 10
}
// 此时任务的结果是:
currentResult := &Result{
Task: task,
Sum: sum,
}
// 发送给结果管道
resultChan <- currentResult
}
wg.Done()
}()
}
}
// 开启打印 Result
func PrintResult(resultChan <-chan *Result) {
//输出
for res := range resultChan {
fmt.Printf("输出结果,Id:=%d,Random:=%d,Sum:=%d\n", res.Task.Id, res.Task.Random, res.Sum)
}
}
func main() {
// 创建task管道,传递task
taskChan := make(chan *Task, 128)
// 结果管道
resultChan := make(chan *Result, 128)
// 确保goroutine全部完成
var wg sync.WaitGroup
wg.Add(1)
go CreateTask(taskChan, &wg)
// 创建协程池
CreatePool(133, taskChan, resultChan, &wg)
go func() {
wg.Wait()
close(resultChan)
}()
// 创建协程进行打印
PrintResult(resultChan)
}
channel一定注意防止被阻塞而导致程序出现死锁!!!
并发安全和锁
互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。sync.Mutex
sync.Mutex互斥锁demo
多个goroutine对同一个共享资源(当前的x)的竞争你,x=x+1,在汇编当中并不是原子性的操作,因此并发时会导致数据不一致,方法1,上锁。
package main
import (
"fmt"
"sync"
"sync/atomic"
)
var (
total int32
wg sync.WaitGroup
mutex sync.Mutex
)
func Add() {
defer wg.Done()
for i := 0; i < 10000; i++ {
//原子操作
atomic.AddInt32(&total, 1)
//mutex.Lock()
//total++
//mutex.Unlock()
}
}
func Del() {
defer wg.Done()
for i := 0; i < 10000; i++ {
atomic.AddInt32(&total, -1)
//mutex.Lock()
//total--
//mutex.Unlock()
}
}
func main() {
fmt.Println("origin num:", total)
wg.Add(2)
go Add()
go Del()
wg.Wait()
fmt.Println("After num:", total)
}
package main
import (
"fmt"
"sync"
)
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func Add() {
for i := 0; i < 50; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go Add()
go Add()
wg.Wait()
fmt.Println(x)
}
读写互斥锁 demo
package main
import (
"fmt"
"sync"
"time"
)
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex //读写互斥锁
)
func Write() {
//lock.Lock() //加互斥锁
rwlock.Lock()
x = x + 1
time.Sleep(10 * time.Millisecond)
rwlock.Unlock()
//lock.Unlock() //解互斥锁
wg.Done()
}
func Read() {
//lock.Lock()
rwlock.RLock()
time.Sleep(time.Millisecond)
rwlock.RUnlock()
//lock.Unlock()
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go Write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go Read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
sync
前面介绍过sync的一些方法
sync.WaitGroup
sync.Once
参考:Go sync.Once | Go 语言高性能编程 | 极客兔兔
执行一次的函数,可以在代码任意位置加载,常用于单例模式(懒汉式),并发场景安全。而init是package首次执行时加载(饿汉式)
对外接口:func (o *Once) Do(f func())
sync.Map
这个是并发安全的Map
原子操作
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
var x int64
var l sync.Mutex
var wg sync.WaitGroup
// 普通版加函数
func add() {
// x = x + 1
x++ // 等价于上面的操作
wg.Done()
}
// 互斥锁版加函数
func mutexAdd() {
l.Lock()
x++
l.Unlock()
wg.Done()
}
// 原子操作版加函数
func atomicAdd() {
atomic.AddInt64(&x, 1)
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10000; i++ {
wg.Add(1)
//go add() // 普通版add函数 不是并发安全的
//go mutexAdd() // 加锁版add函数 是并发安全的,但是加锁性能开销大
go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
}
wg.Wait()
end := time.Now()
fmt.Println(x)
fmt.Println(end.Sub(start))
}
Context
context详解:https://www.cnblogs.com/juanmaofeifei/p/14439957.html
Go 语言并发编程与 Context | Go 语言设计与实现
Context是用来用来处理goroutine,可以在多个goroutine中传递取消信号、超时等。
通俗的解释:Context · Go语言中文文档
由于golang的server在goroutine当中,context就是有效管理这些goroutine,相互调用的goroutine之间通过传递context变量保持关联,这样在不用暴露各goroutine内部实现细节的前提下,有效地控制各goroutine的运行。
引入--退出goroutine
方式1,采用全局变量
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
// 退出全局变量
var stop bool
func worker() {
defer wg.Done()
for {
if stop {
break
}
time.Sleep(time.Second)
fmt.Println("worker")
}
}
func main() {
wg.Add(1)
go worker()
time.Sleep(3 * time.Second)
stop = true
wg.Wait()
}
方式2,采用管道通信
package main
import (
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
var ch = make(chan struct{})
func worker() {
defer wg.Done()
LOOP:
for {
select {
case <-ch:
fmt.Println("exit")
break LOOP
default:
time.Sleep(time.Second)
fmt.Println("worker")
}
}
}
func main() {
wg.Add(1)
go worker()
time.Sleep(3 * time.Second)
ch <- struct{}{}
wg.Wait()
}
方式3,采用context
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(ctx context.Context) {
defer wg.Done()
LOOP:
for {
select {
case <-ctx.Done():
fmt.Println("exit")
break LOOP
default:
time.Sleep(time.Second)
fmt.Println("worker")
}
}
}
func main() {
wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(3 * time.Second)
cancel() //等待子routine结束
wg.Wait()
}
如果函数当中需要被控制、超时、传递时,但不希望改变原来的接口时,函数第一个参数传入ctx。
context
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}
WithDeadline
package main
import (
"context"
"fmt"
"time"
)
func main() {
d := time.Now().Add(50 * time.Millisecond)
ctx, canel := context.WithDeadline(context.Background(), d)
// 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。
// 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。
defer canel()
select {
case <-time.After(10 * time.Millisecond):
fmt.Println("overslept")
case <-ctx.Done():
fmt.Println(ctx.Err())
}
}
WithTimeout
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(ctx context.Context) {
defer wg.Done()
LOOP:
for {
select {
case <-ctx.Done():
fmt.Println("exit")
break LOOP
default:
time.Sleep(time.Second)
fmt.Println("worker")
}
}
}
func main() {
wg.Add(1)
//超时控制
ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
go worker(ctx)
time.Sleep(3 * time.Second)
wg.Wait()
}
WithValue
package main
import (
"context"
"fmt"
"sync"
"time"
)
var wg sync.WaitGroup
func worker(ctx context.Context) {
//拿到key,value
fmt.Printf("traceid:%s\n", ctx.Value("traceid"))
//记录一些日志等等,方便排查
defer wg.Done()
LOOP:
for {
select {
case <-ctx.Done():
fmt.Println("exit")
break LOOP
default:
time.Sleep(time.Second)
fmt.Println("worker")
}
}
}
func main() {
wg.Add(1)
//超时控制
ctx, _ := context.WithTimeout(context.Background(), 6*time.Second)
//传递一些值,后续可能链路追踪id
childCtx := context.WithValue(ctx, "traceid", "123456")
go worker(childCtx)
time.Sleep(3 * time.Second)
wg.Wait()
}