Kotlin 协程基础入门:Channel(通道)

news2024/11/27 16:50:39

简介
Kotlin 中的 Channel(通道)是一种协程间通信的机制,用于在协程之间传递数据。它提供了一种有序、线程安全的方式,让生产者和消费者之间进行异步数据传输。

Channel 这个管道的其中一端,是发送方;管道的另一端是接收方。而管道本身,则可以用来传输数据。

使用 Channel 可以实现以下操作:

  • 发送:通过 send() 方法将数据发送到通道。
  • 接收:通过 receive() 方法从通道接收数据。
  • 关闭:通过 close() 方法关闭通道来表示数据传输结束。

Channel 的应用场景包括:

  • 生产者-消费者模式:可以使用通道在生产者与消费者之间传递数据,并且通过协程的方式进行非阻塞的异步处理。
  • 协程之间的通信:可以使用通道来在不同的协程之间传递消息、事件或任何其他类型的数据。

下面是一个使用 Channel 的示例代码:

fun main() = runBlocking {
//coroutine#1
    val channel = Channel<Int>()

    // 生产者协程
    launch {
    //coroutine#2
        repeat(5) {
            // 发送数据到通道
            println("send: $it")
            channel.send(it)
        }
        // 关闭通道
        println("close: ")
        channel.close()
    }

    // 消费者协程
    launch {
    //coroutine#3
        for (element in channel) {
            // 从通道接收数据
            println("Received: $element")
        }
    }

    // 等待协程执行完成
    delay(1000)
}
//对应输出
send: 0
Received: 0
send: 1
send: 2
Received: 1
Received: 2
send: 3
send: 4
Received: 3
Received: 4
close: 

Process finished with exit code 0


在上述示例中,创建了一个 Channel<Int> 对象用于传递整数数据。通过 launch 函数创建了两个协程,一个用于发送数据,另一个用于接收数据。在发送者协程中,使用 send() 方法将 0 到 4 的整数发送到通道,并在发送完毕后关闭通道。在接收者协程中,我们使用 for 循环来不断从通道接收数据。

可以看出上面 coroutine#2、coroutine#3 是交替执行的,协程本来就是 互相协作的嘛。
还可以看出来,Channel 可以跨越不同的协程进行通信。在coroutine#1当中创建的 Channel,然后分别在 coroutine#2、coroutine#3 当中使用 Channel 来传递数据。

Channel中的close() 注意点

channel 其实也是一种协程资源,在用完 channel 以后,如果我们不去主动关闭它的话,是会造成不必要的资源浪费的。

如果在使用 Kotlin 的 Channel 时忘记关闭通道,可能会导致协程阻塞或内存泄漏等后果

如果把上面代码中的channel.close()给注释掉的话
我们创建了的 Channel<Int> 对象并在生产者协程中向通道发送了 0 到 4 的整数。但是,我们没有在生产者协程中关闭通道。由于通道没有被关闭,消费者协程会一直等待新的数据,导致协程无法结束。

如果在实际应用中忘记关闭通道,可能会导致以下问题:

  1. 内存泄漏:通道会持有对发送者和接收者协程的引用,导致相关资源无法及时释放,可能会产生内存泄漏。
  2. 协程阻塞:如果通道没有关闭,接收者协程会一直等待新的数据,导致协程无法正常结束,可能会阻塞整个程序的执行。

所以 所以: 使用完 Channel 后始终调用 close() 方法关闭通道,以确保协程能够正常退出,并释放相关资源。

Channel 的源码剖析

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> = ...

capacity

缓冲区大小(capacity)决定了通道能够缓存的元素数量。通道的缓冲区允许发送者在接收者未准备好接收时继续发送元素,从而实现异步、非阻塞的通信。

