题目
手撕 对无序的切片查询指定数 使用context进行子协程的销毁 并且进行超时处理。
全局变量定义
var (
startLoc = int64(0) // --- 未处理切片数据起始位置
endLoc = int64(0) // --- 切片数据右边界 避免越界
offset = int64(0) // --- 根据切片和协程数量 在主线程 动态设置
target = 42 // --- 设置的目标值
mu sync.Mutex // --- 避免并发冲突使用的全局锁
)
1.并发处理
1.1 使用atomic原子操作
使用CAS操作解决并发问题(不使用锁) 效率上和使用全局锁在 100000 上几乎没差别
// --- 使用atomic原子操作
start = atomic.LoadInt64(&startLoc)
end = start + offset
if end > endLoc {
end = endLoc
}
// 应该不会出现ABA问题
if ok := atomic.CompareAndSwapInt64(&startLoc, start, end); ok == false {
continue
}
1.2 使用全局锁
mu.Lock()
start = startLoc
end = start + offset
startLoc = end
mu.Unlock()
if start >= endLoc {
return
}
if end > endLoc {
end = endLoc
}
1.3主线程手动切片全部代码
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
func find(nums []int, ctx context.Context, wg *sync.WaitGroup, target int, start, end int64) {
defer wg.Done()
for {
select {
case <-ctx.Done():
// 如果接收到取消信号,退出协程
return
default:
for i := start; i < end; i++ {
if nums[i] == target {
// 使用 atomic 以确保线程安全
atomic.StoreInt32(&valid, 1)
return
}
}
return
}
}
}
var valid int32
func main() {
sliceLen := int64(1000000)
// 创建一个背景上下文和一个取消功能
ctx := context.Background()
// 假设 ddl 是一个固定的截止时间
ddl := time.Now().Add(10 * time.Second) // 假设 5 秒钟后超时
newCtx, cancel := context.WithDeadline(ctx, ddl)
// 创建一个较大的切片 nums 并初始化
nums := make([]int, sliceLen)
// 初始化切片为随机数据,例如从 1 到 100,值为42的即为目标
for i := 0; i < len(nums); i++ {
nums[i] = i
}
offset := sliceLen / 10
startLoc := int64(0)
startTime := time.Now()
// 使用 WaitGroup 来等待所有协程完成
var wg sync.WaitGroup
// 启动多个协程进行查找
for i := 0; i < 10; i++ {
wg.Add(1)
go find(nums, newCtx, &wg, 42, startLoc, startLoc+offset)
startLoc = startLoc + offset
}
// 等待结果
go func() {
wg.Wait()
cancel() // 等待所有协程结束后,调用 cancel
}()
// 检查结果
select {
case <-newCtx.Done():
if atomic.LoadInt32(&valid) == 1 {
fmt.Println("Found target!")
} else {
fmt.Println("Timeout or not found.")
}
}
duration := time.Since(startTime)
fmt.Printf("程序运行时间: %s\n", duration)
}
1.4 采取锁处理 & 原子操作 全部代码
package main
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
)
var (
startLoc = int64(0)
endLoc = int64(0)
offset = int64(0)
target = 42
mu sync.Mutex
)
func find(nums []int, ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
var start, end int64
for {
select {
case <-ctx.Done():
// 如果接收到取消信号,退出协程
return
default:
// --- 使用全局锁
// 查找区间
//mu.Lock()
//start = startLoc
//end = start + offset
//startLoc = end
//mu.Unlock()
//if start >= endLoc {
// return
//}
//if end > endLoc {
// end = endLoc
//}
// --- 使用atomic原子操作
start = atomic.LoadInt64(&startLoc)
end = start + offset
if end > endLoc {
end = endLoc
}
if start >= endLoc {
return
}
// 应该不会出现ABA问题
if ok := atomic.CompareAndSwapInt64(&startLoc, start, end); ok == false {
//time.Sleep(100)
continue
}
for i := start; i < end; i++ {
if nums[i] == target {
// 使用 atomic 以确保线程安全
atomic.StoreInt32(&valid, 1)
return
}
}
}
}
}
var valid int32
func main() {
sliceLen := int64(100000)
// 创建一个背景上下文和一个取消功能
ctx := context.Background()
// 假设 ddl 是一个固定的截止时间
ddl := time.Now().Add(10 * time.Second) // 假设 5 秒钟后超时
newCtx, cancel := context.WithDeadline(ctx, ddl)
// 创建一个较大的切片 nums 并初始化
nums := make([]int, sliceLen)
endLoc = sliceLen
// 初始化切片为随机数据,例如从 1 到 100,值为42的即为目标
for i := 0; i < len(nums); i++ {
nums[i] = i
}
startTime := time.Now()
// 使用 WaitGroup 来等待所有协程完成
var wg sync.WaitGroup
offset = int64(sliceLen / 10)
// 启动多个协程进行查找
for i := 0; i < 10; i++ {
wg.Add(1)
go find(nums, newCtx, &wg)
}
// 等待结果
go func() {
wg.Wait()
cancel() // 等待所有协程结束后,调用 cancel
}()
// 检查结果
select {
case <-newCtx.Done():
if atomic.LoadInt32(&valid) == 1 {
fmt.Println("Found target!")
} else {
fmt.Println("Timeout or not found.")
}
}
duration := time.Since(startTime)
fmt.Printf("程序运行时间: %s\n", duration)
}
2.Context部分
2.1 context是并发安全
创建的初始context有两种 TODO()和Background(),查看内部结构体, 实际都是emptyCtx。
Background()创建的上下文通常被认为整个请求的顶级 Context,而TODO()创建的通常被认为是暂时的、未确定的 Context。
func Background() Context {
return backgroundCtx{}
}
func TODO() Context {
return todoCtx{}
}
1. 传值Value
直接对父context进行包装,并不会修改父context
type valueCtx struct {
Context
key, val any
}
func WithValue(parent Context, key, val any) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}
2. 设置超时时间 WithDeadline
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
return WithDeadlineCause(parent, d, nil)
}
2.2 context的信号传递
以cancel部分举例说明
1. 设置超时时间
设置取消函数的接口主要分为下列几种情况:
- 父Ctx为nil, 抛出异常
- 父Ctx具有超时时间,且比设置的超时时间更早结束,则新建CancelCtx加入父Ctx监听列表,且返回该新建CancelCtx。
- 设置新的包含超时时间的timerCtx(内部继承了cancelCtx结构体),加入父Ctx的监听列表,检查是否已经超时, 超时则取消该上下文, 没超时则设置计时器,等待取消。
func WithDeadlineCause(parent Context, d time.Time, cause error) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
deadline: d,
}
c.cancelCtx.propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded, cause) // deadline has already passed
return c, func() { c.cancel(false, Canceled, nil) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded, cause)
})
}
return c, func() { c.cancel(true, Canceled, nil) }
}
2.设置子Ctx监听父Ctx
上下文取消传播:propagateCancel 的核心目的是将父上下文的取消信号(及其取消原因)传递给子上下文。不同的父上下文类型(如 *cancelCtx 或实现了 AfterFunc 方法的上下文)会采取不同的处理方式。
并发处理:通过 goroutines.Add(1) 和新的 goroutine 来监听父上下文的取消事件,确保并发场景下的取消传播。
其中分为三种情况:
- 父Ctx未设置Done ,则无需监听
- 父Ctx设置了回调函数
- 父Ctx类型是*cancelCtx,则把子Ctx加入自身map中,每个子Ctx都会开启协程监听父Ctx信号,同步取消自身。
主要就是依赖Channel进行信号传递。
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {
c.Context = parent
done := parent.Done()
if done == nil {
return // parent is never canceled
}
select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err(), Cause(parent))
return
default:
}
if p, ok := parentCancelCtx(parent); ok {
// parent is a *cancelCtx, or derives from one.
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err, p.cause)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
return
}
if a, ok := parent.(afterFuncer); ok {
// parent implements an AfterFunc method.
c.mu.Lock()
stop := a.AfterFunc(func() {
child.cancel(false, parent.Err(), Cause(parent))
})
c.Context = stopCtx{
Context: parent,
stop: stop,
}
c.mu.Unlock()
return
}
goroutines.Add(1)
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err(), Cause(parent))
case <-child.Done():
}
}()
}
参考链接:
Go 语言并发编程与 Context | Go 语言设计与实现
3.channel部分
3.1channel底层结构
在有缓冲区的channel部分,数据使用环形链表进行存储,存储有变量记录有效数据区域。
type hchan struct {
qcount uint // Channel 中的元素个数
dataqsiz uint // Channel 中的循环队列的长度
buf unsafe.Pointer // Channel 的缓冲区数据指针
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // Channel 的发送操作处理到的位置
recvx uint // Channel 的接收操作处理到的位置
recvq waitq // 等待消息的双向链表
sendq waitq // 发生消息双向链表
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
// 创建双向链表 构造等待消息 或 发生消息的goroutine的双向链表
type waitq struct {
first *sudog
last *sudog
}
有缓冲区
无缓冲区
3.2 对于不同的channel进行读入读出的不同情况
如果给一个 nil 的 channel 发送数据,会造成永远阻塞。
如果从一个 nil 的 channel 中接收数据,也会造成永久阻塞。
给一个已经关闭的 channel 发送数据, 会引起 panic。
从一个已经关闭的 channel 接收数据, 如果缓冲区中为空,则返回一个零值。
同时分为有缓冲区和无缓冲区两种,前者是异步的,在缓冲区未满时,可以持续输入,不会阻塞,直到缓冲区满;后者则为有goroutine输入,等待有协程进行数据消费,否则持续阻塞。
对nil的channel不可操作。
参考链接:
https://www.cnblogs.com/Paul-watermelon/articles/17484439.html
Go 语言 Channel 实现原理精要 | Go 语言设计与实现 (draveness.me)