golang中的并发模型

news2025/1/21 21:54:11

并发模型

传统的编程语言(如C++、Java、Python等)并非为并发而生的,因此它们面对并发的逻辑多是基于操作系统的线程。其并发的执行单元(线程)之间的通信利用的也是操作系统提供的线程或进程间通信的原语,比如共享内存、信号、管道、消息队列、套接字等。在这些通信原语中,使用最多、最广泛同时也最高效的是结合了线程同步原语(比如锁以及更为低级的原子操作)的共享内存方式,因此,可以说传统语言的并发模型是基于共享内存的模型

Untitled

这些传统的就基于共享内存的并发模型难用且易错,在大型程序中,开发人员在设计并发程序时需要根据线程模型对程序进行建模同时规划线程之间的通信方式,且程序难以阅读、理解、维护

Go采用了CSP(Communicating Sequential Process,通信顺序进程)模型

一个符合CSP模型的并发程序应该是一组通过输入/输出原语连接起来的P的集合

Untitled

CSP模型旨在简化并发程序的编写,让并发程序的编写与编写顺序程序一样简单。Tony Hoare认为输入/输出应该是基本的编程原语,数据处理逻辑(CSP中的P)仅需调用输入原语获取数据,顺序处理数据,并将结果数据通过输出原语输出

CSP理论中的P(Process,进程)是个抽象概念,它代表任何顺序处理逻辑的封装,它获取输入数据(或从其他P的输出获取),并生产可以被其他P消费的输出数据。

为了实现CSP模型中的输入/输出原语,Go引入了goroutine(P)之间的通信原语channel。通过channel将goroutine(P)组合与连接在一起,这使得设计和编写大型并发系统变得更为简单和清晰

虽然CSP模型已经成为Go语言支持的主流并发模型,但Go也支持传统的基于共享内存的并发模型,并提供基本的低级同步原语(主要是sync包中的互斥锁、条件变量、读写锁、原子操作等

那么在实践中应该如何选择是使用channel还是低级同步原语下的共享内存?

Go始终推荐以CSP模型风格构建并发程序,尤其是在复杂的业务层面。这将提升程序逻辑的清晰度,大大降低并发设计的复杂性,并让程序更具可读性和可维护性;

对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据,可以使用更为高效的低级同步原语(如sync.Mutex),以保证goroutine对数据的同步访问。

并发模式

在语言层面,Go针对CSP模型提供了三种并发原语。

  • goroutine:对应CSP模型中的P,封装了数据的处理逻辑,是Go运行时调度的基本执行单元。
  • channel:对应CSP模型中的输入/输出原语,用于goroutine之间的通信和同步。
  • select:用于应对多路输入/输出,可以让goroutine同时协调处理多个channel操作。

深入了解一下在实践中这些原语的常见组合方式,即并发模式:

创建模式

go关键字+function/method 创建 goroutine:

go fmt.println("I'm a goroutine")
​
c := srv.NewConn(rw)
go c.serve(connCtx)

在稍微复杂的程序里,需要考虑通过原语的承载体channel在goroutine间建立联系,所以通常采用以下方式建立goroutine:

type T struct {...}func spwan(f func()) chan T {
    c := make(chan T)
    go func() {
        ...
        f()
        ...
    }()
    return c
}func main() {
//使用c与新创建的goroutine通信
    c := spawn(func(){})
}

在内部创建一个goroutine并返回一个channel类型变量函数

spwan函数创建的新的goroutine和调用spwan函数的goroutine通过channel建立联系

函数得以实现得益于channel作为go语言的一等公民(first-class citizen)的存在:channel可以像变量一样被初始化、传递和赋值。上面例子中的spawn只返回了一个channel变量、

退出模式

goroutine的执行函数返回意味着goroutine退出。但有些时候会要求优雅退出,以下为方案:

分离(detached)模式

是使用最广泛的goroutine退出模式

创建它的goroutine不需要关心它的退出,这类goroutine在启动后即与其创建者彻底分离,其生命周期与其执行的主函数相关,函数返回即goroutine退出。这类goroutine有两个常见用途。

一次性任务:用来执行任务完成后既退出,比如此标准库代码:

// $GOROOT/src/net/dial.gofunc (d *Dialer) DialContext(ctx context.Context, network, address string) (Conn, error) {
    ...
    if oldCancel := d.Cancel; oldCancel != nil {
        subCtx, cancel := context.WithCancel(ctx)
        defer cancel()
//有数据处理后既退出
        go func() {
            select {
            case <-oldCancel:
                cancel()
            case <-subCtx.Done():
            }
        }()
        ctx = subCtx
    }
    ...
}

