逐步学习Go-并发通道chan(channel)

news2024/11/16 0:03:51
概述

Go的Routines并发模型是基于CSP,如果你看过七周七并发,那么你应该了解。

什么是CSP?

"Communicating Sequential Processes"(CSP)这个词组的含义来自其英文直译以及在计算机科学中的使用环境。

CSP是 Tony Hoare 在1978年提出的,论文地址在:Communicating sequential processes | Communications of the ACM

拆字解释下

Communicating Sequential Processes(CSP)的三个单词:

  • C for Communicating: 通信,什么的通信那?进程/线程/协程的通信。

  • S for Sequenctial: 顺序的,什么的顺序?进程/线程/协程之间执行任务时应该是有顺序的,完全并行执行是理想化的,现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。

  • P for Processses: 进程,这个是进程,估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下,Processes指的是进程/线程/协程。

那么我们来总结一下CSP,CSP就是多个能够进行通信,并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候,还可以通过某种方式是进行通信。

在Golang中就是通过Channel进行通信。

好了,CSP解释完了,我们来看Go中的Channel,另外CSP的参与者Go Routine我在之前的文章中有提到过,大家可以去:逐步学习Go-协程goroutine

这张图就描述了CSP编程模型。

file

Go中routine代表图中的Process,Channel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。

Go中的channel

Go中Channel有两种类型:

  1. 无缓冲Channel(Unbuffered)
  2. 有缓冲Channel(Buffered)
    有缓冲的Channel其实就是一个环形缓冲队列;无缓冲的没有队列,因为读写都会阻塞。

Channel的定义

var channel名称 chan channel类型

// 类型自动推断
channel名称 := make(chan channel类型, buffer数量(int可以为0))

COPY

比如:我们可以这样来定义:

// 定义了一个channel,还没有make,不确定是否为有缓冲和无缓冲channel
var ch chan int

// 定义了一个chnnel, 容量为0,无缓冲channel
ch := make(chan int, 0)

// 定义了一个channel,容量为1,有缓冲channel
ch := make(chan int, 1)

COPY

我们实际使用的时候把Channel理解为队列就可以了。

Go中的Channel有两种类型:

  1. 无缓冲channel
  2. 有缓冲channel

无缓冲和有缓冲的特性如下:

  • 无缓冲Channel
    • 无缓冲Channel没有存储数据的能力
    • 发送方向Channel中发送数据的时候,发送方会阻塞直到有接受者接受这个数据
    • 无缓冲Channel典型应用就是go协程同步通信
    • 无缓冲Channel保证通信双方都要准备好数据交换
  • 有缓冲Channel
    • 有缓冲Channel需要定义Channel的容量
    • 发送方向有缓冲Channel发送数据的时候,只有容量满的时候才会阻塞
    • 接收方只有在有缓冲Channel为空时才会阻塞
    • 有缓冲通道的典型应用场景是生产者和消费者

Channel的操作

Channel主要支持2中操作:

  1. 发送(send)
  2. 接收(recv)

这三种操作在代码中的的定义和使用:

  1. 发送和接收都使用<-

来看代码:

// 先定义一个无缓冲channel
ch := make(chan int, 0)
ch := make(chan int)

// 发送数据到channel
ch <- 1

// 从channel中接收数据
<- ch

COPY

我们看到发送和接收都是使用<-,差别在于:

  1. ch在<-的左边,操作为发送
  2. ch在<-的右边,操作为接收

另外,channel在使用之前都要先创建,使用完毕后要关闭,分别使用makeclose关闭。

// 创建相当于分配channel(allocation)
ch := make(chan int, 0)

// 关闭channel,释放channel资源
defer close(ch)

COPY

channel创建完直接关闭了还能操作发送和接收吗?

这个问题我们通过写代码来测试,我们先来测试发送,然后再测试接收。

  • 发送数据到关闭的Channel

    func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {
    
    f := func() {
        ch := make(chan int)
        close(ch)
    
        ch <- 1
    }
    
    assert.Panics(t, f, "should panic")
    }
    COPY

    运行截图:

    file

我们的UT PASS了表示发生了panic,这就说明我们不能向已经关闭的channel发送数据。

  • 在已经关闭的Channel上接收

func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {
    ch := make(chan int)
    close(ch)
    var val = <-ch
    assert.Equal(t, 0, val)
}

func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {
    ch := make(chan string)
    close(ch)
    var val = <-ch
    assert.Equal(t, "", val)
}

COPY

运行截图:

