一篇文章带你吃透Go语言的Atomic和Channel--实战方法

news2024/11/13 17:25:30

一篇文章带你吃透Go语言的Atomic和Channel–实战方法

在这里插入图片描述

Atomic 要保证原子操作,一定要使用这几种方法

我们在学习 MutexRWMutex 等并发原语的实现时,你可以看到,最底层是通过 atomic 包中的一些原子操作来实现的

你可能会说,这些并发原语已经可以应对大多数的并发场景了,为啥还要学习原子操作呢?其实,这是因为,在很多场景中,使用并发原语实现起来比较复杂,而原子操作可以帮助我们更轻松地实现底层的优化。
之所以叫原子操作,是因为一个原子在执行的时候,其它线程不会看到执行一半的操作结果。在其它线程看来,原子操作要么执行完了,要么还没有执行,就像一个最小的粒子 - 原子一样,不可分割。

原子操作的基础知识

CPU 提供了基础的原子操作,不过,不同架构的系统的原子操作是不一样的。
对于单处理器单核系统来说,如果一个操作是由一个 CPU 指令来实现的,那么它就是原子操作,比如它的 XCHG 和 INC 等指令。如果操作是基于多条指令来实现的,那么,执行的
过程中可能会被中断,并执行上下文切换,这样的话,原子性的保证就被打破了,因为这个时候,操作可能只执行了一半

在多处理器多核系统中,原子操作的实现就比较复杂了。

由于 cache 的存在,单个核上的单个指令进行原子操作的时候,你要确保其它处理器或者核不访问此原子操作的地址,或者是确保其它处理器或者核总是访问原子操作之后的最新
的值。x86 架构中提供了指令前缀 LOCKLOCK 保证了指令(比如 LOCK CMPXCHG op1、op2)不会受其它处理器或 CPU 核的影响,有些指令(比如 XCHG)本身就提供
Lock 的机制。不同的 CPU 架构提供的原子操作指令的方式也是不同的,比如对于多核的 MIPSARM,提供了 LL/SC(Load Link/Store Conditional)指令,可以帮助实现原子
操作(ARMLL/SC 指令 LDREXSTREX)。

因为不同的 CPU 架构甚至不同的版本提供的原子操作的指令是不同的,所以,要用一种编程语言实现支持不同架构的原子操作是相当有难度的。不过,还好这些都不需要你操心,
因为 Go 提供了一个通用的原子操作的 API,将更底层的不同的架构下的实现封装成 atomic

关于 atomic,还有一个地方你一定要记住,atomic 操作的对象是一个地址,你需要把可寻址的变量的地址作为参数传递给方法,而不是把变量的值传递给方法。

基本方法

  • Add 方法就是给第一个参数地址中的值增加一个 delta 值 可以是负值
  • CAS 这个方法会比较当前 addr 地址里的值是不是 old,如果不等于 old,就返回 false;如果等于 old,就把此地址的值替换成 new 值,返回 true。这就相当于“判断相等才替换”。
  • Swap 直接交换值
  • Load 方法会取出 addr 地址中的值
  • Store 方法会把一个值存入到指定的 addr 地址中,即使在多处理器、多核、有 CPU cache 的情况下,这个操作也能保证 Store 是一个原子操作。
  • Value 类型 它可以原子地存取对象类型,但也只能存取,不能 CASSwap,常常用在配置变更等场景中。
type Config struct {
	NodeName string
	Addr     string
	Count    int32
}

func loadNewConfig() Config {
	return Config{NodeName: "北京", Addr: "10.77.95.27", Count: rand.Int31()}
}
func main() {
	var config atomic.Value
	config.Store(loadNewConfig())
	var cond = sync.NewCond(&sync.Mutex{}) // 设置新的config
	go func() {
		for {
			time.Sleep(time.Duration(5+rand.Int63n(5)) * time.Second)
			config.Store(loadNewConfig())
			cond.Broadcast() // 通知等待着配置已变更
		}
	}()
	go func() {
		for {
			cond.L.Lock()
			cond.Wait()                 // 等待变更信号
			c := config.Load().(Config) // 读取新的配置
			fmt.Printf("new config: %+v\n", c)
			cond.L.Unlock()
		}
	}()
	select {}
}