常驻后台执行的一些特定任务:如监视(monitor)、观察(watch)等。其实现通常采用for {…}或for { select{…} }代码段形式,并多以定时器(timer)或事件(event)驱动执行。

// $GOROOT/src/runtime/mgc.go
func gcBgMarkStartWorkers() {
    // 每个P都有一个运行在后台的用于标记的G
    for _, p := range allp {
        if p.gcBgMarkWorker == 0 {
            go gcBgMarkWorker(p) // 为每个P创建一个goroutine,以运行gcBgMarkWorker
            notetsleepg(&work.bgMarkReady, -1)
            noteclear(&work.bgMarkReady)
        }
    }
}func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { // 常驻后台处理GC事宜
        ...
    }
}

Join模式

在线程模型中,父线程可以通过pthread_join来等待子线程结束并获取子线程的结束状态。

在Go中,我们有时候也有类似的需求:goroutine的创建者需要等待新goroutine结束。

  • 等待一个goroutine退出

先看一段实例代码

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
    interval, ok := args[0].(int)
    if !ok {
        return
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
}func spawn(f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    go func() {
        f(args...)
        c <- struct{}{}
    }()
    return c
}func main() {
     done := spawn(worker, 5)
     println("spawn a worker goroutine")
     <-done
     println("worker done")
}

这个channel的用途就是在两个goroutine之间建立退出事件的“信号”通信机制。main goroutine在创建完新goroutine后便在该channel上阻塞等待,直到新goroutine退出前向该channel发送了一个信号。

运行过后

Untitled

  • 获取goroutine的退出状态

如果不仅要等goroutine退出还要精准获取其结束状态,可以通过自定义类型的channel实现这一需求:

var OK = errors.New("ok")func worker(args ...interface{}) error {
    if len(args) == 0 {
        return errors.New("invalid args")
    }
    interval, ok := args[0].(int)
    if !ok {
        return errors.New("invalid interval arg")
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
    return OK
}func spawn(f func(args ...interface{}) error, args ...interface{}) chan error {
    c := make(chan error)
    go func() {
        c <- f(args...)
    }()
    return c
}func main() {
    done := spawn(worker, 5)
    println("spawn worker1")
    err := <-done
    fmt.Println("worker1 done:", err)
    done = spawn(worker)
    println("spawn worker2")
    err = <-done
    fmt.Println("worker2 done:", err)
}

将channel中承载的类型由struct{}改为了error,这样channel承载的信息就不只是一个信号了,还携带了有价值的信息:新goroutine的结束状态。运行上述示例:

Untitled

  • 等待多个goroutine退出

有时候必须等待全部新goroutine退出,可以通过Go语言提供的sync.WaitGroup实现等待多个goroutine退出的模式:

func worker(args ...interface{}) {
    if len(args) == 0 {
        return
    }
    
    interval, ok := args[0].(int)
    if !ok {
        return
    }
    
    time.Sleep(time.Second * (time.Duration(interval)))
}func spawnGroup(n int, f func(args ...interface{}), args ...interface{}) chan struct{} {
    c := make(chan struct{})
    var wg sync.WaitGroup
    
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            name := fmt.Sprintf("worker-%d:", i)
            f(args...)
            println(name, "done")
            wg.Done() // worker done!
        }(i)
    }
    
    go func() {
        wg.Wait()
        c <- struct{}{}
    }()
    
    return c
}func main() {
    done := spawnGroup(5, worker, 3)
    println("spawn a group of workers")
    <-done
    println("group workers done")
}

通过sync.WaitGroup,spawnGroup每创建一个goroutine都会调用wg.Add(1),新创建的goroutine会在退出前调用wg.Done。