file

这两个UT都可以PASS,我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据,只是接收到的都是0值。关于0值要特别说明一下,0值是针对不同类型的,比如:int的0值就是0,string的0值就是空字符串,指针的0值就是nil,看下面代码:

file

并非“任何后续的接收操作都将立即返回零值”,而是当channel中所有已发送的值都被接收后,接下来的接收操作会立即返回零值。

无缓冲channel

无缓冲通道顾名思义:就是没有数据缓冲能力的Channel,有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受,否则发送的goroutine会阻塞;反之,有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送,那么接受的goroutine也会阻塞。

应用场景:

  1. 部分任务需要同步就用无缓冲channel

来看场景代码:

有发送无接受

发送goroutine会被阻塞。


func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)
    is_timeout := false
    try_to_write_value := 1

    // when
    select {
    // 直接向channel中发送
    case c <- try_to_write_value:
    case <-time.After(3 * time.Second):
        // should
        is_timeout = true
    }

    assert.True(t, is_timeout)

}

COPY

file

有接受无发送

接收goroutine会被阻塞


func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)
    is_timeout := false

    select {
    // 直接接受channel中的数据
    case <-c:
    case <-time.After(3 * time.Second):
        // should
        is_timeout = true
    }

    // 三秒后超时
    assert.True(t, is_timeout)

}

COPY

有发送有接受

有发送有接收,一切正常。

func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    // 将累加结果发送到channel
    c <- sum
}

func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {

    // 创建无缓冲channel
    c := make(chan int)

    // given
    s := []int{1, 2, 3, 4, 5, 6}

    // when
    // 执行数组累加
    go sum(s[:], c)
    ret1 := <-c

    // should
    // 和应该是21
    assert.Equal(t, 21, ret1)
}

COPY

file

使用无缓冲Channel控制并发

// 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {
    var shouldRun = <-lock
    if shouldRun {
        fmt.Printf("time: %v Worker %d is working\n", time.Now(), id)
        time.Sleep(time.Second)
        fmt.Printf("time: %v Worker %d has finished\n", time.Now(), id)
    }
}

func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {
    lock := make(chan bool, 1)

    // 启动5个goroutine等待释放接收
    for i := 0; i < 5; i++ {
        go worker(i, lock)
    }

    // 发送5个true到channel
    for i := 0; i < 5; i++ {
        lock <- true
        time.Sleep(time.Second)
    }

    close(lock)

    time.Sleep(10 * time.Second)
}

COPY

file

使用无缓冲Channel实现CompleteFuture.anyOf()

CompleteFuture.anyOf() 是 Java 中的一个函数,它返回一个新的 CompletableFuture,当给定的任何 CompletableFuture 完成时,返回的 CompletableFuture 也完成,并带有完成的 CompletableFuture 的结果。


// future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {
    time.Sleep(delay)
    fmt.Printf("Hi, I have finished my task, my id is %d\n", id)
    resChan <- id
}

// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel,最后再返回result channel。
func anyOf(futures ...<-chan int) <-chan int {
    result := make(chan int)
    for _, future := range futures {
        go func(f <-chan int) {
            result <- <-f
        }(future)
    }
    return result
}

func TestAnyOf_ShouldSuccess(t *testing.T) {
    // 创建无缓冲的 channel
    resChan1 := make(chan int)
    resChan2 := make(chan int)
    resChan3 := make(chan int)

    // 启动 goroutines
    go future(1, 3*time.Second, resChan1)
    go future(2, 2*time.Second, resChan2)
    go future(3, 5*time.Second, resChan3)

    result := anyOf(resChan1, resChan2, resChan3)

    assert.Equal(t, 2, <-result)
}

COPY

上面有两个比较让人纠结的语法:

  1. <-chan int
  2. result <- <-f
  • <-chan int表示只读通道,anyOf只能读取通道内的数据;有了只读就有只写,只写通道chan<- int
  • result <- <-f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了
    v := <-f
    result <- v
    COPY

    file

有缓冲channel

有缓冲channel就是你可以暂时把数据发送到channel,如果channel的缓冲区没有被占用完就不会阻塞,缓冲区被占用完了就被阻塞了。

特性:

  1. 发送goroutine在缓冲区没有用完之前不会阻塞,缓冲区被使用完了之后发送goroutine就会被阻塞
  2. 接受goroutine在缓冲区有数据时,不会阻塞,缓冲区没有数据时会被阻塞