第三方库的扩展

  • uber go/atomic,它定义和封装了几种与常见类型相对应的原子操作类型,这些类型提供了原子操作的方法。这些类型包括 Bool、Duration、Error、Float64、Int32、Int64、String、Uint32、Uint64 等。

atomic 原子操作的应用场景

举个例子:假设你想在程序中使用一个标志(flag,比如一个 bool 类型的变量),来标识一个定时任务是否已经启动执行了,你会怎么做呢?

我们先来看看加锁的方法。如果使用 MutexRWMutex,在读取和设置这个标志的时候加锁,是可以做到互斥的、保证同一时刻只有一个定时任务在执行的,所以使用 Mutex 或者 RWMutex 是一种解决方案。

其实,这个场景中的问题不涉及到对资源复杂的竞争逻辑,只是会并发地读写这个标志,这类场景就适合使用 atomic 的原子操作。具体怎么做呢?

你可以使用一个 uint32 类型的变量,如果这个变量的值是 0,就标识没有任务在执行,如果它的值是 1,就标识已经有任务在完成了。你看,是不是很简单呢?
再来看一个例子。假设你在开发应用程序的时候,需要从配置服务器中读取一个节点的配置信息。而且,在这个节点的配置发生变更的时候,你需要重新从配置服务器中拉取一份
新的配置并更新。你的程序中可能有多个 goroutine 都依赖这份配置,涉及到对这个配置对象的并发读写,你可以使用读写锁实现对配置对象的保护。在大部分情况下,你也可以
利用 atomic 实现配置对象的更新和加载。

使用 atomic 实现 Lock-Free queue


// lock-free的queue
type LKQueue struct {
	head unsafe.Pointer
	tail unsafe.Pointer
} 
// 通过链表实现,这个数据结构代表链表中的节点
type node struct {
	value interface{}
	next  unsafe.Pointer
}

func NewLKQueue() *LKQueue {
	n := unsafe.Pointer(&node{})
	return &LKQueue{head: n, tail: n}
} // 入队
func (q *LKQueue) Enqueue(v interface{}) {
	n := &node{value: v}
	for {
		tail := load(&q.tail)
		next := load(&tail.next)
		if tail == load(&q.tail) { // 尾还是尾
			if next == nil { // 还没有新数据入队
				if cas(&tail.next, next, n) { //增加到队尾
					cas(&q.tail, tail, n) //入队成功,移动尾巴指针
					return
				}
			} else { // 已有新数据加到队列后面,需要移动尾指针
				cas(&q.tail, tail, next)
			}
		}
	}
}

// 出队,没有元素则返回nil
func (q *LKQueue) Dequeue() interface{} {
	for {
		head := load(&q.head)
		tail := load(&q.tail)
		next := load(&head.next)
		if head == load(&q.head) { // head还是那个head
			if head == tail { // head和tail一样
				if next == nil { // 说明是空队列
					return nil
				} // 只是尾指针还没有调整,尝试调整它指向下一个
				cas(&q.tail, tail, next)
			} else { // 读取出队的数据
				v := next.value // 既然要出队了,头指针移动到下一个
				if cas(&q.head, head, next) {
					return v // Dequeue is done. return
				}
			}
		}
	}
}

// 将unsafe.Pointer原子加载转换成node
func load(p *unsafe.Pointer) (n *node) { return (*node)(atomic.LoadPointer(p)) }

// 封装CAS,避免直接将*node转换成unsafe.Pointer
func cas(p *unsafe.Pointer, old, new *node) (ok bool) {
	return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}

Channel:另辟蹊径,解决并发问题

应用场景

  • 数据交流:当作并发的 buffer 或者 queue,解决生产者 - 消费者问题。多个 goroutine 可以并发当作生产者(Producer)和消费者(Consumer)。
  • 数据传递:一个 goroutine 将数据交给另一个 goroutine,相当于把数据的拥有权 (引用) 托付出去。
  • 信号通知:一个 goroutine 可以将信号 (closing、closed、data ready 等) 传递给另一个或者另一组 goroutine
  • 任务编排:可以让一组 goroutine 按照一定的顺序并发或者串行的执行,这就是编排的功能。
  • 锁:利用 Channel 也可以实现互斥锁的机制。