在spawnGroup中还创建了一个用于监视的goroutine,该goroutine调用sync.WaitGroup的Wait方法来等待所有goroutine退出。

在所有新创建的goroutine退出后,Wait方法返回,该监视goroutine会向done这个channel写入一个信号,这时main goroutine才会从阻塞在done channel上的状态中恢复,继续往下执行。

运行上述示例代码:

支持超时机制的等待

设置合理的退出时间,如若没有退出,则继续执行下一步:

func main() {
    done := spawnGroup(5, worker, 30)
    println("spawn a group of workers")
    
    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-done:
        println("group workers done")
    }
}

notify-and-wait模式

main goroutine的停止代表着整个程序的停止,如果不事先通知退出,则容易导致业务数据损坏、不完整

我们可以通过notify-and-wait(通知并等待)模式来满足这一场景的要求。虽然这一模式也不能完全避免损失,但是它给了各个goroutine一个挽救数据的机会,从而尽可能减少损失。

  • 通知并等待一个goroutine的退出
func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}func spawn(f func(int)) chan string {
    quit := make(chan string)
    go func() {
        var job chan int // 模拟job channel
        for {
            select {
            case j := <-job:
                f(j)
            case <-quit:
                quit <- "ok"
            }
        }
    }()
    return quit
}func main() {
    quit := spawn(worker)
    println("spawn a worker goroutine")
    
    time.Sleep(5 * time.Second)
    
    // 通知新创建的goroutine退出
    println("notify the worker to exit...")
    quit <- "exit"
    
    timer := time.NewTimer(time.Second * 10)
    defer timer.Stop()
    select {
    case status := <-quit:
        println("worker done:", status)
    case <-timer.C:
        println("wait worker exit timeout")
    }
}

执行

此时,spawn函数不仅发送退出信号给创建者还承载创建者发送的退出信号,形成了一个双向的数据通道

  • 通知并等待多个goroutine退出

channel存在一个特性:当使用close关闭channel时,所有阻塞到该channel上的goroutine都会得到通知,所以可以利用这一特性实现这一模式:

func worker(j int) {
    time.Sleep(time.Second * (time.Duration(j)))
}func spawnGroup(n int, f func(int)) chan struct{} {
    quit := make(chan struct{})
    job := make(chan int)
    var wg sync.WaitGroup
    
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done() // 保证wg.Done在goroutine退出前被执行
            name := fmt.Sprintf("worker-%d:", i)
            for {
                j, ok := <-job
                if !ok {
                    println(name, "done")
                    return
                }
                // 执行这个job
                worker(j)
            }
        }(i)
    }
    
    go func() {
        <-quit
        close(job) // 广播给所有新goroutine
        wg.Wait()
        quit <- struct{}{}
    }()
    
    return quit
}func main() {
    quit := spawnGroup(5, worker)
    println("spawn a group of workers")
    
    time.Sleep(5 * time.Second)
    // 通知 worker goroutine 组退出
    println("notify the worker group to exit...")
    quit <- struct{}{}
    
    timer := time.NewTimer(time.Second * 5)
    defer timer.Stop()
    select {
    case <-timer.C:
        println("wait group workers exit timeout!")
    case <-quit:
        println("group workers done")
    }
}

创建者直接利用了worker goroutine接收任务(job)的channel来广播退出通知,而实现这一广播的代码就是close(job)。此时各个worker goroutine监听job channel,当创建者关闭job channel时,通过“comma ok”模式获取的ok值为false,也就表明该channel已经被关闭,于是worker goroutine执行退出逻辑(退出前wg.Done()被执行)。

执行:

退出模式的应用

由于goroutine的运行状态不同,因此很难用同种框架全面管理,所以我们可以只实现一个“超时等待退出”框架,以统一解决各种运行状态

一组goroutine的退出有两种情况,第一种情况是并发退出,当goroutine的退出先后数据对数据处理无影响时可使用;另一种则是串行退出,也就是次序错误可能导致程序状态混乱和错误

  • 并发退出
  • 串行退出

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1227633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vite - 静态资源处理 - json文件导入