有缓冲channel应用场景是什么?

  1. 任务队列就是最典型的场景,生产者消费者模型
  2. 其他无缓冲channel搞不定的就用有缓冲channel

实现一个有缓冲channel的RateLimiter

import (
    "sync"
    "sync/atomic"
    "testing"

    "fmt"
    "time"

    "github.com/stretchr/testify/assert"
)

type RateLimiter struct {
    tokens       chan struct{}
    refillTicker *time.Ticker
    closeCh      chan struct{}
}

func NewRateLimiter(rate int) *RateLimiter {
    r := &RateLimiter{
        tokens:       make(chan struct{}, rate),
        refillTicker: time.NewTicker(time.Second / time.Duration(rate)),
        closeCh:      make(chan struct{}),
    }

    go r.refill()

    return r
}

func (r *RateLimiter) refill() {
    for {
        select {
        case <-r.refillTicker.C:
            select {
            case r.tokens <- struct{}{}:
            default:
            }
        case <-r.closeCh:
            r.refillTicker.Stop()
            return
        }
    }
}

func (r *RateLimiter) Acquire() {
    <-r.tokens
}

func (r *RateLimiter) TryAcquire() bool {
    select {
    case <-r.tokens:
        return true
    default:
        return false
    }
}

func (r *RateLimiter) Close() {
    close(r.closeCh)
}

func myTask(id int) {
    fmt.Printf("time: %v workder %d is working\n", time.Now(), id)
    time.Sleep(20 * time.Millisecond)
    fmt.Printf("time: %v workder %d has finished\n", time.Now(), id)
}

func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {
    rateLimiter := NewRateLimiter(100)

    startTime := time.Now()
    for i := 0; i < 1; i++ {
        rateLimiter.TryAcquire()
        myTask(i)
    }
    endTime := time.Now()

    elapsedTime := endTime.Sub(startTime)
    fmt.Printf("elapsed time: %v\n", elapsedTime)
    fmt.Printf("explect time: %v\n", 300*time.Millisecond)
    assert.True(t, elapsedTime < 300*time.Millisecond)
}

func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {
    var counter int32 = 0
    rateLimiter := NewRateLimiter(100)
    wg := sync.WaitGroup{}
    startTime := time.Now()
    for i := range 1000 {
        wg.Add(1)
        go func() {
            rateLimiter.Acquire()
            myTask(i)
            atomic.AddInt32(&counter, 1)
            wg.Done()
        }()

    }
    wg.Wait()
    endTime := time.Now()
    elapsedTime := endTime.Sub(startTime)
    fmt.Printf("elapsed time: %v\n", elapsedTime)
    fmt.Printf("should greater than explect time: %v\n", 10*time.Second)
    assert.Equal(t, counter, int32(1000))
    assert.True(t, 10*time.Second < elapsedTime)
}

COPY

file

实现无缓冲Channel实现Java中的CyclicBarrier

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中,CyclicBarriers 非常有用。之所以称之为“循环”屏障,是因为在等待的线程被释放之后,它可以被重复使用。

  • await() 所有的参与者都调用了wait方法后返回或者被中断
    我们就实现这个await方法,暂时不支持中断,代码如下:

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {
    // 总goroutine数量
    participant int
    // 用于等待所有goroutine准备好
    waitGroup sync.WaitGroup
    // 无缓冲channel,用于goroutine间同步
    barrierChan chan struct{}
    running     int32
}

// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {
    b := &CyclicBarrier{
        participant: participant,
        barrierChan: make(chan struct{}),
        running:     int32(participant),
    }
    // 设置等待的goroutine数
    b.waitGroup.Add(participant)
    return b
}

// 当一个goroutine调用Wait时,
// 它将在屏障处等待,
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {
    // 一个goroutine准备好了
    b.waitGroup.Done()

    // 等待所有goroutine都准备好
    b.waitGroup.Wait()

    // 当所有goroutine都准备好了,关闭channel进行广播通知
    if atomic.AddInt32(&b.running, -1) == 0 {
        close(b.barrierChan)
    } else {
        // 等待通知
        <-b.barrierChan
    }

}

// 阻塞调用goroutine直到所有goroutine都调用了Wait方法,
// 屏障开放后,重新置为待关闭状态
func (b *CyclicBarrier) Await() {
    // 等待屏障开放的信号
    <-b.barrierChan

    // 重置屏障状态
    b.barrierChan = make(chan struct{})
    b.waitGroup.Add(b.participant)
}

func (b *CyclicBarrier) Close() {
    close(b.barrierChan)
}