capacity 参数可接受以下可选值:

  • UNLIMITED:代表了无限容量;
  • CONFLATED,代表了容量为 1,新的数据会替代旧的数据;
  • BUFFERED,代表了具备一定的缓存容量,默认情况下是 64,具体容量由这个 VM 参数决定 “kotlinx.coroutines.channels.defaultBuffer”。
  • RENDEZVOUS(默认):这是一个特殊的值,表示无缓冲区通道。它将通道设置为同步模式,要求发送者和接收者同时准备好,才能成功进行通信。当发送者和接收者都在等待对方时,它们将会“会合”(rendezvous)并进行元素的传递。
  • 非负整数值:表示有限容量通道的缓冲区大小。这意味着通道可以在一定数量的元素被接收之前缓冲发送的元素。例如,如果将 capacity 设置为 10,那么可以在接收者接收之前向通道发送最多 10 个元素。

通常情况下,如果希望在发送和接收之间有一定的解耦和缓冲能力,可以选择使用具有非零容量的通道。而如果需要完全同步的通信方式,可以使用 RENDEZVOUS。

需要注意的是,通道的缓冲区大小并不限制发送者或接收者的数目。通道可以同时具有多个发送者和接收者。capacity 参数仅仅控制通道内部缓冲区的大小。

onBufferOverflow

Channel 类型具有一个名为 BufferOverflow 的枚举类,用于表示在通道缓冲区溢出时的处理策略。这个参数可以在创建通道时进行配置。以下是 BufferOverflow 所提供的可选值:

  1. BufferOverflow.SUSPEND: 默认的溢出策略。当通道的缓冲区已满时,发送操作将会被挂起,直到有空间可用。类似地,当缓冲区为空时,接收操作也会被挂起,直到有元素可用。
    示例
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)

    launch {
        repeat(5) {
            channel.send(it)
            println("Sent: $it")
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println("Received: $element")
        }

    }

    delay(1000)
}
//输出
Sent: 0
Sent: 1
Received: 0
Received: 1
Sent: 2
Sent: 3
Received: 2
Received: 3
Sent: 4
Received: 4
  1. BufferOverflow.DROP_OLDEST: 在缓冲区已满时,新的元素会被丢弃,而不是挂起发送操作。这意味着最早进入缓冲区的元素将被丢弃,以为新元素腾出空间。
    示例
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)

    launch {
        repeat(5) {
            channel.send(it)
            println("Sent: $it")
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println("Received: $element")
        }

    }

    delay(1000)
}
//输出
Sent: 0
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Received: 3
Received: 4

Process finished with exit code 0
  1. BufferOverflow.DROP_LATEST:在缓冲区已满时,新的元素会被丢弃,而不是挂起发送操作。这意味着最新的元素将被丢弃,以维持最早进入缓冲区的元素。
    示例
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_LATEST)

    launch {
        repeat(5) {
            channel.send(it)
            println("Sent: $it")
        }
        channel.close()
    }

    launch {
        for (element in channel) {
            println("Received: $element")
        }

    }

    delay(1000)
}
//输出
Sent: 0
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Received: 0
Received: 1

Process finished with exit code 0

当缓冲区已满时,新的元素被丢弃。因此,最新的元素 “2” 和 “3”,“4”被丢弃,以维持最早进入缓冲区的元素 “0” 和 “1”。

借用朱涛大神的图概括下:
在这里插入图片描述

onUndeliveredElement

onUndeliveredElement 的作用,就是一个回调,当我们发送出去的 Channel 数据无法被接收方处理的时候,就可以通过 onUndeliveredElement 这个回调,来进行监听。

它的使用场景一般都是用于“接收方对数据是否被消费特别关心的场景”。比如说,我发送出去的消息,接收方是不是真的收到了?对于接收方没收到的信息,发送方就可以灵活处理了,比如针对这些没收到的消息,发送方可以先记录下来,等下次重新发送。

Channel 关闭引发的问题

由上面代码可知,忘记调用了 close(),所以会导致程序一直运行无法终止。
如果你不想每次使用它的时候就去关注有没有close(),有没有其他办法呢?当然,那就是produce{}

produce()CoroutineScope的扩展函数,用于创建一个生产者通道,该通道可以与协程一起使用来生成数据流。

示例