基本了解

通过 make,我们可以初始化一个 chan,未初始化的 chan 的零值是 nil。你可以设置它的容量,比如下面的 chan 的容量是 9527,我们把这样的 chan 叫做 buffered chan;如果
没有设置,它的容量是 0,我们把这样的 chan 叫做 unbuffered chan

如果 chan 中还有数据,那么,从这个 chan 接收数据的时候就不会阻塞,如果 chan 还未满(“满”指达到其容量),给它发送数据也不会阻塞,否则就会阻塞。unbuffered chan 只有读写都准备好之后才不会阻塞,
这也是很多使用 unbuffered chan 时的常见 Bug

  • chan 接收是有两个值的,第一个是通道中的值,第二个表示是否正常接收到那些值,如果第二个值为 falsechan 已经被被 close 而且 chan 中没有缓存的数据了
  • 发送和接收 都可用于 selectcase 语句
  • chan 还可以用作于 for-range 中,可以使用取出的值,也可以什么都不作 用于清空数据

使用 Channel 容易犯的错误

  1. closenilchan
  2. send 已经 closechan
  3. close 已经 closechan

什么时候选择使用 Channel

  1. 共享资源的并发访问使用传统并发原语;
  2. 复杂的任务编排和消息传递使用 Channel
  3. 消息通知机制使用 Channel,除非只想 signal 一个 goroutine,才使用 Cond
  4. 简单等待所有任务的完成用 WaitGroup,也有 Channel 的推崇者用 Channel,都可以;
  5. 需要和 Select 语句结合,使用 Channel
  6. 需要和超时配合时,使用 ChannelContext

Channel:透过代码看典型的应用模式

使用反射操作 Channel

select 语句可以处理 chansendrecv,可是,如果要处理 100chan 呢?一万个 chan 呢?

通过 reflect.Select 函数,你可以将一组运行时的 case clause 传入,当作参数执行。Goselect 是伪随机的,它可以在执行的 case 中随机选择一个 case,并把选择的这个 case
的索引(chosen)返回,如果没有可用的 case 返回,会返回一个 bool 类型的返回值,这个返回值用来表示是否有 case 成功被选择。如果是 recv case,还会返回接收的元素。

func main() {
	var ch1 = make(chan int, 10)
	var ch2 = make(chan int, 10)      // 创建SelectCase
	var cases = createCases(ch1, ch2) // 执行10次select
	for i := 0; i < 10; i++ {
		chosen, recv, ok := reflect.Select(cases)
		if recv.IsValid() { // recv case
			fmt.Println("recv:", cases[chosen].Dir, recv, ok)
		} else { // send case
			fmt.Println("send:", cases[chosen].Dir, ok)
		}
	}
}
func createCases(chs ...chan int) []reflect.SelectCase {
	var cases []reflect.SelectCase // 创建recv case
	for _, ch := range chs {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	} // 创建send case
	for i, ch := range chs {
		v := reflect.ValueOf(i)
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectSend, Chan: reflect.ValueOf(ch), Send: v})
	}
	return cases
}

典型的应用场景

消息交流

chan 的内部实现看,它是以一个循环队列的方式存放数据,所以,它有时候也会被当成线程安全的队列和 buffer 使用。一个 goroutine 可以安全地往 Channel 中塞数据,另
外一个 goroutine 可以安全地从 Channel 中读取数据,goroutine 就可以安全地实现信息交流了。

第一个例子是 worker 池的例子。他们将用户的请求放在一个 chan Job 中,这个 chan Job 就相当于一个待处理任务队列。除此之外,还有一个 chan chan Job
队列,用来存放可以处理任务的 worker 的缓存队列。 dispatcher 会把待处理任务队列中的任务放到一个可用的缓存队列中,worker 会一直处理它的缓存队列。通过使用 Channel,实现了一个 worker 池的任务处理中心,并且解耦
了前端 HTTP 请求处理和后端任务处理的逻辑。

