golangの并发编程(GMP模型)

news2024/11/27 14:45:37

GMP模型 && channel

  • 1. 前言
  • 2. GMP模型
    • 2.1. 基本概念
    • 2.2. 调度器策略
    • 2.3. go指令的调度流程
    • 2.4. go启动周期的M0和G0
    • 2.5. GMP可视化
    • 2.6. GMP的几种调度场景
  • 3. channel
    • 3.1. channel的基本使用
    • 3.2. 同步器

1. 前言

Go中的并发是函数相互独立运行的体现,Goroutines是并发运行的函数。

  1. 并发:多线程程序在单核CPU上执行,每个线程使用时间片轮转执行,间隔是ms级别的
  2. 并行: 多线程程序在多核CPU上执行,每个cpu上都执行一个线程,同一时刻有多个线程执行
  3. go主线程是一个物理线程(内核态),可以发起多个协程goroutine,协程是一个轻量级线程(逻辑态)
  4. goroutine的特点:有独立的栈空间;共享程序堆空间;调度由用户控制
  5. 创建一个协程:go 任务函数

2. GMP模型

2.1. 基本概念

M:代表内核线程,记录内核线程栈信息,当goroutine调度到线程时,使用该goroutine自己的栈信息

P调度器processor,负责调度goroutine,维护一个本地goroutine队列,主线程从调度器上获得goroutine并执行,同时还负责部分内存的管理。

G:表示goroutine,每个goroutine都有自己的栈空间,定时器,初始化的栈空间在2k左右,空间会随着需求增长

M代表一个工作线程,在M上有一个P和G,P是绑定到M上的,G是通过P的调度获取的,在某一时刻,一个M上只有一个G(g0除外)在P上拥有一个G队列,里面是已经就绪的G,是可以被调度到线程栈上执行的协程,称为运行队列

全局队列:存放等待运行的G

P的本地队列:优先将新创建的G存入到P的本地队列,如果本地队列已满,则存入到全局队列

P列表:程序启动时创建,P的最大个数==GOMAXPROCS

M列表:当前OS分配到go程序的内核线程数

2.2. 调度器策略

  1. 复用线程
    • work stealing机制:当本线程无可运行的G时,尝试从其他线程绑定的调度器中偷取协程,而不是销毁线程
    • hand off机制:当本地线程由于G发生阻塞时,线程释放绑定的P,将P转给其他空闲的M线程来执行
  2. 并行GOMAXPROCS设置P的数量,最多有GOMAXPROCS个线程分布在多个CPU上同时运行
  3. 抢占 go中的协程最多只能占用CPU 10ms,防止其他协程处于饥饿状态
  4. 全局G队列 当M线程执行work stealing机制从其他调度器P中获取不到G时,可以从全局协程队列中获取

2.3. go指令的调度流程

  • 通过 go func() 创建一个goroutine
  • 有两个存储G的队列,一个是局部调度器P的本地协程队列,一个是全局协程队列。新创建的G保存在P的本地队列中,如果P的本地队列已满则保存在全局队列中
  • G只能运行在M中,一个M必须持有一个P。M从本地协程队列中选取一个可执行的G来执行,如果本地队列为空则采用stealing机制从其他协程队列中获取一个G来执行
  • 一个M调度G的执行过程其实是一个循环的过程
    • 当M执行某一个G时发生syscall(系统调用)或者其他阻塞操作,M发生阻塞。如果当前一些G正在执行,runtime会将这个线程M从P中移除,然后创建一个新的内核线程M来执行G(如果有空闲的内核线程可进行复用)
    • 当M系统调用结束时,此时G会尝试获取一个空闲的P执行,放入到P的本地队列中。如果获取不到调度器P,那么线程M则进行休眠,加入到空闲线程队列中,协程G则放入到全局协程队列中

2.4. go启动周期的M0和G0

M0

  • 启动程序后编号为0的主线程
  • 在全局变量runtime.m0中,不需要在heap上分配
  • 负责执行初始化操作和启动第一个协程G
  • 启动第一个协程之后,M0与其他主线程相同

G0

  • 每次启动一个M,第一个创建的goroutine即为G0,存放在全局空间
  • G0仅仅用于调度,不指向任何可执行的函数
  • 每个M都会绑定一个属于自己的G0
  • 在syscall时会将M切换到G0,再调度

2.5. GMP可视化

  • 创建trace文件
  • 启动trace
  • 执行业务代码
  • 停止trace
  • go run运行后,得到trace.out文件
  • 通过go tool trace trace文件名 可视化查看GMP