fun main() = runBlocking {
    val channel = produce<Int> {
        for (i in 1..5) {
            delay(1000)  // 模拟生产者产生数据的延迟
            send(i)      // 发送数据到通道
        }
    }

    for (item in channel) {
        println(item)  // 打印从通道接收到的数据
    }

    println("Done")
}
//输出
1
2
3
4
5
Done

Process finished with exit code 0

可以看到,当所有的数据都已经被发送到通道,并且生产者完成后,程序就正常退出了,这是因为produce() 函数将会自动关闭通道。

上面示例都是通过for循环进行接收生产者发送的数据的,如果使用 channel.receive()单个接收会出现什么问题呢?
示例

fun main() = runBlocking {
    val channel = produce<Int> {
        for (i in 1..3) {
            delay(1000)  // 模拟生产者产生数据的延迟
            send(i)      // 发送数据到通道
        }
    }
    channel.receive()
    println("Received 0")
    channel.receive()
    println("Received 1")
    channel.receive()
    println("Received 2")
    channel.receive()
    println("Received 3")
    channel.receive()
    println("Received 4")

    println("Done")
}
//输出
Received 0
Received 1
Received 2
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed

通道被关闭,再使用 channel.receive() 会抛出 ClosedReceiveChannelException异常
如果不使用produce()创建channel,上面代码打印到 Received 2 后,程序会被永远挂起,此处不再写示例

在channel中,对于发送方,我们可以使用“isClosedForSend”来判断当前的 Channel 是否关闭;对于接收方来说,我们可以用“isClosedForReceive”来判断当前的 Channel 是否关闭。

尽管 尽管有上面两个方法判断Channel 是否关闭,使用它们还会遇到其他问题,可以自行了解,此处不再赘述,
记住一句话:最好不要用 channel.receive()

以上代码除了可以使用 for 循环以外,还可以使用 Kotlin 为我们提供的另一个高阶函数:channel.consumeEach {}
·示例·

fun main() = runBlocking {
    val channel = Channel<Int>() // 创建一个整数通道

    // 启动一个生产者协程,向通道中发送数据
    launch {
        for (i in 1..5) {
            channel.send(i)
        }
        channel.close() // 关闭通道
    }

    // 使用 consumeEach 函数消费通道中的元素
    channel.consumeEach { element ->
        println("Received: $element")
    }

    println("Done")
}
//输出
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Done

Process finished with exit code 0

需要注意的是,consumeEach() 函数会挂起当前协程,直到通道关闭并且所有的元素都被消费完毕。因此,当使用 consumeEach() 函数时,我们无需手动检查通道是否已关闭,它会自动处理通道的关闭操作。

补充:在某些特殊场景下,如果我们必须要自己来调用 channel.receive(),那么可以考虑使用 receiveCatching(),它可以防止异常发生。

为什么说 Channel 是“热”的?

在编程语境中,“热”(Hot)和"冷"(Cold)通常用于描述数据流的行为方式。
"冷"数据流是指当消费者订阅数据流时,生产者才开始生成数据,并且每个消费者都是独立获取数据的。这意味着当没有消费者订阅时,生产者不会产生任何数据。每个消费者独立消费数据,不会相互影响。

相比之下,"热"数据流是指数据流在生产者开始生成数据之前,消费者就已经订阅了数据流。生产者产生数据时,所有的消费者都会立即接收到数据,并且消费者无法决定何时开始接收数据。在这种情况下,数据流是连续的,不管消费者是否准备好处理数据。

Kotlin 中的 Channel 通常被认为是"热"的,因为它们在创建时就立即开始传递数据。当你向 Channel 发送数据时,无论是否有消费者准备好接收数据,数据仍然会被缓冲或传递。这意味着如果没有消费者在等待接收数据,那么可能会导致数据积压在通道中,直到有消费者准备好接收为止。

由于 Channel 是"热"的,所以在使用 Channel 时需要小心。如果没有适当的处理方式,可能会导致数据积压或消费者无法跟上产生速度,从而引发问题,例如内存泄漏。