数据传递

有 4 个 goroutine,编号为 1、2、3、4。每秒钟会有一个 goroutine 打印出它自己的编号,要求你编写程序,让输出的编号总是按照 1、2、3、4、1、2、3、4……这个顺序打印出来。

type Token struct{}

func newWorker(id int, ch chan Token, nextCh chan Token) {
	for {
		token := <-ch         // 取得令牌
		fmt.Println((id + 1)) // id从1开始

		time.Sleep(time.Second)
		nextCh <- token
	}
}
func main() {
	chs := []chan Token{make(chan Token), make(chan Token), make(chan Token), make(chan Token)} // 创建4个worker
	for i := 0; i < 4; i++ {
		go newWorker(i, chs[i], chs[(i+1)%4])
	}

	//首先把令牌交给第一个worker
	chs[0] <- struct{}{}
	select {}
}

这类场景有一个特点,就是当前持有数据的 goroutine 都有一个信箱,信箱使用 chan 实现,goroutine 只需要关注自己的信箱中的数据,处理完毕后,就把结果发送到下一家的信箱中。

信号通知

chan 类型有这样一个特点:chan 如果为空,那么,receiver 接收数据的时候就会阻塞等待,直到 chan 被关闭或者有新的数据到来。利用这个机制,我们可以实现 wait/notify 的设计模式。

除了正常的业务处理时的 wait/notify,我们经常碰到的一个场景,就是程序关闭的时候,我们需要在退出之前做一些清理(doCleanup 方法)的动作。这个时候,我们经常要使用 chan

比如,使用 chan 实现程序的 graceful shutdown,在退出之前执行一些连接关闭、文件 close、缓存落盘等一些动作。

func main() {
    go func() {
        ...... // 执行业务处理
    }()                                                                  // 处理CTRL+C等中断信号
    termChan := make(chan os.Signal)
    signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <- termChan // 执行退出之前的清理动作
    doCleanup()
    fmt.Println("优雅退出")
}

有时候,doCleanup 可能是一个很耗时的操作,比如十几分钟才能完成,如果程序退出需要等待这么长时间,用户是不能接受的,所以,在实践中,我们需要设置一个最长的等待
时间。只要超过了这个时间,程序就不再等待,可以直接退出。所以,退出的时候分为两个阶段:

  1. closing,代表程序退出,但是清理工作还没做;
  2. closed,代表清理工作已经做完。