直接就说明白了 vite 中对json 文件直接当作一个模块来解析。 可以直接导入使用&#xff01; 可以直接导入使用&#xff01; 可以直接导入使用&#xff01;json文件中的key&#xff0c;直接被作为一个属性&#xff0c;可以单独被导入。 因此&#xff0c;导入json文件有两种方式…

72. 编辑距离(动态规划)

题目 题解 class Solution:def minDistance(self, word1: str, word2: str) -> int:# 定义状态&#xff1a;dp[i][j]表示将word1[0:i]转换成word2[0:j]所使用的最少操作数dp [[0 for j in range(len(word2)1)] for i in range(len(word1)1)]# badcase: dp[0][j] j, dp[i]…

kaggle新赛:SenNet 3D肾脏分割大赛(3D语义分割)

赛题名称&#xff1a;SenNet HOA - Hacking the Human Vasculature in 3D 赛题链接&#xff1a;https://www.kaggle.com/competitions/blood-vessel-segmentation 赛题背景 目前&#xff0c;人类专家标注员需要手动追踪血管结构&#xff0c;这是一个缓慢的过程。即使有专家…

defer和async

如果两个属性浏览器都不兼容&#xff0c;推荐把<script>标签放到底部 一般情况下&#xff0c;浏览器在解析html源文件时&#xff0c;如果遇到外部的<script>标签&#xff0c;解析过程就会先暂停&#xff0c;这时会对script进行加载&#xff0c;执行两个过程&…

【Linux进程】进程等待 与 进程替换 原理与函数使用

文章目录 一、进程等待1.1 意义 / 必要性1.2 进程等待的函数&#xff08;wait / waitpid&#xff09;1.3 status参数1.4 获取子进程status1.5 进程的阻塞等待与非阻塞等待 二、进程替换2.1 引言2.2 进程替换原理2.3 替换函数 一、进程等待 1.1 意义 / 必要性 为什么要有进程等…

【【VDMA彩条显示实验之二】】

VDMA彩条显示实验之二 这一篇紧接上一篇文章 我们添加一个 VID_out 的 IP核 其实 相对来说 就是我们把 传进来的串行信号 转化成并行输出各个信号 &#xff08;把 Stream 的 输出信号流转化成在 RGB上 输出的 格式 &#xff09; 下面是对IP核的简介 AXI4-Stream to Video Out…

八、Linux关机重启和用户登录注销

1.Linux关机、重启 基本介绍 shutdown -h now 立即进行关机 shutdown -h 1 “hello&#xff0c;1分钟后会关机了”(h&#xff1a;halt) shutdown 默认就是&#xff08;shutdown -h 1&#xff09; shutdown -r now 现在重新启动计算机(r : reboot) halt 关机&#xff0c;作用和…

Tomcat无法映射到activiti-app导致activiti无法启动页面

原因之一&#xff1a;JDK版本与Tomcat版本不匹配&#xff0c;jdk8 yyds 我使用的是JDK11&#xff0c;Tomcat是9.0的&#xff0c;都是最新的&#xff0c;但还是不行&#xff0c;最后JDK改为8&#xff0c;tomcat的cmd后台没有报错&#xff0c;activiti-pp也可以正常访问了,很神奇…

基于RK3588全高端智能终端机器人主板

一、小尺寸板型设计 该款主板为小型板&#xff0c;尺寸仅为125*85mm&#xff0c;更小更紧凑&#xff0c;可完美适应各类高端智能自助终端&#xff1b; 二、八核高端处理器 采用RK3588S八核64位处理器&#xff0c;8nm LP制程&#xff0c;主频最高达2.4GHz&#xff0c;搭载Andr…

Python大数据之linux学习总结——day11_ZooKeeper

ZooKeeper ZK概述 ZooKeeper概念: Zookeeper是一个分布式协调服务的开源框架。本质上是一个分布式的小文件存储系统 ZooKeeper作用: 主要用来解决分布式集群中应用系统的一致性问题。 ZooKeeper结构: 采用树形层次结构&#xff0c;ZooKeeper树中的每个节点被称为—Znode。且树…

Web实战:基于Django与Bootstrap的在线计算器