func main() {

	f, err := os.Create("trace.out")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	err = trace.Start(f)
	if err != nil {
		panic(err)
	}

	fmt.Println("业务逻辑...")
	fmt.Printf("GMP Model Trace test")

	trace.Stop()
}

请添加图片描述

2.6. GMP的几种调度场景

  • G1创建G2:P拥有G1,M获取P开始运行G1,G1使用go func() 创建了G2,为了局部性G2优先加入到G1所在的本地协程队列

  • G1执行完毕:当G1调用goexit()退出时,M切换到所绑定的协程为G0,由G0负责调度本地协程队列中的G2,交给M执行

  • G2执行过程中开辟过多的协程:如果G2运行时需要创建6个协程,本地队列只能存放四个G3-G6,在创建G7时需要将本地协程队列的前两个协程与G7协程同时放入到全局协程队列中,此时本地协程队列还有一半空间,可以直接创建G8协程

  • 唤醒休眠队列的线程:在创建G时,运行的G会尝试唤醒其他空闲的调度器与内核线程组合进行绑定。假定G2唤醒了线程M2,M2绑定了P2,并且运行了G0,但P2的本地协程队列没有协程(空队列),此时M2为自旋线程

  • 自旋线程M2从全局协程队列中批量获取n个G。其中,GQ为全局协程队列的size,GOMAXPROCS为当前调度器个数。可以看作是从全局协程队列到本地协程队列的一种负载均衡策略

    n = min(len(GQ)/GOMAXPROCS+1, len(GQ/2))

  • 如果全局协程队列为空,自旋线程M2会执行work stealing机制,从其他调度器P的本地队列中获取一半协程G到M2的本地队列

  • 自旋线程个数 + 执行线程个数 ≤ GOMAXPROCS

3. channel

3.1. channel的基本使用

为什么需要channel?

1.主线程在等待所有goroutine全部完成的时间很难确定

2.通过全局变量加锁同步实现通讯,不利于多个协程对全局变量的读写操作

channel特点

通道用于在goroutines之间共享数据,保证同步交换。channel需要指定数据类型,数据在channel上传递:任何时刻只有一个goroutine可以访问数据项,保证线程同步。channel底层是一个队列,线程安全的,多个协程并发访问时不需要加锁;channel是有类型的,一个string的channel只能存放string类型

channel声明和使用

var intChan chan int
chanMap := make(chan map[string]string, 10)
var chan1 chan Person
var chan2 chan *Person

intChan <- 10 //写入数据到channel
num := <- intChan // 读取channel的数据

channel是引用类型,必须**初始化(make)**才能使用。channel不能进行扩容,在没有使用协程的情况下,如果channel数据已取完,再取则直接报错 dead lock error

channel的接收特性

  • 读和写操作对元素值的处理必须是原子性的
  • 对于同一个channel同时写和读,即使写入速度快于读取速度,依旧不会造成阻塞dead lock

channel关闭和遍历

  • 使用内置函数close关闭channel,当channel关闭后不能再写数据只能读取
  • for-range遍历时,如果出现channel没有关闭则出现dead lock

只写channel和只读channel

channel可以声明为只读或者只写,默认是可读可写的

var intchan chan <- int // 只写channel
var intchan <-chan int  // 只读channel

3.2. 同步器

WaitGroup实现同步

由于主线程一旦执行完毕,无论goroutines是否执行完,整个程序都会结束。因此,需要一种同步机制来协调主线程和协程之间的执行顺序

WaitGroup类似于JUC中的CountDownLatch

  • WaitGroup.Done() 表示已经完成了一个任务,等价于WaitGroup.Add(-1)
  • WaitGroup.Add(1) 表示增加一个任务到协程队列,计数器+1
  • 主线程中使用WaitGroup.Wait(),运行到这步会发生阻塞,直到WaitGroup中的计数器为0时才能继续向下执行
var wg sync.WaitGroup

func main(){
    for i := 0; i < 10; i++ {
		go show(i)
		wg.Add(1)
	}
    
	wg.Wait() // 等价于countDownLatch.await();
	fmt.Println("[main] continue...")
}


func task(i int) {
	// defer wg.Add(-1)
	defer wg.Done() // 等价于countDownLatch.countDown();
	fmt.Printf("[goroutine] 当前i=%d\n", i)
}

runtime包

  • runtime.Gosched() 让出CPU时间片,重新等待安排任务