func main() {
	var closing = make(chan struct{})
	var closed = make(chan struct{})
	go func() { // 模拟业务处理
		for {
			select {
			case <-closing:
				return
			default: // ....... 业务计算
				time.Sleep(100 * time.Millisecond)
			}
		}
	}()            // 处理CTRL+C等中断信号
	termChan := make(chan os.Signal)
	signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) <- termChan
	close(closing) // 执行退出之前的清理动作
	go doCleanup(closed)
	select {
	case <-closed:
	case <-time.After(time.Second):
		fmt.Println("清理超时,不等了")
	}
	fmt.Println("优雅退出")
}
func doCleanup(closed chan struct{}) { time.Sleep((time.Minute))
	close(closed)

使用 chan 也可以实现互斥锁。

要想使用 chan 实现互斥锁,至少有两种方式。一种方式是先初始化一个 capacity 等于 1 的 Channel,然后再放入一个元素。这个元素就代表锁,谁取得了这个元素,就相当于获
取了这把锁。另一种方式是,先初始化一个 capacity 等于 1 的 Channel,它的“空槽”代表锁,谁能成功地把元素发送到这个 Channel,谁就获取了这把锁。

这是使用 Channel 实现锁的两种不同实现方式,我重点介绍下第一种。理解了这种实现方式,第二种方式也就很容易掌握了,我就不多说了。


// 使用chan实现互斥锁
type Mutex struct {
	ch chan struct{}
}

// 使用锁需要初始化
func NewMutex() *Mutex {
	mu := &Mutex{make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

// 请求锁,直到获取到
func (m *Mutex) Lock() {
	<-m.ch
}

// 解锁
func (m *Mutex) Unlock() {
	select {
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
} // 尝试获取锁
func (m *Mutex) TryLock() bool {
	select {
	case <-m.ch:
		return true
	default:
	}
	return false
} // 加入一个超时的设置
func (m *Mutex) LockTimeout(timeout time.Duration) bool {
	timer := time.NewTimer(timeout)
	select {
	case <-m.ch:
		timer.Stop()
		return true
	case <-timer.C:
	}
	return false
}

// 锁是否已被持有
func (m *Mutex) IsLocked() bool {
	return len(m.ch) == 0
}
func main() {
	m := NewMutex()
	ok := m.TryLock()
	fmt.Printf("locked v %v\n", ok)
	ok = m.TryLock()
	fmt.Printf("locked %v\n", ok)
}

你可以用 buffer 等于 1 的 chan 实现互斥锁,在初始化这个锁的时候往 Channel 中先塞入一个元素,谁把这个元素取走,谁就获取了这把锁,把元素放回去,就是释放了锁。元
素在放回到 chan 之前,不会有 goroutine 能从 chan 中取出元素的,这就保证了互斥性。

在这段代码中,还有一点需要我们注意下:利用 select+chan 的方式,很容易实现 TryLockTimeout 的功能。具体来说就是,在 select 语句中,我们可以使用 default
TryLock,使用一个 Timer 来实现 Timeout 的功能。

任务编排

我们学习了 WaitGroup,我们可以利用它实现等待模式:启动一组 goroutine 执行任务,然后等待这些任务都完成。其实,我们也可以使用 chan 实现
WaitGroup 的功能。我来重点介绍下多个 chan 的编排方式,总共 5 种,分别是 Or-Done 模式扇入模式扇出模式StreamMap-Reduce

Or-Done 模式

我们会使用“信号通知”实现某个任务执行完成后的通知机制,在实现时,我们为这个任务定义一个类型为 chan struct{} 类型的 done 变量,等任务结束后,我们就可以 close
个变量,然后,其它 receiver 就会收到这个通知。这是有一个任务的情况,如果有多个任务,只要有任意一个任务执行完,我们就想获得这个信号,这就是 Or-Done 模式。

func or(channels ...<-chan interface{}) <-chan interface{} { // 特殊情况,只有零个或者1个chan
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}
	orDone := make(chan interface{})
	go func() {
		defer close(orDone)
		switch len(channels) {
		case 2: // 2个也是一种特殊情况
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default: //超过两个,二分法递归处理
			m := len(channels) / 2
			select {
			case <-or(channels[:m]...):
			case <-or(channels[m:]...):
			}
		}
	}()
	return orDone
}

func sig(after time.Duration) <-chan interface{} {
	c := make(chan interface{})
	go func() {
		defer close(c)
		time.Sleep(after)
	}()
	return c
}
func main() {
	start := time.Now()
	<-or(sig(10*time.Second), sig(20*time.Second), sig(30*time.Second), sig(40*time.Second), sig(50*time.Second), sig(01*time.Minute))
	fmt.Printf("done after %v", time.Since(start))
}

这里的实现使用了一个巧妙的方式,当 chan 的数量大于 2 时,使用递归的方式等待信号。

扇入模式

扇入借鉴了数字电路的概念,它定义了单个逻辑门能够接受的数字信号输入最大量的术语。一个逻辑门可以有多个输入,一个输出。
在软件工程中,模块的扇入是指有多少个上级模块调用它。而对于我们这里的 Channel 扇入模式来说,就是指有多个源 Channel 输入、一个目的 Channel 输出的情况。扇入比就
是源 Channel 数量比 1。 每个源 Channel 的元素都会发送给目标 Channel,相当于目标 Channelreceiver 只需要监听目标 Channel,就可以接收所有发送给源 Channel 的数据。

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out) // 构造SelectCase slice
		var cases []reflect.SelectCase
		for _, c := range chans {
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c)})
		} // 循环,从cases中选择一个可用的
		for len(cases) > 0 {
			i, v, ok := reflect.Select(cases)
			if !ok { // 此channel已经close
				cases = append(cases[:i], cases[i+1:]...)
				continue
			}
			out <- v.Interface()
		}
	}()
	return out
}
扇出模式

