简介
Kotlin 中的 Channel(通道)是一种协程间通信的机制,用于在协程之间传递数据。它提供了一种有序、线程安全的方式,让生产者和消费者之间进行异步数据传输。
Channel 这个管道的其中一端,是发送方;管道的另一端是接收方。而管道本身,则可以用来传输数据。
使用 Channel 可以实现以下操作:
- 发送:通过
send()
方法将数据发送到通道。 - 接收:通过
receive()
方法从通道接收数据。 - 关闭:通过
close()
方法关闭通道来表示数据传输结束。
Channel 的应用场景包括:
- 生产者-消费者模式:可以使用通道在生产者与消费者之间传递数据,并且通过协程的方式进行非阻塞的异步处理。
- 协程之间的通信:可以使用通道来在不同的协程之间传递消息、事件或任何其他类型的数据。
下面是一个使用 Channel 的示例代码:
fun main() = runBlocking {
//coroutine#1
val channel = Channel<Int>()
// 生产者协程
launch {
//coroutine#2
repeat(5) {
// 发送数据到通道
println("send: $it")
channel.send(it)
}
// 关闭通道
println("close: ")
channel.close()
}
// 消费者协程
launch {
//coroutine#3
for (element in channel) {
// 从通道接收数据
println("Received: $element")
}
}
// 等待协程执行完成
delay(1000)
}
//对应输出
send: 0
Received: 0
send: 1
send: 2
Received: 1
Received: 2
send: 3
send: 4
Received: 3
Received: 4
close:
Process finished with exit code 0
在上述示例中,创建了一个 Channel<Int>
对象用于传递整数数据。通过 launch
函数创建了两个协程,一个用于发送数据,另一个用于接收数据。在发送者协程中,使用 send()
方法将 0 到 4 的整数发送到通道,并在发送完毕后关闭通道。在接收者协程中,我们使用 for
循环来不断从通道接收数据。
可以看出上面 coroutine#2、coroutine#3
是交替执行的,协程本来就是 互相协作的嘛。
还可以看出来,Channel 可以跨越不同的协程进行通信。在coroutine#1
当中创建的 Channel
,然后分别在 coroutine#2、coroutine#3
当中使用 Channel 来传递数据。
Channel中的close() 注意点
channel 其实也是一种协程资源,在用完 channel 以后,如果我们不去主动关闭它的话,是会造成不必要的资源浪费的。
如果在使用 Kotlin 的 Channel 时忘记关闭通道,可能会导致协程阻塞或内存泄漏等后果
如果把上面代码中的channel.close()
给注释掉的话
我们创建了的 Channel<Int>
对象并在生产者协程中向通道发送了 0 到 4 的整数。但是,我们没有在生产者协程中关闭通道。由于通道没有被关闭,消费者协程会一直等待新的数据,导致协程无法结束。
如果在实际应用中忘记关闭通道,可能会导致以下问题:
- 内存泄漏:通道会持有对发送者和接收者协程的引用,导致相关资源无法及时释放,可能会产生内存泄漏。
- 协程阻塞:如果通道没有关闭,接收者协程会一直等待新的数据,导致协程无法正常结束,可能会阻塞整个程序的执行。
所以 所以: 使用完 Channel 后始终调用 close() 方法关闭通道,以确保协程能够正常退出,并释放相关资源。
Channel 的源码剖析
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> = ...
capacity
缓冲区大小(capacity)决定了通道能够缓存的元素数量。通道的缓冲区允许发送者在接收者未准备好接收时继续发送元素,从而实现异步、非阻塞的通信。
capacity 参数可接受以下可选值:
UNLIMITED
:代表了无限容量;CONFLATED
,代表了容量为 1,新的数据会替代旧的数据;BUFFERED
,代表了具备一定的缓存容量,默认情况下是 64,具体容量由这个 VM 参数决定 “kotlinx.coroutines.channels.defaultBuffer”。RENDEZVOUS(默认)
:这是一个特殊的值,表示无缓冲区通道。它将通道设置为同步模式,要求发送者和接收者同时准备好,才能成功进行通信。当发送者和接收者都在等待对方时,它们将会“会合”(rendezvous)并进行元素的传递。非负整数值
:表示有限容量通道的缓冲区大小。这意味着通道可以在一定数量的元素被接收之前缓冲发送的元素。例如,如果将 capacity 设置为 10,那么可以在接收者接收之前向通道发送最多 10 个元素。
通常情况下,如果希望在发送和接收之间有一定的解耦和缓冲能力,可以选择使用具有非零容量的通道。而如果需要完全同步的通信方式,可以使用 RENDEZVOUS。
需要注意的是,通道的缓冲区大小并不限制发送者或接收者的数目。通道可以同时具有多个发送者和接收者。capacity 参数仅仅控制通道内部缓冲区的大小。
onBufferOverflow
Channel
类型具有一个名为 BufferOverflow
的枚举类,用于表示在通道缓冲区溢出时的处理策略。这个参数可以在创建通道时进行配置。以下是 BufferOverflow
所提供的可选值:
BufferOverflow.SUSPEND:
默认的溢出策略。当通道的缓冲区已满时,发送操作将会被挂起,直到有空间可用。类似地,当缓冲区为空时,接收操作也会被挂起,直到有元素可用。
示例
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
launch {
repeat(5) {
channel.send(it)
println("Sent: $it")
}
channel.close()
}
launch {
for (element in channel) {
println("Received: $element")
}
}
delay(1000)
}
//输出
Sent: 0
Sent: 1
Received: 0
Received: 1
Sent: 2
Sent: 3
Received: 2
Received: 3
Sent: 4
Received: 4
BufferOverflow.DROP_OLDEST:
在缓冲区已满时,新的元素会被丢弃,而不是挂起发送操作。这意味着最早进入缓冲区的元素将被丢弃,以为新元素腾出空间。
示例
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
launch {
repeat(5) {
channel.send(it)
println("Sent: $it")
}
channel.close()
}
launch {
for (element in channel) {
println("Received: $element")
}
}
delay(1000)
}
//输出
Sent: 0
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Received: 3
Received: 4
Process finished with exit code 0
BufferOverflow.DROP_LATEST:
在缓冲区已满时,新的元素会被丢弃,而不是挂起发送操作。这意味着最新的元素将被丢弃,以维持最早进入缓冲区的元素。
示例
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_LATEST)
launch {
repeat(5) {
channel.send(it)
println("Sent: $it")
}
channel.close()
}
launch {
for (element in channel) {
println("Received: $element")
}
}
delay(1000)
}
//输出
Sent: 0
Sent: 1
Sent: 2
Sent: 3
Sent: 4
Received: 0
Received: 1
Process finished with exit code 0
当缓冲区已满时,新的元素被丢弃。因此,最新的元素 “2” 和 “3”,“4”被丢弃,以维持最早进入缓冲区的元素 “0” 和 “1”。
借用朱涛
大神的图概括下:
onUndeliveredElement
onUndeliveredElement 的作用,就是一个回调,当我们发送出去的 Channel 数据无法被接收方处理的时候,就可以通过 onUndeliveredElement 这个回调,来进行监听。
它的使用场景一般都是用于“接收方对数据是否被消费特别关心的场景”。比如说,我发送出去的消息,接收方是不是真的收到了?对于接收方没收到的信息,发送方就可以灵活处理了,比如针对这些没收到的消息,发送方可以先记录下来,等下次重新发送。
Channel 关闭引发的问题
由上面代码可知,忘记调用了 close()
,所以会导致程序一直运行无法终止。
如果你不想每次使用它的时候就去关注有没有close()
,有没有其他办法呢?当然,那就是produce{}
produce()
是CoroutineScope
的扩展函数,用于创建一个生产者通道,该通道可以与协程一起使用来生成数据流。
示例
fun main() = runBlocking {
val channel = produce<Int> {
for (i in 1..5) {
delay(1000) // 模拟生产者产生数据的延迟
send(i) // 发送数据到通道
}
}
for (item in channel) {
println(item) // 打印从通道接收到的数据
}
println("Done")
}
//输出
1
2
3
4
5
Done
Process finished with exit code 0
可以看到,当所有的数据都已经被发送到通道,并且生产者完成后,程序就正常退出了,这是因为produce() 函数将会自动关闭通道。
上面示例都是通过for循环进行接收生产者发送的数据的,如果使用 channel.receive()单个接收会出现什么问题呢?
示例
fun main() = runBlocking {
val channel = produce<Int> {
for (i in 1..3) {
delay(1000) // 模拟生产者产生数据的延迟
send(i) // 发送数据到通道
}
}
channel.receive()
println("Received 0")
channel.receive()
println("Received 1")
channel.receive()
println("Received 2")
channel.receive()
println("Received 3")
channel.receive()
println("Received 4")
println("Done")
}
//输出
Received 0
Received 1
Received 2
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
通道被关闭,再使用 channel.receive() 会抛出 ClosedReceiveChannelException
异常
如果不使用produce()创建channel,上面代码打印到 Received 2
后,程序会被永远挂起,此处不再写示例
在channel中,对于发送方,我们可以使用“isClosedForSend”来判断当前的 Channel 是否关闭;对于接收方来说,我们可以用“isClosedForReceive”来判断当前的 Channel 是否关闭。
尽管 尽管有上面两个方法判断Channel 是否关闭,使用它们还会遇到其他问题,可以自行了解,此处不再赘述,
记住一句话:最好不要用 channel.receive()
以上代码除了可以使用 for 循环以外,还可以使用 Kotlin 为我们提供的另一个高阶函数:channel.consumeEach {}
·示例·
fun main() = runBlocking {
val channel = Channel<Int>() // 创建一个整数通道
// 启动一个生产者协程,向通道中发送数据
launch {
for (i in 1..5) {
channel.send(i)
}
channel.close() // 关闭通道
}
// 使用 consumeEach 函数消费通道中的元素
channel.consumeEach { element ->
println("Received: $element")
}
println("Done")
}
//输出
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Done
Process finished with exit code 0
需要注意的是,consumeEach() 函数会挂起当前协程,直到通道关闭并且所有的元素都被消费完毕。因此,当使用 consumeEach() 函数时,我们无需手动检查通道是否已关闭,它会自动处理通道的关闭操作。
补充:在某些特殊场景下,如果我们必须要自己来调用 channel.receive(),那么可以考虑使用 receiveCatching(),它可以防止异常发生。
为什么说 Channel 是“热”的?
在编程语境中,“热”(Hot)和"冷"(Cold)通常用于描述数据流的行为方式。
"冷"数据流是指当消费者订阅数据流时,生产者才开始生成数据,并且每个消费者都是独立获取数据的。这意味着当没有消费者订阅时,生产者不会产生任何数据。每个消费者独立消费数据,不会相互影响。
相比之下,"热"数据流是指数据流在生产者开始生成数据之前,消费者就已经订阅了数据流。生产者产生数据时,所有的消费者都会立即接收到数据,并且消费者无法决定何时开始接收数据。在这种情况下,数据流是连续的,不管消费者是否准备好处理数据。
Kotlin 中的 Channel 通常被认为是"热"的,因为它们在创建时就立即开始传递数据。当你向 Channel 发送数据时,无论是否有消费者准备好接收数据,数据仍然会被缓冲或传递。这意味着如果没有消费者在等待接收数据,那么可能会导致数据积压在通道中,直到有消费者准备好接收为止。
由于 Channel 是"热"的,所以在使用 Channel 时需要小心。如果没有适当的处理方式,可能会导致数据积压或消费者无法跟上产生速度,从而引发问题,例如内存泄漏。
总结起来,"热"通道是指数据在产生之前,消费者就已经订阅了通道,并且数据会持续传递给所有消费者,无论他们是否准备好处理数据。这与"冷"数据流不同,后者只有在有消费者订阅时才会开始生成数据。
一句话概括:Channel 是“热”的。这是因为“不管有没有接收方,发送方都会工作”。
感谢:朱涛 · Kotlin 编程第一课