Go 并发

news2025/1/9 10:37:47

来自 《Go 语言从入门到实战》 的并发章节学习笔记,欢迎阅读斧正,感觉该专栏整体来说对有些后端编程经验的来说比无后端编程经验的人更友好。。

Thread VS Groutine

创建时默认 Stack 大小:前者默认 1M,Groutint 的 Stack 初始化大小为 2K

和 KSE(Kernel Space Entity)关系:Java Thread 是 1:1,Groutine 是 M:N

扩展阅读:go语言之行–golang核武器goroutine调度原理、channel详解

import (
	"fmt"
	"testing"
	"time"
)

func TestGroutine(t *testing.T) {
	for i := 0; i < 10; i++ {
		// 使用go创建协程,但是需要注意的是:协程函数的 param 作为参数是外部 i 的数据拷贝
		go func(param int) {
			fmt.Println(param)
		}(i)
	}
	time.Sleep(time.Microsecond * 50)
}

共享内存并发机制

Lock

import (
	"sync"
	"testing"
	"time"
)

// 非线程安全
func TestCounter(t *testing.T) {
	counter := 0
	// 循环出 5000 个协程程序
	for i := 0; i < 5000; i++ {
		go func() {
			// 执行这个协程匿名函数给 counter 自增
			counter++
		}()
	}
	time.Sleep(1 * time.Second)
	// 输出的不是 5000,这是因为这 5000 个并发协程都在抢用 counter
	t.Logf("counter = %d", counter)
}

// 线程安全
func TestCounterThreadSafe(t *testing.T) {
	// 增加一个互斥锁
	var mut sync.Mutex
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				// 使用 defer 用于释放资源,此处用于解除锁
				mut.Unlock()
			}()
			//  counter 自增前加锁
			mut.Lock()
			counter++
		}()
	}
	// 加这个延时是担心程序一下就跑完了,甚至协程还没有跑完,
	time.Sleep(1 * time.Second)
	// 输出 5000
	t.Logf("counter = %d", counter)
}

WaitGroup

在 Java 中等待其他线程完成用的是 Thread.join() 方法(主线程等待子线程的终止),WaitGroup 有着类似的功能

// 线程安全
func TestCounterWaitGroup(t *testing.T) {
	// 增加一个互斥锁
	var mut sync.Mutex
	//
	var wg sync.WaitGroup
	counter := 0
	for i := 0; i < 5000; i++ {
		go func() {
			defer func() {
				// 使用 defer 用于释放资源,此处用于解除锁
				mut.Unlock()
			}()
			//  counter 自增前加锁
			mut.Lock()
			counter++
			wg.Done()
		}()
	}
	// 加这个是担心程序一下就跑完了,甚至协程还没有跑完
	wg.Wait()
	// 输出 5000
	t.Logf("counter = %d", counter)
}

CSP 并发机制

Go 语言特有的,通过 channel 交互,channel 有阻塞型的,有 buffer 型的(缓冲)

import (
	"fmt"
	"testing"
	"time"
)

func service() string {
	fmt.Println("---service---立即执行 50 毫秒延时")
	time.Sleep(time.Millisecond * 50)
	return "---service---Done"
}

func otherTask() {
	fmt.Println("---otherTask---working on something else")
	fmt.Println("---otherTask---立即执行 100 毫秒延时")
	time.Sleep(time.Millisecond * 100)
	fmt.Println("---otherTask---Task is Done")
}

func AsyncService() chan string { // 该函数返回一个通道, 消息类型是 string
	// retCh := make(chan string)    // 声明一个无缓冲 chan通道, 这个通道的类型是 string, 通道用于协程间的通信
	//                               // ⚠️ 使用无缓冲通道时, 如果协程1 没有立即接受协程 2 通过管道发送的消息,就会阻塞,反之亦然
	retCh := make(chan string, 1) // 声明一个缓冲容量为 1 的chan通道, 这个通道的类型是 string, 通道用于协程间的通信
	go func() {                   // 此处执行 匿名协程函数
		ret := service()
		fmt.Println("---AsyncService---returned result:", ret)
		retCh <- ret //通过通道将 service()函数返回的值传递给  通道 retCh
		fmt.Println("---AsyncService---service exited")
	}()
	return retCh // 返回这个通道
}

func TestAsyncService(t *testing.T) {
	retCh := AsyncService()
	otherTask()
	fmt.Println(<-retCh)
}

func TestService(t *testing.T) {
	fmt.Println(service())
	otherTask()
}

