Kotlin协程flow缓冲buffer
先看一个普通的flow:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
fun main(args: Array<String>) {
val delayTime = 100L
runBlocking {
val time = measureTimeMillis {
flowOf("A", "B", "C")
.onEach {
//生产数据,假设产生了耗时操作。
delay(delayTime)
println("$it onEach")
}
.collect {
//消费数据,假设产生了耗时操作。
delay(delayTime)
println("$it collect")
}
}
println("cost= $time ms")
}
}
A onEach
A collect
B onEach
B collect
C onEach
C collect
cost= 673 ms
再看加入buffer后:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis
fun main(args: Array<String>) {
val delayTime = 100L
runBlocking {
val time = measureTimeMillis {
flowOf("A", "B", "C")
.onEach {
//生产数据,假设产生了耗时操作。
delay(delayTime)
println("$it onEach")
}
.buffer(32) //缓冲区保存的数据最大条数。
.collect {
//消费数据,假设产生了耗时操作。
delay(delayTime)
println("$it collect")
}
}
println("cost= $time ms")
}
}
A onEach
B onEach
A collect
C onEach
B collect
C collect
cost= 465 ms
运行时间缩短。
buffer的意义在onEach和collect之间缓冲,让每一条的数据,不在生产者-消费者之间阻塞。可以看到当A进入collect后,由于要delay(100),所以此时Kotlin直接返回onEach,进行下一条数据的处理,如果没有buffer,则需要等待collect里面的这条数据处理完,才调度回到onEach里面处理下一条。
kotlin协程flow filter map flowOn zip combine(1)_zhangphil的博客-CSDN博客一、flow ,emit,onCompletion,collect。四、map,重组改写数据。八、conflate 合并。九、debounce去重。二、函数作为flow。https://blog.csdn.net/zhangphil/article/details/130084723Kotlin协程flow发送时间间隔debounce_zhangphil的博客-CSDN博客debounce蕴含了一定的缓冲思想,即,不立刻触发事件,而是先把要发射的数据进入队列,稍等一定时间(时间)延迟触发,而触发的重要条件取决于前一条数据和后一条数据的时间间隔,注意,前一条和后一条尚未被发射出去,只是待命中。发射完ABC,Kotlin审视待发射的ABCD,按理说ABCD与ABC间隔200,满足发射timeOut值,但是ABCD与ABCDE间隔100,所以跳过ABCD,直接发射ABCDE。四、map,重组改写数据。A和AB都待发射,A和AB间隔100,所以跳过A,直接发射AB。https://blog.csdn.net/zhangphil/article/details/132515686