有扇入模式,就有扇出模式,扇出模式是和扇入模式相反的。
扇出模式只有一个输入源 Channel,有多个目标 Channel,扇出比就是 1 比目标 Channel 数的值,经常用在设计模式中的观察者模式中(观察者设计模式定义了对象间的一种一
对多的组合关系。这样一来,一个对象的状态发生变化时,所有依赖于它的对象都会得到通知并自动刷新)。在观察者模式中,数据变动后,多个观察者都会收到这个变更信号。
下面是一个扇出模式的实现。从源 Channel 取出一个数据后,依次发送给目标 Channel
在发送给目标 Channel 的时候,可以同步发送,也可以异步发送:

func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
	go func() {
		defer func() { //退出时关闭所有的输出chan 
			for i := 0; i < len(out); i++ {
				close(out[i])
			}
		}()
		for v := range ch { // 从输入chan中读取数据
			v := v
			for i := 0; i < len(out); i++ {
				i := i
				if async { //异步
					go func() {
						out[i] <- v // 放入到输出chan中,异步方式
					}()
				} else {
					out[i] <- v // 放入到输出chan中,同步方式
				}
			}
		}
	}()
}
Stream

这里我来介绍一种把 Channel 当作流式管道使用的方式,也就是把 Channel 看作流 (Stream),提供跳过几个元素,或者是只取其中的几个元素等方法

func asStream(done <-chan struct{}, values ...interface{}) <-chan interface{} {
    s := make(chan interface{}) //创建一个unbuffered的channel
    go func() {                 // 启动一个goroutine,往s中塞数据
        defer close(s)             // 退出时关闭chan
        for _, v := range values { // 遍历数组
            select {
                case <-done:
                    return
                case s <- v: // 将数组元素塞入到chan中
            }
        }
    }()
    return s
}

流创建好以后,该咋处理呢?下面我再给你介绍下实现流的方法。

  1. takeN:只取流中的前 n 个数据;
  2. takeFn:筛选流中的数据,只保留满足条件的数据;
  3. takeWhile:只取前面满足条件的数据,一旦不满足条件,就不再取;
  4. skipN:跳过流中前几个数据;
  5. skipFn:跳过满足条件的数据;
  6. skipWhile:跳过前面满足条件的数据,一旦不满足条件,当前这个元素和以后的元素都
    会输出给 Channelreceiver
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	takeStream := make(chan interface{}) // 创建输出流
	go func() {
		defer close(takeStream)
		for i := 0; i < num; i++ { // 只读取前num个元素
			select {
			case <-done:
				return
			case takeStream <- <-valueStream: //从输入流中读取元素
			}
		}
	}()
	return takeStream
}
Map-Reduce

单机单进程的 map-reduce 方法。map-reduce 分为两个步骤,第一步是映射(map),处理队列中的数据,第二步是规约 (reduce),把列表中的每一个元素按照一定的处理方式处理成结果,放入到结果队列中。
就像做汉堡一样,map 就是单独处理每一种食材,reduce 就是从每一份食材中取一部分,做成一个汉堡。

func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{}) //创建一个输出chan
	if in == nil {                // 异常检查
		close(out)
		return out
	}
	go func() { // 启动一个goroutine,实现map的主要逻辑
		defer close(out)
		for v := range in { // 从输入chan读取数据,执行业务操作,也就是map操作
			out <- fn(v)
		}
	}()
	return out
}

func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
	if in == nil { // 异常检查
		return nil
	}
	out := <-in         // 先读取第一个元素
	for v := range in { // 实现reduce的主要逻辑
		out = fn(out, v)
	}
	return out
}