多路选择和超时控制

多路选择代码:
在这里插入图片描述
超时控制代码:
在这里插入图片描述

import (
	"fmt"
	"testing"
	"time"
)

func service() string {
	time.Sleep(time.Millisecond * 500)
	return "Done"
}

func AsyncService() chan string {
	retCh := make(chan string, 1)
	//retCh := make(chan string, 1)
	go func() {
		ret := service()
		fmt.Println("returned result.")
		retCh <- ret
		fmt.Println("service exited.")
	}()
	return retCh
}

func TestSelect(t *testing.T) {
	/**
	select 语句中,case不依赖代码书写顺序。
	如果case中有1个有消息时,其他case/default则不会执行。
	如果case中有多个消息时,随机任选1个进行执行,其他不会执行
	如果所有case都没有消息时,同时含有defalut分之,则会走default分之
	如果所有case都没有消息时,没有default分之,则会阻塞等待case中返回消息继续执行。
	*/
	select {
	case ret := <-AsyncService():
		t.Log(ret)
	case <-time.After(time.Millisecond * 100):
		t.Error("time out")
	}
}

``

# channel
![在这里插入图片描述](https://img-blog.csdnimg.cn/59bfa1b8ede14c66afbd8482f3701e38.png)
```go
import (
	"fmt"
	"sync"
	"testing"
)

func dataProducer(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for i := 0; i < 10; i++ {
			ch <- i
		}
		// 数据发送完成,关闭 channel
		close(ch)
		// 往关闭的 channel 发内容会有 panic
		//ch <- i
		wg.Done()
	}()

}

func dataReceiver(ch chan int, wg *sync.WaitGroup) {
	go func() {
		for {
			if data, ok := <-ch; ok {
				fmt.Println(data)
			} else {
				break
			}
		}
		wg.Done()
	}()

}

func TestCloseChannel(t *testing.T) {
	var wg sync.WaitGroup
	ch := make(chan int)
	wg.Add(1)
	dataProducer(ch, &wg)
	wg.Add(1)
	dataReceiver(ch, &wg)
	// wg.Add(1)
	// dataReceiver(ch, &wg)
	wg.Wait()

}

任务取消

通过关闭 channel 取消

package cancel_by_close

import (
	"fmt"
	"testing"
	"time"
)

// 任务是否已被取消
// 实现原理:
// 检查是否从 channel 收到一个消息,如果收到一个消息,我们就返回 true,代表任务已经被取消了
// 当没有收到消息,channel 会被阻塞,多路选择机制就会走到 default 分支上去。
func isCancelled(cancelChan chan struct{}) bool {
	select {
	case <-cancelChan:
		return true
	default:
		return false
	}
}

func cancel_1(cancelChan chan struct{}) {
	cancelChan <- struct{}{}
}

func cancel_2(cancelChan chan struct{}) {
	close(cancelChan)
}

// 利用 CSP, 多路选择机制和 channel 的关闭与广播实现任务取消功能
func TestCancel(t *testing.T) {
	cancelChan := make(chan struct{}, 0)
	for i := 0; i < 5; i++ {
		go func(i int, cancelCh chan struct{}) {
			for {
				if isCancelled(cancelCh) {
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i, "Cancelled")
		}(i, cancelChan)
	}
	cancel_1(cancelChan)
	// cancel_2(cancelChan)
	time.Sleep(time.Second * 1)
}

通过 Context 取消

使用 context 为了解决层级取消的问题,就是取消一个协程的子协程(树)甚至孙子协程(树)的问题
在这里插入图片描述

import (
	"context"
	"fmt"
	"testing"
	"time"
)

func isCancelled(ctx context.Context) bool {
	select {
	case <-ctx.Done():
		return true
	default:
		return false
	}
}

func TestCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	for i := 0; i < 5; i++ {
		go func(i int, ctx context.Context) {
			for {
				if isCancelled(ctx) {
					break
				}
				time.Sleep(time.Millisecond * 5)
			}
			fmt.Println(i, "Cancelled")
		}(i, ctx)
	}
	cancel()
	time.Sleep(time.Second * 1)
}

常见并发任务

只执行一次

使用 sync.Once 完成,类似于 Java 中的单例模式,避免资源的反复初始化。

如果type Singleton struct {} 定义的是空结构体,那么无论用不用once.Do(),最后得到的obj地址都是一样的。当然不用once.Do()时,除了输出10个相同的地址,还会输出10次“Create obj“。
将Singleton结构体里添加内容后,比如type Singleton struct {a bool},再做实验,每次运行就会得到不同的地址了。

空结构体情况:

import (
	"fmt"
	"sync"
	"testing"
	"unsafe"
)

type Singleton struct {
}

var singleInstance *Singleton

// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
		fmt.Println("Create Obj")
		singleInstance = new(Singleton)
	})
	return singleInstance
}

func TestGetSingletonObj(t *testing.T) {

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%x\n", unsafe.Pointer(obj))
			wg.Done()
		}()
	}
}

结构体非空情况:

import (
	"fmt"
	"sync"
	"testing"
	"unsafe"
)

type Singleton struct {
	// 
	flag bool
}

var singleInstance *Singleton

// 主要用于类似于单例的场景中,避免资源的反复初始化
var once sync.Once

func GetSingletonObj() *Singleton {
	once.Do(func() {
	fmt.Println("Create Obj")
	singleInstance = new(Singleton)
	singleInstance.flag = true
	})
	return singleInstance
}

// 此时 Singleton 的 struct 中不是空结构,加了 once.Do 能确保单例
func TestGetSingletonObj(t *testing.T) {

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			obj := GetSingletonObj()
			fmt.Printf("%x\n", unsafe.Pointer(obj))
			wg.Done()
		}()
	}
}

仅需任意任务完成

import (
	"fmt"
	"runtime"
	"testing"
	"time"
)

func runTask(id int) string {
	time.Sleep(10 * time.Millisecond)
	return fmt.Sprintf("The result is from %d", id)
}

func FirstResponse() string {
	numOfRunner := 10
	// 如果不使用 buffer channel,会出现协程泄露的情况:比如说第 3 个协程就返回了,但是还是会创建 11 个
	ch := make(chan string)
	// 使用的话,就不会新创建协程执行直接返回
	//ch := make(chan string, numOfRunner)
	for i := 0; i < numOfRunner; i++ {
		go func(i int) {
			ret := runTask(i)
			ch <- ret
		}(i)
	}
	return <-ch
}

func TestFirstResponse(t *testing.T) {
	// 系统当前协程数
	t.Log("Before:", runtime.NumGoroutine())
	t.Log(FirstResponse())
	time.Sleep(time.Second * 1)
	t.Log("After:", runtime.NumGoroutine())

}

所有任务完成

对象池

使用 buffered channel 实现:
在这里插入图片描述
obj_pool.go:

import (
	"errors"
	"time"
)

type ReusableObj struct {
}

type ObjPool struct {
	bufChan chan *ReusableObj //用于缓冲可重用对象
}

func NewObjPool(numOfObj int) *ObjPool {
	objPool := ObjPool{}
	objPool.bufChan = make(chan *ReusableObj, numOfObj)
	for i := 0; i < numOfObj; i++ {
		objPool.bufChan <- &ReusableObj{}
	}
	return &objPool
}

func (p *ObjPool) GetObj(timeout time.Duration) (*ReusableObj, error) {
	select {
	case ret := <-p.bufChan:
		return ret, nil
	case <-time.After(timeout): //超时控制
		return nil, errors.New("time out")
	}

}

func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
	select {
	case p.bufChan <- obj:
		return nil
	default:
		return errors.New("overflow")
	}
}

obj_pool_test.go:

import (
	"fmt"
	"testing"
	"time"
)

func TestObjPool(t *testing.T) {
	pool := NewObjPool(10)
	// if err := pool.ReleaseObj(&ReusableObj{}); err != nil { //尝试放置超出池大小的对象
	// 	t.Error(err)
	// }
	for i := 0; i < 11; i++ {
		if v, err := pool.GetObj(time.Second * 1); err != nil {
			t.Error(err)
		} else {
			fmt.Printf("%T\n", v)
			if err := pool.ReleaseObj(v); err != nil {
				t.Error(err)
			}
		}

	}

	fmt.Println("Done")
}

sync.Pool 对象缓存

获取

在这里插入图片描述

返回

在这里插入图片描述

使用

import (
	"fmt"
	"runtime"
	"sync"
	"testing"
)

// 不 GC输出:
// Create a new object.
// 100
// 3
// GC 输出:
// Create a new object.
// 100
// Create a new object.
// 100
func TestSyncPool(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 100
		},
	}

	v := pool.Get().(int)
	fmt.Println(v)
	pool.Put(3)
	runtime.GC() //GC 会清除sync.pool中缓存的对象
	v1, _ := pool.Get().(int)
	fmt.Println(v1)
}

func TestSyncPoolInMultiGroutine(t *testing.T) {
	pool := &sync.Pool{
		New: func() interface{} {
			fmt.Println("Create a new object.")
			return 10
		},
	}

	pool.Put(100)
	pool.Put(100)
	pool.Put(100)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(id int) {
			fmt.Println(pool.Get())
			wg.Done()
		}(i)
	}
	wg.Wait()
}

对象生命周期

在这里插入图片描述

总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/132099.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言可变参数与内存管理

有时&#xff0c;您可能会碰到这样的情况&#xff0c;您希望函数带有可变数量的参数&#xff0c;而不是预定义数量的参数。C 语言为这种情况提供了一个解决方案&#xff0c;它允许您定义一个函数&#xff0c;能根据具体的需求接受可变数量的参数。下面的实例演示了这种函数的定…

LeetCode题解 二叉树(八):404 左叶子之和;513 找树左下角的值;112 路径总和;113 路径总和II

二叉树 404 左叶子之和 easy 左叶子结点也好判断&#xff0c;若某结点属于左结点&#xff0c;且无子树&#xff0c;就是左叶子结点 即也如此&#xff0c;所以如果要判断&#xff0c;必然要从父结点下手&#xff0c;涉及到三层结点的处理 如果要使用递归法&#xff0c;要使用…

(二十三)大白话数据库服务器上的RAID存储架构的电池充放电原理

文章目录 1、RAID卡的缓存2、RAID卡的缓存里的数据会突然丢失怎么办?3、锂电池存在性能衰减问题1、RAID卡的缓存 服务器使用多块磁盘组成的RAID阵列的时候,一般会有一个RAID卡,这个RAID卡是带有一个缓存的,这个缓存不是直接用我们的服务器的主内存的那种模式,他是一种跟内…

网络静态路由综合实验

1.首先分配ip,配置ip和环回 [Huawei]sysname R1 [R1]interface LoopBack 0 [R1-LoopBack0]ip add 192.168.1.33 28 [R1-LoopBack0]q [R1]int l 1 [R1-LoopBack1]ip add 192.168.1.49 28 [R1-LoopBack1]q [R1]int g 0/0/0 [R1-GigabitEthernet0/0/0]ip add 192.168.1.1 30 [R1-…

JVM与Java体系结构

目录 前言 架构师每天都在思考什么&#xff1f; Java vs C Java生态圈 字节码 多语言混合编程 虚拟机与Java虚拟机 虚拟机 Java虚拟机 JVM的位置 JVM整体结构 Java代码执行流程 JVM的架构模型 举例 字节码反编译 总结 栈 JVM生命周期 虚拟机的启动 虚拟机的…

时间从来不语,确回答了所有问题——我的2022年终总结

趁着没阳&#xff0c;趁着电脑还能开机&#xff0c;趁着还能写&#xff0c;赶紧小结过去这一年。没有别的感觉&#xff0c;就是感觉太快&#xff0c;时间太过匆匆.....最大的感触是两个字“变化”&#xff0c;如果非要说四个字是“变化太快”&#xff0c;就如当下的yi情政策&am…

多线程_进阶

文章目录线程通信概念使用方式案例单例模式阻塞式队列线程池常见的锁策略乐观锁 悲观锁CASCAS存在的问题:ABA问题读写锁自旋锁公平锁 非公平锁非公平锁公平锁synchronizedjvm对synchronized的优化:锁升级synchronized的其他优化Lock体系synchronized vs lock独占锁vs共享锁独占…

Arch/Manjaro换源+安装常用的软件+安装显卡驱动

本文将教你&#xff1a;换源安装显卡驱动&#xff0c;安装常用软件例如腾讯会议&#xff0c;QQ&#xff0c;WPS 一起交流Linux知识&#xff0c;欢迎加入Skype群&#xff1a; Join conversationhttps://join.skype.com/q6wrF3d6Usni pacman换清华源 首先安装vim&#xff0c;用来…

(二十四)大白话RAID锂电池充放电导致的MySQL数据库性能抖动的优化

案例实战:RAID锂电池充放电导致的MySQL数据库性能抖动的优化 文章目录 1、磁盘故障怎么保障数据不丢失?2、线上MySQL数据库的性能定期抖动的原因1、磁盘故障怎么保障数据不丢失? 前面经过了几天的生产经验的一些铺垫,包括MySQL磁盘读写的机制,Linux存储系统的原理,RAID磁…

垃圾佬图拉丁装机

理论知识 缩线程 amd搞了个推土机架构 两个核心公用一个浮点运算单元&#xff0c;因为浮点运算只占百分之二十。 浮点运算应该交给更适合的gpu去做 好的对比 RDP 微软的RDP本身就定位是一个远程登录和维护windows系的工具&#xff0c;它为什么要支持管理别的系统&#xff…

【mybatis generator实战】 1.crud 2.计数 3.自定义复杂mapper代码组织

1.计数 2.CRUD 增 注意&#xff1a; insert&#xff1a;一个必须全部有值。 insertSelective是&#xff1a;部分有值就行&#xff0c;用的较多。 有疑问可以看源码&#xff0c;发现xxxSelective就是拼接了一些参数。 删 改 注意&#xff1a; 4个更新方法&#xff1a; …

QML学习笔记【06】:QML与C++交互

1 QML端直接调用C端变量及函数 1、 创建继承自QObject的C类&#xff0c;对象必须继承自QObject才能在QML被使用和访问 2、在类定义中使用Q_PROPERTY导出成员的READ、WRITE、NOTIFY接口&#xff0c;这样类中的成员变量就可以在QML调用和修改了&#xff0c;同时变量被修改后也会…

剑指 Offer 18. 删除链表的节点

一、题目描述 给定单向链表的头指针和一个要删除的节点的值&#xff0c;定义一个函数删除该节点。 返回删除后的链表的头节点。 示例 1: 输入: head [4,5,1,9], val 5 输出: [4,1,9] 解释: 给定你链表中值为 5 的第二个节点&#xff0c;那么在调用了你的函数之后&#xf…

2022年最后一篇推文 | C语言编程十诫

正文大家好&#xff0c;我是bug菌~2022年最后一篇推文原本选择一篇年终总结会比较合适&#xff0c;然而坐在窗台&#xff0c;望着窗外思索了良久&#xff0c;与往年总结有点不同&#xff0c;这个时间节点有着他的特殊性&#xff0c;不出意外&#xff0c;明年会有非常多的变化、…

OpenShift 4 - 用 HyperShift 实现以“托管集群”方式部署运行 OpenShift 集群

《OpenShift / RHEL / DevSecOps / Ansible 汇总目录》 说明&#xff1a;本文已经在 OpenShift 4.11 ACM 2.6 AWS 环境中验证 文章目录用 HyperShift 实现 OpenShift 托管集群什么是 HyperShift 托管集群以及架构HyperShift 托管集群的价值成本优势部署优势管理优势在 RHACM …

【Kaggle】Global Wheat Detection

代码链接 实验目的 小麦来自世界各地。密度的小麦植株经常重叠&#xff1b;风会使得照片模糊&#xff1b;外观会因成熟度&#xff0c;颜色&#xff0c;基因型和头部方向而异。使用图像处理和目标检测完成小麦头的位置的标定。完成训练并现场验证后上传指定的输出文件进行验证…

大数据NiFi(三):NiFi关键特性

文章目录 NiFi关键特性 一、​​​​​​​​​​​​​​流管理

人工智能轨道交通行业周刊-第28期(2022.12.26-2023.1.1)

本期关键词&#xff1a;NOCC、车站闸机、雾闪、2022年度盘点、智慧园区 1 整理涉及公众号名单 1.1 行业类 RT轨道交通中关村轨道交通产业服务平台人民铁道世界轨道交通资讯网铁路信号技术交流北京铁路轨道交通网上榜铁路视点ITS World轨道交通联盟VSTR铁路与城市轨道交通Rai…

安全—01day

DNS 域名解析过程: 1.浏览器首先查询浏览器的缓存&#xff0c;因为浏览器会按照一定的频率缓存 DNS 记录 2.若浏览器无缓存&#xff0c;那么查询操作系统的 HOST 文件&#xff0c;查询是否有 DNS 记录。 3.若还没有命中域名&#xff0c;就请求本地域名服务器该服务器一般都会缓…

Qt音视频开发07-合并音视频文件

一、前言 之前已经把音视频分开存储了对应的文件&#xff0c;因为这个需求特别少&#xff0c;当然确实有部分用户是需要把音视频分开存储&#xff0c;但是毕竟是很少数&#xff0c;绝大部分的用户都是音视频合并到一个MP4文件&#xff0c;所以如果要合并到一个文件&#xff0c…