func printMsg(msg string) {

	for i := 0; i < 5; i++ {
		fmt.Printf("[goroutine] msg: %v\n", msg)
	}
}

func main(){
    go printMsg("java is the best!")
	// go printMsg("spring cloud is all you need!")
	for i := 0; i < 5; i++ {
		runtime.Gosched()
		fmt.Println("[main] golang concurrent...")
	}
	fmt.Println("[main] continue...")
}

每次主线程运行到runtime.Gosched()时,将CPU时间片交出去,因此go printMsg任务会先执行,打印5次之后,主线程再打印5次。

  • runtime.Goexit() 退出当前协程
  • runtime.GOMAXPROCS 默认使用本机的最大CPU核数
  • sync.Mutex 互斥锁
var (
	FactorialMap = make(map[int]uint64, 16)
	// 声明一个全局互斥锁
	lock sync.Mutex
)

func main(){
    // 向map中写入数据
    for i := 1; i <= 20; i++ {
		go factorial(i)
	}
	// 防止主线程执行完毕goroutine还没完成任务
	time.Sleep(time.Second * 3)
    // 防止主线程和协程对临界资源的读写并发
	lock.Lock()
	for i, v := range FactorialMap {
		fmt.Printf("map[%d]=%d\n", i, v)
	}
	lock.Unlock()
}

func factorial(n int) {
	var res uint64
	res = 1
	for i := 1; i <= n; i++ {
		res *= uint64(i)
	}
	// 存在并发写问题 -> concurrent map writes
	// 需要加入互斥锁
	lock.Lock()
	FactorialMap[n] = res
	lock.Unlock()
}

select和switch

  • select用于处理异步IO操作,select可以监听case语句中channel的读写操作,当case中channel读写操作为非阻塞状态时(可读可写),触发相应的动作。解决从管道读取数据的阻塞问题,在遍历channel时如果不关闭则会发生阻塞导致deadlock

  • select中的case语句必须是一个channel操作,default语句总是可执行的

  • 如果有多个case都可运行,select可随机选出一个执行

  • 如果没有case可以执行,那么执行default逻辑

  • 如果没有case可以执行且没有default语句,select将会阻塞,直到某个case可以执行

var (
	intChan = make(chan int)
	strChan = make(chan string)
)
func main(){
    go func() {
		intChan <- 100
		strChan <- "golang"
		defer close(intChan)
		defer close(strChan)
	}()

	for {
		select {
		case r := <-intChan:
			fmt.Printf("[int chan] r: %v\n", r)
		case r := <-strChan:
			fmt.Printf("[string chan] r: %v\n", r)
		default:
			fmt.Println("no channel can be read!")
		}
	}
	fmt.Println("[main] continue...")
}

Timer

定时器,用于实现一些定时操作,内部也是通过channel实现的

func main(){
    timer1 := time.NewTimer(time.Second * 2)
	t1 := time.Now()
	fmt.Printf("time1: %v\n", t1)
	// timer1.C阻塞,直至2s结束继续执行
	t2 := <-timer1.C
	fmt.Printf("time2: %v\n", t2)

	timer2 := time.NewTimer(time.Second * 2)
	<-timer2.C
	fmt.Println("2s 后...")
	fmt.Printf("time3: %v\n", time.Now())

	timer3 := time.NewTimer(time.Second)
	go func() {
		<-timer3.C
		fmt.Println("timer3 blocked!")
	}()
	// 定时器停止,上面匿名函数中的<-timer3.C就不会阻塞了
	stop := timer3.Stop()
	if stop {
		fmt.Println("timer3 stopped!")
	}
}

Ticker

Timer只会执行一次,Ticker可以周期性的执行

func main(){
    ticker := time.NewTicker(time.Second)
	var sum int
	intChan := make(chan int)
	// 每隔1s向intChan中写入一个数,select语句从三个case分支随机选择一个执行
    // 主线程一直在读取,直到sum>=10读取结束
	go func() {
		for _ = range ticker.C {
			select {
			case intChan <- 1:
				fmt.Println("int channel写入1")
			case intChan <- 2:
				fmt.Println("int channel写入2")
			case intChan <- 3:
				fmt.Println("int channel写入3")
			}
		}
	}()

	for v := range intChan {
		fmt.Println("从int channel中读取到: ", v)
		sum += v
		if sum >= 10 {
			break
		}
	}
}

原子变量