func main() {
    // 这里我们设置3个goroutine参与
    barrier := NewCyclicBarrier(100)

    for i := 0; i < 100; i++ {
        go func(i int) {
            fmt.Printf("Goroutine %d is working...\n", i)
            // 模拟工作
            time.Sleep(time.Duration(i+1) * time.Second)
            fmt.Printf("Goroutine %d reached the barrier.\n", i)
            barrier.Wait()

            fmt.Printf("Goroutine %d passed the barrier.\n", i)
        }(i)
    }

    // 主goroutine等待所有goroutine都到达屏障
    barrier.Await()
    fmt.Println("All goroutines have passed the barrier")
}

COPY

参考

  1. go/src/runtime/chan.go at master · golang/go · GitHub
  2. 逐步学习Go-并发通道chan(channel) – FOF编程网

编写不易,如有问题请评论告知

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1549025.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PTA L2-037 包装机

一种自动包装机的结构如图 1 所示。首先机器中有 N 条轨道&#xff0c;放置了一些物品。轨道下面有一个筐。当某条轨道的按钮被按下时&#xff0c;活塞向左推动&#xff0c;将轨道尽头的一件物品推落筐中。当 0 号按钮被按下时&#xff0c;机械手将抓取筐顶部的一件物品&#x…

详解智慧路灯杆网关的集中供电能力

智慧路灯杆网关是智慧杆物联网系统中不可或缺的设备。智慧杆网关不仅可以作为杆载设备与云平台、设备与设备之间的桥梁&#xff0c;促进数据的无缝传输&#xff0c;而且还能提供高效的能源管理和供电功能。 BMG8200系列交流型智慧路灯杆网关就集成了强大的供电能力&#xff0c;…

短视频矩阵系统--技术3年源头迭代

短视频矩阵系统核心技术算法主要包括以下几个方面&#xff1a; 1. 视频剪辑&#xff1a;通过剪辑工具或API从各大短视频平台抓取符合要求的视频。这些视频通常符合某些特定条件&#xff0c;如特定关键词、特定时间段发布的视频、视频点赞评论转发等数据表现良好的视频。 2. 视…

Kotlin 中的类和构造方法

1 Kotlin 中的类以及接口 对象是什么&#xff1f;任何可以描述的事物都可以看作对象。我们以鸟为例&#xff0c;来分析它的组成&#xff1a; 形状、颜色等可以看作是鸟的静态属性&#xff1b;年龄、大小等可以看作是鸟的动态属性&#xff1b;飞行、进食等可以看作是鸟的行为&…

VTK 光源 Transform 自定义BoundingBox绘制

这段代码展示了如何在 VTK 中创建光源&#xff0c;并在场景中添加光源的可视化表示。以下是关于代码的详细解释和教程&#xff1a; 创建光源 vtkSmartPointer<vtkLight> light vtkSmartPointer<vtkLight>::New();使用 vtkSmartPointer 创建了一个智能指针&#…

oracle补丁升级(19.3-19.22)

一、备份原来的opatch和数据库文件 这里要根据自己的路径&#xff1a; mv /u01/app/oracle/product/19.3.0/db_1/OPatch /u01/app/oracle/product/19.3.0/db_1/OPatch.bakcd /u01/app mkdir -p /u01/app/backup tar -pcvf /u01/app/backup/oracle_backup.tar /u01/app/oracle/…

glibc内存管理ptmalloc - 实时打印bin链的变化

前言 在《glibc内存管理ptmalloc - largebin》中我们详细解释了 largebins共63个&#xff0c;并用表格点出了每个bin的size的范围largebin在free一些内存后的状态 特别是第2点&#xff0c;我其实不太满意&#xff0c;因为只有全部free后的一个结果&#xff0c;并没有中间状态…

【jenkins+cmake+svn管理c++项目】jenkins回传文件到svn(windows)

书接上文&#xff1a;创建一个项目 在经过cmakemsbuild顺利生成动态库之后&#xff0c;考虑到我一个项目可能会生成多个动态库&#xff0c;它们分散在build内的不同文件夹&#xff0c;我希望能将它们收拢到一个文件夹下&#xff0c;并将其回传到svn。 一、动态库移位—cmake实…

工作12年了,我还没能过上自己想要的生活

写这篇文章之前&#xff0c;我想了很久&#xff0c;不知道该如何下笔&#xff0c;如何向读者说明这些年我是怎么走过来的&#xff0c;我只是依稀的记得当时的自己犹如在昨天。 2009年大学毕业&#xff0c;我和大多数的毕业生一样写简历求职。不管是招聘会还是网上投简历&#x…