总结起来,"热"通道是指数据在产生之前,消费者就已经订阅了通道,并且数据会持续传递给所有消费者,无论他们是否准备好处理数据。这与"冷"数据流不同,后者只有在有消费者订阅时才会开始生成数据。

一句话概括:Channel 是“热”的。这是因为“不管有没有接收方,发送方都会工作”。


感谢:朱涛 · Kotlin 编程第一课

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/778057.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV实现照片换底色处理

目录 1.导言 2.引言 3.代码分析 4.优化改进 5.总结 1.导言 在图像处理领域&#xff0c;OpenCV是一款强大而广泛应用的开源库&#xff0c;能够提供丰富的图像处理和计算机视觉功能。本篇博客将介绍如何利用Qt 编辑器调用OpenCV库对照片进行换底色处理&#xff0c;实现更加…

Promise 讲解,js知识,es6

文章目录 一、Promise的三种状态1. 初始态pending2. 成功态fulfilled&#xff0c;调用resolve方法3. 失败态rejected&#xff0c;调用reject方法 二、Promise的方法then方法catch方法 三、async和awaitasync 函数await 表达式 四、代码举例帮助理解1、Promise的值通过then方法获…

【idea】编译热部署

项目场景&#xff1a; 实际工作中&#xff0c;用到了idea&#xff0c;发现idea不编译代码&#xff0c;热部署什么的都不行 问题描述 在实际的工作中idea遇到了各种问题&#xff0c;之前一直用的2022版的&#xff0c;公司用的jboss起的项目&#xff0c;启动过程极其痛苦&#…

电子档案管理系统

电子文档 登陆成功后点击左上角“”选择“档案管理”跳转到“档案管理首页”如下图: 该界面列出用户被授权查看的可视化数据图形,柱图、饼图、线图、雷达图等,并结合数据仓库里的动态数据进行数据展现。 图形所展示的数据可根据企业需求定制,点击图形即可查看关联内容,方…

C++类与对象(上部曲)

目录 面向过程和面向对象初步认识 类的引入 类的定义 类的两种定义方式&#xff1a; 1. 声明和定义全部放在类体中 2. 类声明放在.h文件中&#xff0c;成员函数定义放在.cpp文件中 类的访问限定符及封装 1 访问限定符 2 封装 类的实例化 类对象的存储方式 this指针 …

IDEA快速创建SpringBoot

文件具有错误的版本 61.0, 应为 52.0报错可以看看是不是Springboot的版本比较高 和jdk版本不匹配 package com.qf.controller;import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframewor…

黑马 pink h5+css3+移动端前端

网页概念 网页是网站的一页,网页有很多元素组成,包括视频图片文字视频链接等等,以.htm和.html后缀结尾,俗称html文件 HTML 超文本标记语言,描述网页语言,不是编程语言,是标记语言,有标签组成 超文本指的是不光文本,还有图片视频等等标签 常用浏览器 firefox google safari…

从Vue2到Vue3【四】——Composition API(第四章)

系列文章目录 内容链接从Vue2到Vue3【零】Vue3简介从Vue2到Vue3【一】Composition API&#xff08;第一章&#xff09;从Vue2到Vue3【二】Composition API&#xff08;第二章&#xff09;从Vue2到Vue3【三】Composition API&#xff08;第三章&#xff09;从Vue2到Vue3【四】C…

os.environ[“CUDA_VISIBLE_DEVICES“]学习总结

今天发现一个很有意思的东西 import torch import os # Specify the GPU device os.environ["CUDA_VISIBLE_DEVICES"] "1" print(torch.cuda.is_available())但是如果修改下面的设置后&#xff0c;结果就变成了 import torch import os # Specify the…

【100天精通python】Day7:数据结构_列表 List的创建、删除、访问、遍历、统计、排序、推导等使用

目录 1 列表的创建 2 列表的删除 3 访问列表元素 4 遍历列表 5 添加修改删除列表元素 6 对列表进行统计和计算 7 对列表进行排序 8 列表推导式 9 多维列表 在Python中&#xff0c;列表是一种有序的可变数据类型&#xff0c;用于存储一组元素。 列表使用方括号“[] ”来…