类似于JUC中的AtomicInteger原子整型等,使用CAS机制进行同步。常见原子操作有

  • 加减 atomic.AddInt32(&num, 1)
  • read atomic.LoadInt32(&num)
  • CAS atomic.CompareAndSwapInt32(&num, 100, 200) 如果num==100修改为200否则此次CAS失败
  • 交换 atomic.SwapInt32(&num, 200)
  • write atomic.StoreInt32(&num, 200)
var num int32

func AtomicTest() {
	for i := 0; i < 100; i++ {
		go add()
		go sub()
	}
	fmt.Printf("num: %v\n", num)
}

func add() {
	atomic.AddInt32(&num, 1)
	fmt.Printf("[add method] num: %v\n", num)
}

func sub() {
	atomic.AddInt32(&num, -1)
	fmt.Printf("[sub method] num: %v\n", num)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/372915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMQ学习(十一):RabbitMQ 集群

一、集群1.1 为什么要使用集群前面我们介绍了如何安装及运行 RabbitMQ 服务&#xff0c;不过这些是单机版的&#xff0c;无法满足目前真实应用的 要求。如果 RabbitMQ 服务器遇到内存崩溃、机器掉电或者主板故障等情况&#xff0c;该怎么办&#xff1f;单台 RabbitMQ 服务器可以…

西北工业大学大学物理(II)期末试题选填解析2021-2022

2 金属薄片&#xff0c;就暗示了载流子是电子了。3 熟练掌握左右手即可。4 又是位移电流。6 感应电场。随时间变化着的磁场能在其周围空间激发一种电场&#xff0c;它能对处于其中的带电粒子施以力的作用&#xff0c;这就是涡旋电场&#xff0c;又叫感生电场。涡旋电场是非保守…

使用file-selector-button美化原生文件上传

前言 你平时见到的上传文件是下面这样的? 还是下面这种美化过的button样式 还是下面这种复杂的上传组件。 <input type="file" >:只要指定的是type类型的input,打开浏览器就是上面第一种原生的浏览器默认的很丑的样式。 下面的两种是我从ElementUI截的图,…

Flume简介

Flume是一个高可用&#xff0c;高可靠&#xff0c;分布式的海量日志采集、聚合和传输的系统&#xff0c;能够有效的收集、聚合、移动大量的日志数据。 优点&#xff1a; 使用Flume采集数据不需要写一行代码&#xff0c;注意是一行代码都不需要&#xff0c;只需要在配置文件中…

切记:别手欠升级手机系统!

今天手欠升级了手机系统&#xff0c;相机直接不能用了......过年手机坏了&#xff0c;买的报修险申请了售后&#xff0c;换了主板换主板还没保存数据&#xff0c;结果大家都懂&#xff0c;手机里那么多照片视频全没了&#xff0c;这个怪自己平时没做数据备份&#xff0c;手机坏…

【fly-iot飞凡物联】(2):如何从0打造自己的物联网平台,使用开源的技术栈搭建一个高性能的物联网平台,目前在设计阶段。

目录前言1&#xff0c;fly-iot 飞凡物联2&#xff0c;mqtt-broker 服务3, 管理后台产品/设备设计4,数据存储目前使用mysql&#xff0c;消息存储到influxdb中5,规则引擎使用 ekuiper6, 总结和其他的想法前言 本文的原文连接是: https://blog.csdn.net/freewebsys/article/detail…

用一个例子告诉你 怎样在spark中创建RDD

目录 1. 前言 2. 分发驱动中scala集合中的数据 2.1 parallelize 2.2 makeRDD 2.3 range 3. 分发外部存储系统中的数据 3.1 textFile 3.2 wholeTextFiles 1. 前言 众所周知&#xff0c;spark是一种计算引擎(用来计算数据)&#xff0c;但是数据从何而来呢&#xff1f; …

Windows注册表的读写操作

目录1 注册表(Registry)介绍1.1 注册表简介1.2 注册表位置1.3 开启/禁用 注册表编辑器1.4 注册表的结构1.5 修改注册表实例2 程序中对注册表的读写操作2.1 打开和关闭注册表2.2 创建和删除指定的注册表键2.3 读取和设置指定注册表中某个键值2.4 增加和删除注册表键中某个键值2.…

华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典

文章目录2023 年用 Python 语言解华为 OD 机试题&#xff0c;一篇博客找全。华为 OD 机试题清单&#xff08;机试题库还在逐日更新&#xff09;2023 年用 Python 语言解华为 OD 机试题&#xff0c;一篇博客找全。 在 2023 年&#xff0c;Python 已成为广泛使用的编程语言之一&…

rabbitmq添加用户,虚拟机步,设置rabbitmq配置文件

第一步&#xff0c;登录后台控制页面 http://ip:15672第二步&#xff0c;添加用户和权限 重点&#xff1a;选择Admin和Users 第三步&#xff0c;添加虚拟机 点击侧边的Virtual Hosts 第四步将虚拟机和用户搭配 注意新建好后&#xff0c;在虚拟机列表中&#xff0c;点击虚拟机…

ubuntu安装spinningup

ubuntu安装spinningup 经过这篇博客安装好mujoco和mujoco-py后&#xff0c;下面安装强化学习代码库spinningup 按照spinningup官网的安装步骤走&#xff0c;下面我总结一下安装过程中出现的问题 在安装spinningup的时候&#xff0c;最好重建一个新的虚拟环境&#xff0c;因为…

基于springboot+vue的校园招聘系统

博主主页&#xff1a;猫头鹰源码 博主简介&#xff1a;Java领域优质创作者、CSDN博客专家、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战 主要内容&#xff1a;毕业设计(Javaweb项目|小程序等)、简历模板、学习资料、面试题库、技术咨询 文末联系获取 项目介绍…

Java基础语法的心得总结

一、数据存储的理解可以参考第四大部分https://blog.csdn.net/xiaoxixicc/article/details/124222375个人理解&#xff1a;栈内存中保存的实际上是对象在堆内存中的引用地址。通过这个引用地址可以快速查找到保存中堆内存中的对象。二、static静态变量&#xff08;共享的作用&a…

[数据结构]:05-循环队列(链表)(C语言实现)

目录 前言 已完成内容 循环队列实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-QueueCommon.cpp 04-QueueFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容&#xff0c;除其中使用到C引用外&#xff0c;全为C语言代码。使用C引用主要是为了…

(三十七)大白话SQL标准中对事务的4个隔离级别,都是如何规定的呢?

之前我们给大家讲了数据库中多个事务并发时可能产生的几种问题&#xff0c;包括了脏写、脏读、不可重复读、幻读&#xff0c;几种问题 那么针对这些多事务并发的问题&#xff0c;实际上SQL标准中就规定了事务的几种隔离级别&#xff0c;用来解决这些问题。 注意一下&#xff…

SSM项目 替换为 SpringBoot

一、运行SSM项目 保证项目改为SpringBoot后运行正常&#xff0c;先保证SSM下运行正常。 项目目录结构 创建数据库&#xff0c;导入sql文件 查看项目中连接数据jar版本&#xff0c;修改对应版本&#xff0c;修改数据库配置信息 配置启动tomcat 运行项目&#xff0c;测试正常…

考虑极端天气线路脆弱性的配电网分布式电源和储能优化配置模型

目录 1 主要内容 1.1 线路脆弱性分析 ​编辑 1.2 配电网线路故障分析 1.3 蒙特卡洛随机抽样的线路脆弱性分析模型伪代码 1.4 配电网储能和光伏优化配置 2 程序效果 3 下载链接 1 主要内容 程序主要参考《考虑极端天气线路脆弱性的配电网分布式电源配置优化模型-马宇帆》…

RL笔记:动态规划(2): 策略迭代

目录 0. 前言 (4.3) 策略迭代 Example 4.2: Jack’s Car Rental Exercise 4.4 Exercise 4.5 Exercise 4.6 Exercise 4.7 0. 前言 Sutton-book第4章&#xff08;动态规划&#xff09;学习笔记。本文是关于其中4.2节&#xff08;策略迭代&#xff09;。 (4.3) 策略迭代 基…

【JavaWeb】复习重点内容

✅✅作者主页&#xff1a;&#x1f517;孙不坚1208的博客 &#x1f525;&#x1f525;精选专栏&#xff1a;&#x1f517;JavaWeb从入门到精通&#xff08;持续更新中&#xff09; &#x1f4cb;&#x1f4cb; 本文摘要&#xff1a;本篇文章主要分享JavaWeb的学习重点内容。 &a…

C++11多线程编程 二:多线程通信,同步,锁

C11多线程编程 一&#xff1a;多线程概述 C11多线程编程 二&#xff1a;多线程通信&#xff0c;同步&#xff0c;锁 C11多线程编程 三&#xff1a;锁资源管理和条件变量 2.1 多线程的状态及其切换流程分析 线程状态说明&#xff1a; 初始化&#xff08;Init&#xff09;&am…