// 生成一个数据流
func asStream(done <-chan struct{}) <-chan interface{} {
	s := make(chan interface{})
	values := []int{1, 2, 3, 4, 5}
	go func() {
		defer close(s)
		for _, v := range values { // 从数组生成
			select {
			case <-done:
				return
			case s <- v:
			}
		}
	}()
	return s
}
func main() {
	in := asStream(nil)                                              // map操作: 乘以10
	mapFn := func(v interface{}) interface{} { return v.(int) * 10 } // reduce操作: 对map的结果进行累加

	reduceFn := func(r, v interface{}) interface{} { return r.(int) + v.(int) }
	sum := reduce(mapChan(in, mapFn), reduceFn) //返回累加结果
	fmt.Println(sum)
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728353.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言中级篇请看另一篇文章,这一篇文章只写给高手看【高阶C语言】【更新中】【原创】

文章目录 前言define和typedef的区别?前言 关于C语言,博主已经写了不少的笔记总结了,C语言基础版可看我的专栏里面的C语言精华,C语言进阶版可看我的谭浩强C语言第五版,C语言高阶版看此篇文章即可 C Primer Plus书籍,C语言精华,截图 + 代码 + 学习总结笔记【11000字】【…

AOCVSBOCV、AOCV table

文章目录 AOCV&#xff08;Advanced OCV&#xff09;&SBOCV&#xff08;Stage Based OCV&#xff09;---更精确&#xff0c;剔除悲观度Random variation&#xff08;Depth/Stage based AOCV&#xff09;Systematic variation&#xff08;Distance based AOCV&#xff09;一…

阻塞队列是什么

1、阻塞队列是什么? (1) 栈与队列 1&#xff09;栈&#xff1a;先进后出&#xff0c;后进先出 2&#xff09;队列&#xff1a;先进先出 (2) 阻塞队列 阻塞&#xff1a;必须要阻塞/不得不阻塞 阻塞队列是一个队列&#xff0c;在数据结构中起的作用如下图&#xff1a; 线程1…

直播美颜工具与实时美颜SDK开发指南

近年来&#xff0c;随着直播行业的蓬勃发展&#xff0c;越来越多的用户开始关注直播内容的质量。其中&#xff0c;美颜功能成为直播平台上不可或缺的一项特色功能。下文小编将从基础原理到实际应用&#xff0c;帮助开发者更好地实现高效又自然的美颜效果。 一、背景 在直播过…

spring之ApplicationContext

spring之ApplicationContext ApplicationContextApplicationContext源码ApplicationContext继承接口分析ApplicationContext两个比较重要的实现类AnnotationConfigApplicationContextClassPathXmlApplicationContext 国际化---MessageSource资源加载---ResourceLoader获取运行时…

多元回归预测 | Matlab鲸鱼算法(WOA)优化极限梯度提升树XGBoost回归预测,WOA-XGBoost回归预测模型,多变量输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 多元回归预测 | Matlab鲸鱼算法(WOA)优化极限梯度提升树XGBoost回归预测,WOA-XGBoost回归预测模型,多变量输入模型 评价指标包括:MAE、RMSE和R2等,代码质量极高,方便学习和替换数据。要求2018版本及以上。 部分源…

css实现九宫格有边框,最外层四周无边框

1.先设置9个div&#xff0c;如下&#xff1a; <div class"wrapper"><div class"cell"></div><div class"cell"></div><div class"cell"></div><div class"cell"></div&…

【MySQL】连接 MySQL使用二进制方式连接和脚本连接,修改密码,增加新用户,显示命令

作者简介&#xff1a; 辭七七&#xff0c;目前大一&#xff0c;正在学习C/C&#xff0c;Java&#xff0c;Python等 作者主页&#xff1a; 七七的个人主页 文章收录专栏&#xff1a; 七七的闲谈 欢迎大家点赞 &#x1f44d; 收藏 ⭐ 加关注哦&#xff01;&#x1f496;&#x1f…

RocketMQ5.0--消息发送

RocketMQ5.0–消息发送 一、消息 // 消息所属topic private String topic; // 消息Flag&#xff08;RocketMQ不作处理&#xff09;&#xff0c;即&#xff1a;用户处理 private int flag; // 扩展属性 private Map<String, String> properties; // 消息体 private byte…

Pandas+Pyecharts | 北京近五年历史天气数据可视化

文章目录 &#x1f3f3;️‍&#x1f308; 1. 导入模块&#x1f3f3;️‍&#x1f308; 2. Pandas数据处理2.1 读取数据2.2 处理最低气温最高气温数据2.3 处理日期数据2.4 处理风力风向数据 &#x1f3f3;️‍&#x1f308; 3. Pyecharts数据可视化3.1 2018-2022年历史温度分布…

漏洞复现 || H3C iMC 存在远程命令执行

免责声明 技术文章仅供参考,任何个人和组织使用网络应当遵守宪法法律,遵守公共秩序,尊重社会公德,不得利用网络从事危害国家安全、荣誉和利益,未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失,均由使…

软件的验收测试应该怎么正确实施?

验收测试的主要目的是为了确定软件系统是否满足客户或最终用户的需求和期望&#xff0c;同时确保软件产品的质量标准达到预期。验收测试还可以提供客户和最终用户关于软件系统质量的反馈和建议&#xff0c;以便软件开发团队能够更好地改进和优化软件产品&#xff0c;那软件的验…

【QT】QtXlsx安装使用

QtXlsx库 QtXlsx介绍QtXlsx Qt配置简单使用示例 QtXlsx介绍 QtXlsx是一个可以读取和写入Excel文件的库。它不需要Microsoft Excel&#xff0c;可以在Qt5支持的任何平台上使用。 这里一定是需要QT5支持的。 生成一个新的 .xlsx 文件从现有的 .xlsx 文件中提取数据编辑现有的 .x…

Linux常用指令(下)

目录 一&#xff1a;Linux基础指令 查看联机手册 文本查看相关 时间相关 查找相关 打包和压缩相关 查看Linux版本和体系 其它指令和热键 二&#xff1a;重定向 输入重定向 输出重定向 三&#xff1a;管道 一&#xff1a;Linux基础指令 查看联机手册 Linux的命令有…

ADS笔记,新旧两组仿真数据进行绘图和列表对比

做个笔记&#xff0c;以防遗忘 ADS版本&#xff1a;2023 原理图器件参数的不同&#xff0c;怎么进行对比观看&#xff0c;操作如下 目录 一、数据绘图对比二、数据列表对比 一、数据绘图对比 选择Simulation Setting 然后修改原理图器件的参数&#xff0c;再次重复之前的操作…

SpringBoot2+Vue2实战(十三)用户前台页面设计与实现

Front.vue <template><div><!--头部--><div style"display: flex; height: 60px;line-height: 60px;border-bottom: 1px solid #ccc"><div style"width: 300px;display: flex;padding-left: 30px"><div style"widt…

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)