文章目录 写在前面实验目标实验内容1. 创建项目2. 导入框架3. 配置项目前端代码后端代码 4. 运行项目 注意事项写在后面 写在前面 本期内容&#xff1a;基于Django与Bootstrap的在线计算器 实验环境&#xff1a; vscodepython(3.11.4)django(4.2.7)bootstrap(3.4.1)jquery(3…

​软考-高级-系统架构设计师教程(清华第2版)【第12章 信息系统架构设计理论与实践(P420~465)-思维导图】​

软考-高级-系统架构设计师教程&#xff08;清华第2版&#xff09;【第12章 信息系统架构设计理论与实践&#xff08;P420~465&#xff09;-思维导图】 课本里章节里所有蓝色字体的思维导图

什么是tomcat, tomcat该如何使用?(java)

tomcat是什么? tomcat翻译过来为汤姆猫, 但是他可不是猫和老鼠中的汤姆, 而是java中的tom, 虽然java中的tomcat没有猫和老鼠那么出名, 但是他仍然是java中的中流砥柱 下图为java中的tomcat, 也就是最右边这个黄色的猫: Tomcat是Apache 软件基金会&#xff08;Apache Software …

AI绘画使用Stable Diffusion(SDXL)绘制三星堆风格的图片

一、前言 三星堆文化是一种古老的中国文化&#xff0c;它以其精湛的青铜铸造技术闻名&#xff0c;出土文物中最著名的包括青铜面具、青铜人像、金杖、玉器等。这些文物具有独特的艺术风格&#xff0c;显示了高度的工艺水平和复杂的社会结构。 青铜面具的巨大眼睛和突出的颧骨&a…

【洛谷 B2002】Hello,World!(顺序结构)

Hello,World! 题目描述 编写一个能够输出 Hello,World! 的程序。 提示&#xff1a; 使用英文标点符号&#xff1b;Hello,World! 逗号后面没有空格。H 和 W 为大写字母。 输入格式 输出格式 样例 #1 样例输入 #1 无样例输出 #1 Hello,World!思路 #include 是一个预处…

MyBatis 快速入门

MyBatis 快速入门 前言什么是 MyBatis简介核心特性使用示例配置文件Mapper 接口SQL 映射文件使用 MyBatis 如果大家对以上的导读很懵怎么办&#xff01;没关系 往下阅读&#xff01; 1. MyBatis 介绍1.1. 什么是MyBatis1.2. 持久层1.3. 框架1.4. JDBC 弊端1.5.…

有成效的工作

从开始上班起&#xff0c;听到过工作是做不完得。 大概的意思&#xff0c;现在的工作做完了&#xff0c;就会分配新的工作。所以总也做不完。 如果是做不完的&#xff0c;那么是不是在一个岗位上就一直干着呢。既然这个很难成立。那其实工作是可以干得完的。 一个岗位的终结&am…

redis+python 建立免费http-ip代理池;验证+留接口

前言: 效果图: 对于网络上的一些免费代理ip,http的有效性还是不错的;但是,https的可谓是凤毛菱角; 正巧,有一个web可以用http访问,于是我就想到不如直接拿着免费的HTTP代理去做这个! 思路: 1.单页获取ipporttime (获取time主要是为了后面使用的时候,依照时效可以做文章) 2.整…

矩阵运算_矩阵的协方差矩阵/两个矩阵的协方差矩阵_求解详细步骤示例

1. 协方差矩阵定义 在统计学中&#xff0c;方差是用来度量单个随机变量的离散程度&#xff0c;而协方差则一般用来刻画两个随机变量的相似程度。 参考&#xff1a; 带你了解什么是Covariance Matrix协方差矩阵 - 知乎 2. 协方差矩阵计算过程 将输入数据A进行中心化处理得到A…

马斯克的SpaceX星舰又炸了!发射不久后失联自毁

就在几小时前&#xff0c;马斯克旗下SpaceX 发射了有史以来最强大的星舰&#xff0c;但在发射后不久发生爆炸。 在这次发射尝试中&#xff0c;星舰一二级成功进行了分离&#xff0c;但二级助推器和星舰都发生了快速意料之外的解体。在发射半小时后&#xff0c;SpaceX 宣布二级自…