整理Golang忽略的问题
- 参考资料
- 1.WaitGroup与GoRoutine的竞速
- 2.Mutex互斥锁和RWMutex互斥读写锁
- 3.poll,select,epoll
- 4.何时栈和堆?
- 5.GoRoutine合理使用
- 6.GoRoutine优雅退出
- 6.1data channel关闭通知退出
- 6.2exit channel关闭通知退出
- 6.3context超时或取消通知退出
- 6.4WaitGroup/ErrGroup判断所有协程关闭后退出
- 7.make和new区别
- 8.动态保活Worker工作池
- 9.Bug调试/性能分析
- 9.1 shell内置time指令
- 9.2 top和GODEBUG/gctrace
- 9.3 pprof
- 10.GC-标记/清除
- 11.内存溢出
- 11.1例如map[string]*ObjectA在某个[]*ObjectB中,一直未被回收
- 11.2新的error类型
- 12.内存泄漏
- 12.1substring导致的内存泄漏
- 12.2子切片引起的内存泄漏
- 12.3不重置丢失的切片元素中的指针引起的内存泄漏
- 12.4被卡住的goroutine引起的内存泄漏
- 12.5不在使用的但是没有stop的time.Ticker导致的内存泄漏
- 12.6不正确使用终结器导致的内存泄漏
- 13.Channel
- 14.Cond
参考资料
https://github.com/aceld/golang
https://zhuanlan.zhihu.com/p/597424646
https://www.cnblogs.com/xxswkl/p/14248560.html
1.WaitGroup与GoRoutine的竞速
package main
import (
"sync"
//"time"
)
const N = 10
var wg = &sync.WaitGroup{}
func main() {
for i := 0; i < N; i++ {
go func(i int) {
wg.Add(1)
println(i)
defer wg.Done()
}(i)
}
wg.Wait()
}
结果不唯一,所有go未必都能执行到:这是因为go执行太快了,导致wg.Add(1)还未执行main函数就执行完毕了
改进如下
package main
import (
"sync"
)
const N = 10
var wg = &sync.WaitGroup{}
func main() {
for i:= 0; i< N; i++ {
wg.Add(1)
go func(i int) {
println(i)
defer wg.Done()
}(i)
}
wg.Wait()
}
2.Mutex互斥锁和RWMutex互斥读写锁
Mutex,绝对锁(互斥锁),同一时间只有一个
RWMutex,读写锁,RLock()读锁,可以有多个读锁,Lock()写锁,写操作是完全互斥,当一个goroutine写时,其他既不能写也不能读
var count int
var wg sync.WaitGroup
var rw sync.RWMutex
func main() {
wg.Add(10)
for i:=0;i<5;i++ {
go read(i)
}
for i:=0;i<5;i++ {
go write(i);
}
wg.Wait()
}
func read(n int) {
// 读锁是RLock(),
rw.RLock()
fmt.Printf("读goroutine %d 正在读取...\n",n)
v := count
fmt.Printf("读goroutine %d 读取结束,值为:%d\n", n,v)
wg.Done()
rw.RUnlock()
}
func write(n int) {
// 写锁是Lock()
rw.Lock()
fmt.Printf("写goroutine %d 正在写入...\n",n)
v := rand.Intn(1000)
count = v
fmt.Printf("写goroutine %d 写入结束,新值为:%d\n", n,v)
wg.Done()
rw.Unlock()
}
Map可以根据读写锁改造成线程安全的SynchronizedMap
// 安全的Map
type SynchronizedMap struct {
rw *sync.RWMutex
data map[interface{}]interface{}
}
// 存储操作
func (sm *SynchronizedMap) Put(k,v interface{}){
sm.rw.Lock()
defer sm.rw.Unlock()
sm.data[k]=v
}
// 获取操作 只有这个加的是读锁,
func (sm *SynchronizedMap) Get(k interface{}) interface{}{
sm.rw.RLock()
defer sm.rw.RUnlock()
return sm.data[k]
}
// 删除操作
func (sm *SynchronizedMap) Delete(k interface{}) {
sm.rw.Lock()
defer sm.rw.Unlock()
delete(sm.data,k)
}
// 遍历Map,并且把遍历的值给回调函数,可以让调用者控制做任何事情
func (sm *SynchronizedMap) Each(cb func (interface{},interface{})){
sm.rw.RLock()
defer sm.rw.RUnlock()
for k, v := range sm.data {
cb(k,v)
}
}
// 生成初始化一个SynchronizedMap
func NewSynchronizedMap() *SynchronizedMap{
return &SynchronizedMap{
rw:new(sync.RWMutex),
data:make(map[interface{}]interface{}),
}
}
3.poll,select,epoll
poll
while true {
for i in 流[] {
if i has 数据 {
读 或者 其他处理
}
}
}
select
while true {
select(流[]); //阻塞
//有消息抵达
for i in 流[] {
if i has 数据 {
读 或者 其他处理
}
}
}
epoll
while true {
可处理的流[] = epoll_wait(epoll_fd); //阻塞
//有消息抵达,全部放在 “可处理的流[]”中
for i in 可处理的流[] {
读 或者 其他处理
}
}
4.何时栈和堆?
编译器会做逃逸分析(escape analysis),当发现变量的作用域没有跑出函数范围,就可以在栈上,反之则必须分配在堆。
5.GoRoutine合理使用
一个GoRoutine大概占用2.5KB
推荐1:channel与sync同步组合方式
package main
import (
"fmt"
"math"
"sync"
"runtime"
)
var wg = sync.WaitGroup{}
func busi(ch chan bool, i int) {
fmt.Println("go func ", i, " goroutine count = ", runtime.NumGoroutine())
<-ch
wg.Done()
}
func main() {
//模拟用户需求go业务的数量
task_cnt := math.MaxInt64
ch := make(chan bool, 3)
for i := 0; i < task_cnt; i++ {
wg.Add(1)
ch <- true
go busi(ch, i)
}
wg.Wait()
}
推荐2:利用无缓冲channel与任务发送/执行分离方式
package main
import (
"fmt"
"math"
"sync"
"runtime"
)
var wg = sync.WaitGroup{}
func busi(ch chan int) {
for t := range ch {
fmt.Println("go task = ", t, ", goroutine count = ", runtime.NumGoroutine())
wg.Done()
}
}
func sendTask(task int, ch chan int) {
wg.Add(1)
ch <- task
}
func main() {
ch := make(chan int) //无buffer channel
goCnt := 3 //启动goroutine的数量
for i := 0; i < goCnt; i++ {
//启动go
go busi(ch)
}
taskCnt := math.MaxInt64 //模拟用户需求业务的数量
for t := 0; t < taskCnt; t++ {
//发送任务
sendTask(t, ch)
}
wg.Wait()
}
6.GoRoutine优雅退出
6.1data channel关闭通知退出
适用简单任务,复杂的更推荐context单独通知
// cancelFn 数据通道关闭通知退出
func cancelFn(dataChan chan int) {
for {
select {
case val, ok := <-dataChan:
// 关闭data通道时,通知退出
// 一个可选是判断data=指定值时退出
if !ok {
log.Printf("Channel closed !!!")
return
}
log.Printf("Revice dataChan %d\n", val)
}
}
}
6.2exit channel关闭通知退出
部分简单场景适用
// exitChannelFn 单独退出通道关闭通知退出
func exitChannelFn(wg *sync.WaitGroup, taskNo int, dataChan chan int, exitChan chan struct{}) {
defer wg.Done()
for {
select {
case val, ok := <-dataChan:
if !ok {
log.Printf("Task %d channel closed !!!", taskNo)
return
}
log.Printf("Task %d revice dataChan %d\n", taskNo, val)
// 关闭exit通道时,通知退出
case <-exitChan:
log.Printf("Task %d revice exitChan signal!\n", taskNo)
return
}
}
}
6.3context超时或取消通知退出
主流推荐
// contextCancelFn context取消或超时通知退出
func contextCancelFn(wg *sync.WaitGroup, taskNo int, dataChan chan int, ctx context.Context) {
defer wg.Done()
for {
select {
case val, ok := <-dataChan:
if !ok {
log.Printf("Task %d channel closed !!!", taskNo)
return
}
log.Printf("Task %d revice dataChan %d\n", taskNo, val)
// ctx取消或超时,通知退出
case <-ctx.Done():
log.Printf("Task %d revice exit signal!\n", taskNo)
return
}
}
}
6.4WaitGroup/ErrGroup判断所有协程关闭后退出
最常用,参考如下
// 多个任务并行控制,等待所有任务完成
func TestTaskControl(t *testing.T) {
dataChan := make(chan int)
taskNum := 3
wg := sync.WaitGroup{}
wg.Add(taskNum)
// 起多个协程,data关闭时退出
for i := 0; i < taskNum; i++ {
go func(taskNo int) {
defer wg.Done()
t.Logf("Task %d run\n", taskNo)
for {
select {
case _, ok := <-dataChan:
if !ok {
t.Logf("Task %d notify to stop\n", taskNo)
return
}
}
}
}(i)
}
// 通知退出
go func() {
time.Sleep(3 * time.Second)
close(dataChan)
}()
// 等待退出完成
wg.Wait()
}
7.make和new区别
相同
堆空间分配
不同
make: 只用于slice、map以及channel的初始化, 无可替代
new: 用于类型内存分配(初始化值为0), 不常用
new不常用
所以有new这个内置函数,可以给我们分配一块内存让我们使用,但是现实的编码中,它是不常用的。我们通常都是采用短语句声明以及结构体的字面量达到我们的目的,比如:
i : =0
u := user{}
make 无可替代
我们在使用slice、map以及channel的时候,还是要使用make进行初始化,然后才才可以对他们进行操作。
8.动态保活Worker工作池
WorkerManager作为主Goroutine, worker作为子Goroutine
WorkerManager.go
type WorkerManager struct {
//用来监控Worker是否已经死亡的缓冲Channel
workerChan chan *worker
// 一共要监控的worker数量
nWorkers int
}
//创建一个WorkerManager对象
func NewWorkerManager(nworkers int) *WorkerManager {
return &WorkerManager{
nWorkers:nworkers,
workerChan: make(chan *worker, nworkers),
}
}
//启动worker池,并为每个Worker分配一个ID,让每个Worker进行工作
func (wm *WorkerManager)StartWorkerPool() {
//开启一定数量的Worker
for i := 0; i < wm.nWorkers; i++ {
i := i
wk := &worker{id: i}
go wk.work(wm.workerChan)
}
//启动保活监控
wm.KeepLiveWorkers()
}
//保活监控workers
func (wm *WorkerManager) KeepLiveWorkers() {
//如果有worker已经死亡 workChan会得到具体死亡的worker然后 打出异常,然后重启
for wk := range wm.workerChan {
// log the error
fmt.Printf("Worker %d stopped with err: [%v] \n", wk.id, wk.err)
// reset err
wk.err = nil
// 当前这个wk已经死亡了,需要重新启动他的业务
go wk.work(wm.workerChan)
}
}
worker.go
type worker struct {
id int
err error
}
func (wk *worker) work(workerChan chan<- *worker) (err error) {
// 任何Goroutine只要异常退出或者正常退出 都会调用defer 函数,所以在defer中想WorkerManager的WorkChan发送通知
defer func() {
//捕获异常信息,防止panic直接退出
if r := recover(); r != nil {
if err, ok := r.(error); ok {
wk.err = err
} else {
wk.err = fmt.Errorf("Panic happened with [%v]", r)
}
} else {
wk.err = err
}
//通知 主 Goroutine,当前子Goroutine已经死亡
workerChan <- wk
}()
// do something
fmt.Println("Start Worker...ID = ", wk.id)
// 每个worker睡眠一定时间之后,panic退出或者 Goexit()退出
for i := 0; i < 5; i++ {
time.Sleep(time.Second*1)
}
panic("worker panic..")
//runtime.Goexit()
return err
}
三、测试
main.go
func main() {
wm := NewWorkerManager(10)
wm.StartWorkerPool()
}
我们会发现,无论子Goroutine是因为 panic()异常退出,还是Goexit()退出,都会被主Goroutine监听到并且重启。主要我们就能够起到保活的功能了. 当然如果线程死亡?进程死亡?我们如何保证? 大家不用担心,我们用Go开发实际上是基于Go的调度器来开发的,进程、线程级别的死亡,会导致调度器死亡,那么我们的全部基础框架都将会塌陷。那么就要看线程、进程如何保活啦,不在我们Go开发的范畴之内了。
9.Bug调试/性能分析
9.1 shell内置time指令
time go run test2.go
9.2 top和GODEBUG/gctrace
package main
import (
"log"
"runtime"
"time"
)
func test() {
//slice 会动态扩容,用slice来做堆内存申请
container := make([]int, 8)
log.Println(" ===> loop begin.")
for i := 0; i < 32*1000*1000; i++ {
container = append(container, i)
}
log.Println(" ===> loop end.")
}
func main() {
log.Println("Start.")
test()
log.Println("force gc.")
runtime.GC() //强制调用gc回收
log.Println("Done.")
time.Sleep(3600 * time.Second) //睡眠,保持程序不退出
}
$go build -o snippet_mem && ./snippet_mem
$top -p $(pidof snippet_mem)
GODEBUG=‘gctrace=1’ ./snippet_mem
gc # @#s #%: #+#+# ms clock, #+#/#/#+# ms cpu, #->#-># MB, # MB goal, # P
gc # GC次数的编号,每次GC时递增
@#s 距离程序开始执行时的时间
#% GC占用的执行时间百分比
#+…+# GC使用的时间
#->#-># MB GC开始,结束,以及当前活跃堆内存的大小,单位M
# MB goal 全局堆内存大小
# P 使用processor的数量
9.3 pprof
import(
"net/http"
_ "net/http/pprof"
)
go func() {
log.Println(http.ListenAndServe("0.0.0.0:10000", nil))
}()
输入地址:http://127.0.0.1:10000/debug/pprof/heap?debug=1
10.GC-标记/清除
GoV1.3- 普通标记清除法,整体过程需要启动STW,效率极低。
GoV1.5- 三色标记法, 堆空间启动写屏障,栈空间不启动,全部扫描之后,需要重新扫描一次栈(需要STW),效率普通
GoV1.8-三色标记法,混合写屏障机制, 栈空间不启动,堆空间启动。整个过程几乎不需要STW,效率较高。
11.内存溢出
11.1例如map[string]*ObjectA在某个[]*ObjectB中,一直未被回收
map[string]*ObjectA = nil即可避免未被回收内存的问题.
11.2新的error类型
创建一个新的类型
type ErrNegativeSqrt float64
并为其实现
func (e ErrNegativeSqrt) Error() string在 Error 方法内调用 fmt.Sprint(e) 会让程序陷入死循环。可以通过先转换 e 来避免这个问题:fmt.Sprint(float64(e))。
在 Error 方法内调用 fmt.Sprint(e) 会让程序陷入死循环。可以通过先转换 e 来避免这个问题:fmt.Sprint(float64(e))。
12.内存泄漏
12.1substring导致的内存泄漏
var s0 string // a package-level variable
// A demo purpose function.
func f(s1 string) {
s0 = s1[:50]
// Now, s0 shares the same underlying memory block
// with s1. Although s1 is not alive now, but s0
// is still alive, so the memory block they share
// couldn't be collected, though there are only 50
// bytes used in the block and all other bytes in
// the block become unavailable.
}
func demo() {
s := createStringWithLengthOnHeap(1 << 20) // 1M bytes
f(s)
}
解决方案:
func f(s1 string) {
s0 = string([]byte(s1[:50]))
}
func f(s1 string) {
s0 = (" " + s1[:50])[1:]
}
import "strings"
func f(s1 string) {
var b strings.Builder
b.Grow(50)
b.WriteString(s1[:50])
s0 = b.String()
}
第三种方法的缺点是有点冗长。好消息是从go1.12开始我们可以调用带值为1的count参数的strings.Repeat函数来克隆一个字符串。从go1.12开始,strings.Repeat函数的底层实现将使用strings.Builder,以避免不必要的副本。
12.2子切片引起的内存泄漏
var s0 []int
func g(s1 []int) {
// Assume the length of s1 is much larger than 30.
s0 = s1[len(s1)-30:]
}
如果我们想避免那种内存泄漏,我们必须复制s0的30个元素,这样s0的活性不会阻止收集s1元素的内存块。
func g(s1 []int) {
s0 = append([]int(nil), s1[len(s1)-30:]...)
// Now, the memory block hosting the elements
// of s1 can be collected if no other values
// are referencing the memory block.
}
12.3不重置丢失的切片元素中的指针引起的内存泄漏
在下面的代码中,在调用h函数之后,为切片s的第一个和最后一个元素分配的内存块将丢失。
func h() []*int {
s := []*int{new(int), new(int), new(int), new(int)}
// do something with s ...
return s[1:3:3]
}
只要返回的切片仍然存在,它将阻止s的任何元素被收集,这样可以防止为s的第一个和最后一个元素引用的两个int值分配的两个内存块被收集。
如果我们想避免这种类型的内存泄漏,我们必须重置存储在丢失元素中的指针。
func h() []*int {
s := []*int{new(int), new(int), new(int), new(int)}
// do something with s ...
// Reset pointer values.
s[0], s[len(s)-1] = nil, nil
return s[1:3:3]
}
12.4被卡住的goroutine引起的内存泄漏
有时候,Go程序中的一些goroutines可能永远处于阻塞状态。这种goroutines被称为卡住的goroutines。Go运行时不会杀死挂起的goroutine,因此为挂起的goroutine分配的资源(以及引用的内存块)永远不会被垃圾收集。
Go运行时不会杀死挂goroutines有两个原因。一个是有时Go运行时很难判断阻塞goroutine是否会被永久阻止。另一个有时候我们故意让goroutine悬挂。例如,有时我们可能会让Go程序的主要goroutine挂起以避免程序退出。
12.5不在使用的但是没有stop的time.Ticker导致的内存泄漏
当time.Timer值不再使用时,它将在一段时间后被垃圾收集。但是对于一个时间来说这不是真的.Ticker值。我们应该停止一个时间。不再使用时的标签值。
12.6不正确使用终结器导致的内存泄漏
为作为循环引用组的成员的值设置终结器可以防止收集为循环引用组分配的所有存储器块。这是真正的内存泄漏。
例如,在调用以下函数并退出之后,为x和y分配的内存块不能保证在将来的垃圾收集中被垃圾收集。
func memoryLeaking() {
type T struct {
v [1<<20]int
t *T
}
var finalizer = func(t *T) {
fmt.Println("finalizer called")
}
var x, y T
// The SetFinalizer call makes x escape to heap.
runtime.SetFinalizer(&x, finalizer)
// The following line forms a cyclic reference
// group with two members, x and y.
// This causes x and y are not collectable.
x.t, y.t = &y, &x // y also escapes to heap.
}
因此,请避免为循环引用组中的值设置终结器。
顺便说一句,我们不应该使用终结器作为对象析构函数。
13.Channel
首先,我们先复习一下Channel都有哪些特性?
给一个 nil channel 发送数据,造成永远阻塞
从一个 nil channel 接收数据,造成永远阻塞
给一个已经关闭的 channel 发送数据,引起 panic
从一个已经关闭的 channel 接收数据,如果缓冲区中为空,则返回一个零值
无缓冲的channel是同步的,而有缓冲的channel是非同步的
以上5个特性是死东西,也可以通过口诀来记忆:
“空读写阻塞,写关闭异常,读关闭空零”。
package main
import (
"fmt"
"sync"
"time"
)
func main() {
RightExample()
ErrorExample()
}
func ErrorExample() {
fmt.Println("ErrorExample")
ch := make(chan int, 1000)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
go func() {
for {
a, ok := <-ch
if !ok {
fmt.Println("close")
return
}
fmt.Println("a: ", a)
}
}()
close(ch)
fmt.Println("ok")
time.Sleep(time.Second * 100)
}
var wg sync.WaitGroup = sync.WaitGroup{}
func RightExample() {
fmt.Println("RightExample")
ch := make(chan int, 1000)
wg.Add(10)
go func() {
for i := 0; i < 10; i++ {
ch <- i
}
}()
go func() {
for {
a, ok := <-ch
if !ok {
fmt.Println("close")
return
}
fmt.Println("a: ", a)
wg.Done()
}
}()
wg.Wait()
close(ch)
fmt.Println("ok")
}
14.Cond
1.简介
sync.Cond 是基于互斥锁/读写锁实现的条件变量,用来协调想要访问共享资源的那些 Goroutine。当共享资源状态发生变化时,sync.Cond 可以用来通知等待条件发生而阻塞的 Goroutine。
sync.Cond 基于互斥锁/读写锁,那它和互斥锁有什么区别呢?
互斥锁 sync.Mutex 通常用来保护共享的临界资源,条件变量 sync.Cond 用来协调想要访问共享资源的 Goroutine。当共享资源的状态发生变化时,sync.Cond 可以用来通知被阻塞的 Goroutine。
2.使用场景
sync.Cond 经常用在多个 Goroutine 等待,一个 Goroutine 通知(事件发生)的场景。如果是一个通知,一个等待,使用互斥锁或 channel 就能搞定了。
我们想象一个非常简单的场景:
有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据。在这种情况下,如果单纯使用 chan 或互斥锁,那么只能有一个协程可以等待,并读取到数据,没办法通知其他的协程也读取数据。
这个时候,就需要有个全局的变量来标志第一个协程数据是否接受完毕,剩下的协程,反复检查该变量的值,直到满足要求。或者创建多个 channel,每个协程阻塞在一个 channel 上,由接收数据的协程在数据接收完毕后,逐个通知。总之,需要额外的复杂度来完成这件事。
Go 语言在标准库 sync 中内置一个 sync.Cond 用来解决这类问题。
package main
import (
"fmt"
"sync"
)
func main() {
var mu sync.Mutex
// 创建 cond
cond := sync.NewCond(&mu)
// 计数
var count uint64
// 报名表
var stuSlice []int
// 模拟学生报名参加课外活动
for i := 0; i < 30; i++ {
go func(i int) {
cond.L.Lock()
stuSlice = append(stuSlice, i)
count++
cond.L.Unlock()
// Broadcast 唤醒所有等待此 cond 的 goroutine, Signal 只唤醒一个
cond.Broadcast()
}(i)
}
// 调用 Wait方法前, 调用者必须持有锁
cond.L.Lock()
for count != 30 {
// 调用者被阻塞,并被放入 cond 的等待队列中
cond.Wait()
}
cond.L.Unlock()
fmt.Println(len(stuSlice), stuSlice)
}