汇编习题1-100和

.text .globl _start_start:mov r0,#0MOV r1,#0stop:cmp r1,#0x64addcc r1,r1,#0x1addcc r0,r0,r1b stop .end运行结果&#xff1a; 寄存器R0就为16进制的结果

K8S初级入门系列之四-Namespace/ConfigMap/Secret

一、前言 本章节我们继续学习Namespace、ConfigMap、Secret基础概念&#xff0c;了解他们基本用法和操作。NameSpace为命名空间&#xff0c;在同一集群中试下资源隔离。ConfigMap通过key-value的方式实现明文配置数据的保存&#xff0c;Secret与ConfigMap类似&#xff0c;不过是…

Windows下使用rocketMq

1、下载&#xff08;下载zip后解压即可&#xff09; 下载地址&#xff1a;下载 | RocketMQ 2、配置环境变量&#xff08;注意&#xff1a;该目录的下一级是bin&#xff09; 3、启动 在bin目录下使用cmd 分别输入 3.1 启动name server (下图是启动成功的显示&#xff0c;窗口…

CPU密集型和IO密集型任务的权衡:如何找到最佳平衡点

关于作者&#xff1a;CSDN内容合伙人、技术专家&#xff0c; 从零开始做日活千万级APP。 专注于分享各领域原创系列文章 &#xff0c;擅长java后端、移动开发、人工智能等&#xff0c;希望大家多多支持。 目录 一、导读二、概览三、CPU密集型与IO密集型3.1、CPU密集型3.2、I/O密…

dp算法 力扣978、力扣139、力扣467

目录 一、力扣978978. 最长湍流子数组 - 力扣&#xff08;LeetCode&#xff09; &#xff08;一&#xff09;题目详情 &#xff08;二&#xff09;算法讲解 &#xff08;三&#xff09;代码 二、力扣139139. 单词拆分 - 力扣&#xff08;LeetCode&#xff09; &#xff0…

计网学习笔记 wireless mobile networks

无线局域网的组成 无线网络在近些年来一直是个非常流行的东西。现在的移动用户数量已经大大超过了有线用户数量&#xff0c;比例大于5:1。 实现无线网络的挑战性 从有线到无线是网络构建上一个伟大的设想&#xff0c;挑战性主要集中在wireless和mobility两个点上。 这两个是…

Express 框架的基本操作

目录 1、应用生成器 2、基本路由 2.1、在跟路由下配置 GET请求&#xff0c;返回对应相应内容。 2.2、在跟路由下配置 POST请求&#xff0c;返回对应相应内容。 2.3、在跟路由下配置 PUT请求&#xff0c;返回对应相应内容。 2.4、在根路由下配置DELETE请求&#xff0c;返回对…

【剑指offer】学习计划day4

目录 一. 前言 二.数组中重复的数字 a.题目 b.题解分析 c.AC代码 三.在排序数组中查找数字 I a.题目 b.题解分析 c.AC代码 四.0&#xff5e;n-1中缺失的数字 a.题目 b.题解分析 c.AC代码 一. 前言 本系列是针对Leetcode中剑指offer学习计划的记录与思路讲解。详情查看以下…

学习系统编程No.32【线程互斥实战】

引言&#xff1a; 北京时间&#xff1a;2023/7/19/15:22&#xff0c;昨天更新完博客&#xff0c;和舍友下了一会棋&#xff0c;快乐就是这么简单&#xff0c;哈哈哈&#xff01;总体来说&#xff0c;摆烂程度得到一定的改善&#xff0c;想要达到以前的水准&#xff0c;需要一定…

分布式 - 消息队列Kafka:Kafka分区常见问题总结

文章目录 01. Kafka 的分区是什么&#xff1f;02. Kafka 为什么需要分区&#xff1f;03. Kafka 分区有什么作用&#xff1f;03. Kafka 为什么使用分区的概念而不是直接使用多个主题呢&#xff1f;04. Kafka 分区的数量有什么限制&#xff1f;05. Kafka 分区的副本有什么作用&am…