容器网络隔离验证

结论&#xff0c;可以直接扫描内网路由能通的机器。 1.节点1 192.168.55.6 2.节点2 192.168.55.5 3.非节点3 192.168.55.3

4005.K次取反后最大化的数组和

// 定义一个名为Solution的类 class Solution {// 定义一个public方法largestSumAfterKNegations&#xff0c;输入参数为一个整数数组nums和一个整数K&#xff0c;返回值类型为整数public int largestSumAfterKNegations(int[] nums, int K) {// 使用Java流对数组中的元素进行操…

win11 环境配置 之 Jmeter

一、安装 JDK 1. 安装 jdk 截至当前最新时间&#xff1a; 2024.3.27 jdk最新的版本 是 官网下载地址&#xff1a; https://www.oracle.com/java/technologies/downloads/ 建议下载 jdk17 另存为到该电脑的 D 盘下&#xff0c;新建jdk文件夹 开始安装到 jdk 文件夹下 2. 配…

1111111111111111111111111111111111

欢迎关注博主 Mindtechnist 或加入【智能科技社区】一起学习和分享Linux、C、C、Python、Matlab&#xff0c;机器人运动控制、多机器人协作&#xff0c;智能优化算法&#xff0c;滤波估计、多传感器信息融合&#xff0c;机器学习&#xff0c;人工智能等相关领域的知识和技术。关…

ClickHouse初体验

1.clickHouse是啥&#xff1f; ClickHouse 是俄罗斯的 Yandex 于 2016 年开源的列式存储数据库(DBMS)&#xff0c;使用 C语言编写&#xff0c;主要用于在线分析处理查询(OLAP)&#xff0c;能够使用SQL查询实时生成分析数据报告 2.clickHouse的特点 2.1列式存储 对于列的聚合&…

Java零基础入门到精通_Day 3

37 switch default&#xff1a; 后面的break;可以省略 38 春夏秋冬 注意事项:在switch语句中&#xff0c;如果case控制的语句体后面不写break&#xff0c;将出现穿透现象&#xff0c;在不判断下一个case值的情况下&#xff0c;向下运行 直到遇到break&#xff0c;或者整体swi…

班级综合测评管理系统的设计与实现|Springboot+ Mysql+Java+ B/S结构(可运行源码+数据库+设计文档)

本项目包含可运行源码数据库LW&#xff0c;文末可获取本项目的所有资料。 推荐阅读100套最新项目持续更新中..... 2024年计算机毕业论文&#xff08;设计&#xff09;学生选题参考合集推荐收藏&#xff08;包含Springboot、jsp、ssmvue等技术项目合集&#xff09; 目录 1. …

opengl草稿复习,承上启下

目录 1、链接文件夹中的cpp 2、链接资源到输出目录 3、多编译目标 4、cmakelist添加库 4、添加glfw和glad 5、glfw运行 6、NDC、VBO、VAO 7、渐变三角形 8、渲染两个三角形 9、渲染两个三角形&#xff0c;同时基于原来颜色进行渐变 1、链接文件夹中的cpp cmake_minimu…

一本书掌握数字化运维方法,构建数字化运维体系

文章目录 前言主要内容读者对象 前言 数字化转型已经成为大势所趋&#xff0c;各行各业正朝着数字化方向转型&#xff0c;利用数字化转型方法论和前沿科学技术实现降本、提质、增效&#xff0c;从而提升竞争力。 数字化转型是一项长期工作&#xff0c;包含的要素非常丰富&…

React和Vue.js的有什么区别

在当今前端开发领域&#xff0c;React 和 Vue.js 作为两大热门的前端框架备受开发者关注。它们各自拥有独特的特点和优势&#xff0c;在实际项目中有着广泛的运用。本文将深入探讨 React 和 Vue.js 之间的区别&#xff0c;从组件化方式、数据绑定、模板语法以及生态系统和工具支…

鸿蒙HarmonyOS应用开发之C/C++标准库机制概述

OpenHarmony NDK提供业界标准库 libc标准库、 C标准库 &#xff0c;本文用于介绍C/C标准库在OpenHarmony中的机制&#xff0c;开发者了解这些机制有助于在NDK开发过程中避免相关问题。 1. C兼容性 在OpenHarmony系统中&#xff0c;系统库与应用Native库都在使用C标准库&#…