上回说到在我们搭好的YAF3环境上使用yaf处理pcap文件得到silk flow&#xff0c;再使用super mediator工具转为ipfix&#xff0c;继而在spark中导入mothra&#xff0c;就可以开始数据分析了。然而在我们粗粗一用之下&#xff0c;却发现DPI信息在ipfix文件中找不到&#xff0c;到…

【Excel】csv乱码

原因 CSV用UTF-8编码 Excel用ANSI编码 解决 1 创建一个新的Excel 2 数据 > 从文本/CSV 3 选择文件 4 选择 文件原始格式 和 分隔符 &#xff08;根据自己文件进行选择&#xff0c;如果不知道编码&#xff0c;可以一个一个的试&#xff0c;直到不出现乱码&#xff09;

【Go|第5期】Lorca无法正常运行的解决方案

日期&#xff1a;2023年7月5日 作者&#xff1a;Commas 签名&#xff1a;(ง •_•)ง 积跬步以致千里,积小流以成江海…… 注释&#xff1a;如果您觉得有所帮助&#xff0c;帮忙点个赞&#xff0c;也可以关注我&#xff0c;我们一起成长&#xff1b;如果有不对的地方&#xff…

奇怪的SQL问题+1

我的 VIP 用户又抛给我一个 SQL 问题&#xff0c;我很激动&#xff0c;因为素材又来了&#xff1a; 我一看&#xff0c;这个表没什么花头&#xff0c;不就是没设置主键吗&#xff0c;MySQL 会默认生成一个主键&#xff0c;这跟 delete 不掉数据好像也没啥关系。 然后他说&…