Flow
- 一、Flow
- 1、Flow是什么东西?
- 2、实现功能
- 3、特点
- 4、冷流和热流
- 5、流的连续性
- 6、流的构建器
- 7、流的上下文
- 8、指定流所在协程
- 9、流的取消
- 9.1、超时取消
- 9.2、主动取消
- 9.3、密集型任务的取消
- 10、背压和优化
- 10.1、buffer 操作符
- 10.2、 flowOn
- 10.3、conflate 操作符
- 10.4、collectLatest 操作符
- 二、操作符
- 1、变换操作符
- 1.1、buffer (缓存)
- 1.2、map (变换)
- 1.2.1、map
- 1.2.2、mapNotNull (不空的下发)
- 1.2.3、mapLatest
- 1.3、transform (一转多)
- 1.4、reduce (累*加减乘除)
- 1.5、fold(累*加减乘除 and 拼接)
- 1.6、flatMapConcat (有序变换)
- 1.7、flatMapMerge (无序变换)
- 1.8、flatMapLatest (截留)
- 2、过滤型操作符
- 2.1、take (截留)
- 2.1.2、takeWhile
- 2.2、filter(满足条件下发)
- 2.2.2、filterNotNull (不空的下发)
- 2.2.3、filterNot(符合条件的值将被丢弃)
- 2.2.4、filterInstance (筛选符合类型的值)
- 2.3、skip 和 drop(跳过)
- 2.3.2、dropWhile
- 2.4、distinctUntilChanged (过滤重复)
- 2.4.2、distinctUntilChangedBy
- 2.5、single (判断是否一个事件)
- 2.6、first (截留第一个事件)
- 2.7、debounce (防抖动)
- 2.8、conflate
- 2.9、sample (周期采样)
- 3、组合型操作符
- 3.1、count (计数)
- 3.2、zip (合并元素)
- 3.3、combine(合并元素)
- 3.4、merge (合并成流)
- 3.5、flattenConcat (展平流)
- 3.6、flattenMerge(展平流)
- 4、异常操作符
- 4.1、catch (拦截异常)
- 4.2、retry (重试)
- 4.2.2、retryWhen
- 4.3、withTimeout (超时)
- 5、辅助操作符
- 5.1、onXXX
- 5.2、delay (延时)
- 5.3、measureTimeMillis (计时)
- 参考地址
一、Flow
1、Flow是什么东西?
Flow 是有点类似 RxJava 的 Observable
都有冷流和热流之分;
都有流式构建结构;
都包含 map、filter 等操作符。
区别于Observable,Flow可以配合挂起函数使用
2、实现功能
异步返回多个值
可以实现下载功能等,Observable 下发数组时可以实现什么功能,他就能实现什么功能
当文件下载时,对应的后台下载进度,就可以通过Flow里面的emit发送数据,通过collect接收对应的数据。
转:https://blog.csdn.net/qq_30382601/article/details/121825461
3、特点
- flow{…}块中的代码可以挂起
- 使用flow,suspend修饰符可以省略
- 流使用emit函数发射值
- 流使用collect的函数收集值
- flow类似冷流,flow中代码直到流被收集(调用collect)的时候才运行,类似lazy,什么时候用,什么时候执行。
- 流的连续性:流收集都是按顺序收集的
- flowOn可更改流发射的上下文,即可以指定在主线程或子线程中执行
- 与之相对的是热流,我们即将介绍的 StateFlow 和 SharedFlow 是热流,在垃圾回收之前,都是存在内存之中,并且处于活跃状态的。
转:https://blog.csdn.net/zx_android/article/details/122744370
4、冷流和热流
-
冷流
冷流类似冷启动,代码在被用到才会执行,如你需要使用的数据在网络,需要先请求网络才能得到数据
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集的时候才运行。 -
热流
热流类似热启动,代码在用到之前已经准备好,如你请求过网络,数据已经缓存在本地,你只需直接使用即可
5、流的连续性
流的连续性:流收集都是按顺序收集的
6、流的构建器
如下三种为冷流构建器
- flow{emit} .collect{}
- flowOf(***).collect{}
- (***).asFlow().collect{}
@Test
fun `test flow builder`() = runBlocking<Unit> {
flowOf("one", "two", "three")
.onEach { delay(1000) }
.collect { value ->
println(value)
}
(1..3).asFlow().collect { value ->
println(value)
}
flow<Int> {
for (i in 11..13) {
delay(1000) //假装在一些重要的事情
emit(i) //发射,产生一个元素
}
}.collect { value ->
println(value)
}
}
7、流的上下文
flowOn (多用于切线程)
流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存。
fun simpleFlow3() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow3()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
如下:流的发射和接收在一个协程内
Flow started Test worker @coroutine#1
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
flow{…}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发生(emit)
如下这种写法不被允许
fun simpleFlow4() = flow<Int> {
withContext(Dispatchers.Default) {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
}
那么如何切换协程上下文呢?
flowOn操作符,该函数用于更改流发射的上下文
fun simpleFlow5() = flow<Int> {
println("Flow started ${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
@Test
fun `test flow context`() = runBlocking<Unit> {
simpleFlow5()
.collect { value -> println("Collected $value ${Thread.currentThread().name}") }
}
如下:切换上下文成功
Flow started DefaultDispatcher-worker-2 @coroutine#2
Collected 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
8、指定流所在协程
launchIn 用于指定协程作用域通知flow执行
使用 launchIn 替换 collect 在单独的协程中启动收集流
- 指定协程
//事件源
private fun events() = (1..3)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
@Test
fun `test flow launch`() = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
.launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
// .launchIn(this)//这里使用当前上下文传入Flow
job.join()
}
打印:
Event: 1 DefaultDispatcher-worker-2 @coroutine#2
Event: 2 DefaultDispatcher-worker-1 @coroutine#2
Event: 3 DefaultDispatcher-worker-3 @coroutine#2
- 也可以指定当前协程中执行
@Test
fun `test flow launch`() = runBlocking<Unit> {
val job = events()
.onEach { event -> println("Event: $event ${Thread.currentThread().name}") }
// .collect {}
// .launchIn(CoroutineScope(Dispatchers.IO)) //这里使用另一个上下文传入Flow
.launchIn(this)//这里使用当前上下文传入Flow
// job.join()
}
Event: 1 Test worker @coroutine#2
Event: 2 Test worker @coroutine#2
Event: 3 Test worker @coroutine#2
9、流的取消
流采用和协程同样的协作取消。流可以在挂起函数的挂起的时候取消。
9.1、超时取消
withTimeoutOrNull 不能取消密集型任务
fun simpleFlow6() = flow<Int> {
for (i in 1..300) {
delay(1000)
emit(i)
println("Emitting $i")
}
}
@Test
fun `test cancel flow`() = runBlocking<Unit> {
withTimeoutOrNull(2500) {
simpleFlow6().collect { value -> println(value) }
}
println("Done")
}
9.2、主动取消
cancel
@Test
fun `test cancel flow `() = runBlocking<Unit> {
simpleFlow6()
.collect { value ->
if (value == 3) {
cancel()
}
println(value)
}
println("Done")
}
9.3、密集型任务的取消
密集型任务需要流的取消检测
cancel + cancellable
@Test
fun `test cancel flow check`() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
println(value)
if (value == 3) cancel()
println("cancel check ${coroutineContext[Job]?.isActive}")
}
}
10、背压和优化
- 什么是背压?
生产者生产的效率大于消费者消费的效率,元素积压
例,演示背压
fun simpleFlow8() = flow<Int> {
for (i in 1..10) {
// emit 上面这段代码在collect之前执行
delay(100)
emit(i) // 调用collect
// emit下面这段代码在 collect 之后执行
println("Emitting $i ${Thread.currentThread().name}")
}
}
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collect { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
Collected 1 Test worker @coroutine#1
Emitting 1 Test worker @coroutine#1
Collected 2 Test worker @coroutine#1
Emitting 2 Test worker @coroutine#1
Collected 3 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#1
Collected 4 Test worker @coroutine#1
Emitting 4 Test worker @coroutine#1
Collected 5 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Emitting 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Emitting 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Emitting 10 Test worker @coroutine#1
Collected in 3169 ms
- 如何解决背压?
通过缓存进行性能优化
10.1、buffer 操作符
并发运行流中发射元素的代码
注意:for (i in 1…10) 这里用的是 1到 10,原因是 for循环 有耗时问题,通过打印时间戳在 for (i in 1…x) 上下,发现 for (i in 1…x) 这行代码有时耗时超过200毫秒,目前不知是何问题,特此记录,为方便对比优化时长,使用1到10.
@Test
fun `test flow back pressure buffer`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.buffer(10) //缓存发射事件
.collect { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2398 ms
10.2、 flowOn
flowOn(),修改流上下文,达到异步处理的效果,从而优化背压
@Test
fun `test flow back pressure flowOn`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.flowOn(Dispatchers.IO)
.collect { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
Emitting 1 DefaultDispatcher-worker-1 @coroutine#2
Emitting 2 DefaultDispatcher-worker-1 @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 DefaultDispatcher-worker-1 @coroutine#2
Emitting 4 DefaultDispatcher-worker-1 @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 DefaultDispatcher-worker-1 @coroutine#2
Emitting 6 DefaultDispatcher-worker-1 @coroutine#2
Collected 3 Test worker @coroutine#1
Emitting 7 DefaultDispatcher-worker-1 @coroutine#2
Emitting 8 DefaultDispatcher-worker-1 @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 9 DefaultDispatcher-worker-1 @coroutine#2
Emitting 10 DefaultDispatcher-worker-1 @coroutine#2
Collected 5 Test worker @coroutine#1
Collected 6 Test worker @coroutine#1
Collected 7 Test worker @coroutine#1
Collected 8 Test worker @coroutine#1
Collected 9 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 2385 ms
10.3、conflate 操作符
conflate(),合并发射项,处理最新的值,不对每个值进行处理;
@Test
fun `test flow back pressure conflate`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.conflate()
.collect { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Collected 1 Test worker @coroutine#1
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Collected 2 Test worker @coroutine#1
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Collected 4 Test worker @coroutine#1
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Collected 6 Test worker @coroutine#1
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 8 Test worker @coroutine#1
Collected 10 Test worker @coroutine#1
Collected in 1554 ms
10.4、collectLatest 操作符
collectLatest(),取消并重新发射最后一个值
@Test
fun `test flow back pressure collectLatest`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collectLatest { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
Emitting 1 Test worker @coroutine#2
Emitting 2 Test worker @coroutine#2
Emitting 3 Test worker @coroutine#2
Emitting 4 Test worker @coroutine#2
Emitting 5 Test worker @coroutine#2
Emitting 6 Test worker @coroutine#2
Emitting 7 Test worker @coroutine#2
Emitting 8 Test worker @coroutine#2
Emitting 9 Test worker @coroutine#2
Emitting 10 Test worker @coroutine#2
Collected 10 Test worker @coroutine#12
Collected in 1648 ms
二、操作符
1、变换操作符
1.1、buffer (缓存)
上面背压有栗子
1.2、map (变换)
1.2.1、map
map 是变换元素
data class Student(var name: String, var age: Int)
private suspend fun performRequest(age: Int): Student {
delay(500)
return Student("这是name", age)
}
@Test
fun `test map flow operator`() = runBlocking<Unit> {
(1..3).asFlow()
.map { request -> performRequest(request) }
.collect { value -> println(value) }
}
Student(name=这是name, age=1)
Student(name=这是name, age=2)
Student(name=这是name, age=3)
1.2.2、mapNotNull (不空的下发)
@Test
fun `test mapNotNull flow operator`() = runBlocking<Unit> {
flow {
emit(1)
emit(3)
emit(2)
}
.mapNotNull { request ->
if (1 == request) {
null
} else {
Student("这是name", request)
}
}
.collect { value -> println(value) }
}
Student(name=这是name, age=3)
Student(name=这是name, age=2)
1.2.3、mapLatest
当有新值发送时,如果上个转换还没结束,会取消掉,用法同map
@Test
fun `test mapLatest flow operator`() = runBlocking<Unit> {
flow {
emit(1)
emit(2)
emit(3)
}.mapLatest {
if (2 == it) delay(100L)
"it is $it"
}.collect {
println(it)
}
}
1.3、transform (一转多)
@Test
fun `test transform flow operator`() = runBlocking<Unit> {
(1..3).asFlow()
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}.collect { value -> println(value) }
}
Making request 1
Student(name=这是name, age=1)
Making request 2
Student(name=这是name, age=2)
Making request 3
Student(name=这是name, age=3)
1.4、reduce (累*加减乘除)
@Test
fun `test reduce operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}.reduce { accumulator, value -> accumulator + value })
}
1.5、fold(累*加减乘除 and 拼接)
- 加
@Test
fun `test fold + operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}.fold(3) { accumulator, value -> accumulator + value })
}
17
- 减
@Test
fun `test fold - operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(2)
emit(3)
}.fold(18) { accumulator, value -> accumulator - value })
}
13
- 乘
@Test
fun `test fold multiply by operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
}.fold(3) { accumulator, value -> accumulator * value })
}
18
- 除
@Test
fun `test fold devide operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(2)
emit(3)
}.fold(18) { accumulator, value -> accumulator / value })
}
3
- 拼接
@Test
fun `test fold joint operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(2)
emit(3)
}.fold("拼接") { accumulator, value -> return@fold "$accumulator =+= $value" })
}
拼接 =+= 1 =+= 2 =+= 3
1.6、flatMapConcat (有序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素会等待。
@Test
fun `test flatMapConcat operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { delay(100) }
.flatMapConcat { num ->
flow {
if (3==num){
delay(200)
}
emit("num: $num")
}
}.collect {
println("value -> $it")
}
}
value -> num: 1
value -> num: 2
value -> num: 3
value -> num: 4
value -> num: 5
1.7、flatMapMerge (无序变换)
元素会变换完,以流的形式继续下发,并且某个元素需要耗时,它后面的元素不会等待。
@Test
fun `test flatMapMerge operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { delay(100) }
.flatMapMerge() { num ->
flow {
if (3==num){
delay(200)
}
emit("num: $num")
}
}.collect {
println("value -> $it")
}
}
value -> num: 1
value -> num: 2
value -> num: 4
value -> num: 3
value -> num: 5
1.8、flatMapLatest (截留)
快速执行的事件都正常下发,
当有新值发送时,如果上个转换还没结束,会上取消掉上一个,直接下发新值。
@Test
fun `test flatMapLatest operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { delay(100) }
.flatMapLatest() { num ->
flow {
if (3 == num) {
delay(200)
}
emit("num: $num")
emit("num2: $num")
}
}.collect {
println("value -> $it")
}
}
value -> num: 1
value -> num2: 1
value -> num: 2
value -> num2: 2
value -> num: 4
value -> num2: 4
value -> num: 5
value -> num2: 5
2、过滤型操作符
2.1、take (截留)
跟Rxjava一样
fun numbers() = flow<Int> {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
@Test
fun `test limit length operator`() = runBlocking<Unit> {
//take(2),表示 当计数元素被消耗时,原始流被取消
numbers().take(2).collect { value -> println(value) }
}
1
2
Finally in numbers
2.1.2、takeWhile
找到第一个不满足条件的值,发送它之前的值,和dropWhile相反
@Test
fun `test takeWhile operator`() = runBlocking<Unit> {
flow<Int> {
emit(2)
emit(1)
emit(3)
emit(4)
emit(1)
}.takeWhile { it < 2 }
.collect { value -> println(value) }
}
如上什么也不会输出;
@Test
fun `test takeWhile operator`() = runBlocking<Unit> {
flow<Int> {
emit(1)
emit(2)
emit(3)
emit(4)
emit(1)
}.takeWhile { it < 2 }
.collect { value -> println(value) }
}
会输出 1
2.2、filter(满足条件下发)
跟Rxjava一样
@Test
fun `test filter operator`() = runBlocking<Unit> {
numbers().filter {
it == 2
}.collect { value -> println(value) }
}
2.2.2、filterNotNull (不空的下发)
@Test
fun `test filterNotNull flow operator`() = runBlocking<Unit> {
flow {
emit(1)
emit(3)
emit(null)
emit(2)
}
.filterNotNull ()
.collect { value -> println(value) }
}
1
3
2
2.2.3、filterNot(符合条件的值将被丢弃)
筛选不符合条件的值,相当于filter取反
@Test
fun `test filterNot operator`() = runBlocking<Unit> {
flow<Int> {
emit(1)
emit(2)
emit(3)
}.filterNot {
it > 2
}.collect { value -> println(value) }
}
1
2
2.2.4、filterInstance (筛选符合类型的值)
对标rxjava中的ofType
筛选符合类型的值(不符合类型的值将被丢弃)
@Test
fun `test filterInstance operator`() = runBlocking<Unit> {
flow<Any> {
emit(1)
emit("2")
emit(3)
emit("str")
}.filterIsInstance<String>()
.collect { value -> println(value) }
}
2
str
2.3、skip 和 drop(跳过)
@Test
fun `test skip operator`() = runBlocking<Unit> {
numbers()
.drop(2)
.collect { value -> println(value) }
}
输出
3
2.3.2、dropWhile
找到第一个不满足条件的值,继续发送它和它之后的值
@Test
fun `test dropWhile operator`() = runBlocking<Unit> {
numbers()
.dropWhile { it <= 2 }
.collect { value -> println(value) }
}
This line will not execute
3
Finally in numbers
2.4、distinctUntilChanged (过滤重复)
@Test
fun `test distinctUntilChanged operator`() = runBlocking<Unit> {
flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}
.distinctUntilChanged()
.collect { value -> println(value) }
}
2.4.2、distinctUntilChangedBy
判断两个连续值是否重复,可以设置是否丢弃重复值。
去重规则有点复杂,没完全懂
@Test
fun `test distinctUntilChangedBy operator`() = runBlocking<Unit> {
flowOf(
Student(name = "Jack", age = 11),
Student(name = "Tom", age = 10),
Student(name = "Jack", age = 12),
Student(name = "Jack", age = 13),
Student(name = "Tom", age = 11)
)
.distinctUntilChangedBy { it.name == "Jack" }
.collect { //第三个Stu将被丢弃
println(it.toString())
}
}
Student(name=Jack, age=11)
Student(name=Tom, age=10)
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.5、single (判断是否一个事件)
用于确保 flow 输出值唯一。若只有一个值,则可以正常执行,若输出的值不止只有一个的时候,就会抛出异常:
@Test
fun `test single operator`() = runBlocking<Unit> {
try {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}.single())
} catch (e: Exception) {
println("e =$e")
}
}
如果一个事件,就正常执行;否则异常。
e =java.lang.IllegalArgumentException: Flow has more than one element
2.6、first (截留第一个事件)
@Test
fun `test first operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}.first())
}
1
2.7、debounce (防抖动)
@Test
fun `test debounce operator`() = runBlocking<Unit> {
flowOf(
Student(name = "Jack", age = 11),
Student(name = "Tom", age = 10),
Student(name = "Jack", age = 12),
Student(name = "Jack", age = 13),
Student(name = "Tom", age = 11)
)
.onEach {
if (it.name == "Jack" && it.age == 13)
delay(500)
}
.debounce(500)
.collect { //第三个Stu将被丢弃
println(it.toString())
}
}
Student(name=Jack, age=12)
Student(name=Tom, age=11)
2.8、conflate
见 10.3、conflate
仅保留最新值, 内部就是 buffer(CONFLATED``)
2.9、sample (周期采样)
固定周期采样 ,给定一个时间周期,保留周期内最后发出的值,其他的值将被丢弃
sample操作符与debounce操作符有点像,但是却限制了一个周期性时间,sample操作符获取的是一个周期内的最新的数据,可以理解为debounce操作符增加了周期的限制。
@Test
fun `test sample operator`() = runBlocking<Unit> {
flow {
repeat(10) {
delay(50)
emit(it)
}
}.sample(100).collect {
println(it)
}
}
0
2
4
6
8
3、组合型操作符
3.1、count (计数)
@Test
fun `test count operator`() = runBlocking<Unit> {
println(flow<Int> {
emit(1)
emit(1)
emit(2)
emit(3)
emit(3)
emit(4)
}
.count())
}
3.2、zip (合并元素)
跟Rxjava一样
@Test
fun `test zip operator`() = runBlocking<Unit> {
val nameFlow = mutableListOf("小红", "小黑").asFlow()
val numFlow = (1..3).asFlow()
nameFlow.zip(numFlow) { string, num ->
"$string:$num"
}.collect {
println("value -> $it")
}
}
3.3、combine(合并元素)
@Test
fun `test combine operator`() = runBlocking<Unit> {
val nameFlow = mutableListOf("小红", "小黑").asFlow()
val numFlow = (1..3).asFlow()
nameFlow.combine(numFlow) { string, num ->
"$string:$num"
}.collect {
println("value -> $it")
}
}
value -> 小红:1
value -> 小黑:2
value -> 小黑:3
3.4、merge (合并成流)
merge 是将两个flow合并起来,将每个值依次发出来
@Test
fun `test merge operator`() = runBlocking<Unit> {
val flow1 = listOf(1, 2)
.asFlow()
val flow2 = listOf("one", "two", "three")
.asFlow()
merge(flow1, flow2)
.collect { value -> println(value) }
}
1
2
one
two
three
3.5、flattenConcat (展平流)
展平操作符 flattenConcat 以顺序方式将给定的流展开为单个流,通俗点讲,减少层级 ,感觉和merge这么像呢,这个不太理解啥用
@Test
fun `test flattenConcat operator`() = runBlocking<Unit> {
val flow1 = listOf(1, 2)
.asFlow()
val flow2 = listOf("one", "two", "three")
.asFlow()
val flow3 = listOf("x", "xx", "xxx")
.asFlow()
flowOf(flow1, flow2, flow3)
.flattenConcat()
.collect { value -> println(value) }
}
1
2
one
two
three
x
xx
xxx
3.6、flattenMerge(展平流)
flattenMerge 作用和 flattenConcat 一样,但是可以设置并发收集流的数量
@Test
fun `test flattenMerge operator`() = runBlocking<Unit> {
val flow1 = listOf(1, 2)
.asFlow()
val flow2 = listOf("one", "two", "three")
.asFlow()
val flow3 = listOf("x", "xx", "xxx")
.asFlow()
flowOf(flow1, flow2, flow3)
.flattenMerge(2)
.collect { value -> println(value) }
}
1
2
one
two
three
x
xx
xxx
4、异常操作符
4.1、catch (拦截异常)
对标rxjava 中的 onErrorResumeNext
Exception、Throwable、Error 都会拦截
@Test
fun `test catch operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { delay(100) }
.onEach { if (2 == it) throw NullPointerException() }
.catch {
emit(110)
println("e == $it")
}
.collect {
println("value -> $it")
}
}
@Test
fun `test catch operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onEach { delay(100) }
.onEach { if (2 == it)
// throw Exception("测试 异常")
// throw Throwable("测试 异常")
throw Error("测试 错误")
}
.catch {
emit(110)
println("e == $it")
}
.collect {
println("value -> $it")
}
}
value -> 1
value -> 110
e == java.lang.Error: 测试 错误
4.2、retry (重试)
所有异常错误都拦截
- 拦截次数
@Test
fun `test retry operator`() = runBlocking<Unit> {
flow<Any> {
emit(1)
emit(2)
throw Exception("异常")
emit(3)
}.retry(2)
.catch { emit(110) }
.collect { value -> println(value) }
}
- 拦截条件
@Test
fun `test retry 2 operator`() = runBlocking<Unit> {
flow<Any> {
emit(1)
emit(2)
throw Error("异常")
emit(3)
}.retry { it.message == "异常" }
.catch { emit(110) }
.collect { value -> println(value) }
}
如上,满足拦截条件,所以会一直打印日志
1
2
1
2
1
2
1
2
1
... 不杀死程序一直打印
4.2.2、retryWhen
4.3、withTimeout (超时)
@Test
fun `test retry 2 operator`() = runBlocking<Unit> {
withTimeout(2500) {
flow<Any> {
emit(1)
throw Error("异常")
}.retry { it.message == "异常" }
.catch { emit(110) }
.collect { value -> println(value) }
}
}
输出:
1
1
... 好多个
1
1
1
Timed out waiting for 2500 ms
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms
(Coroutine boundary)
at com.yoshin.kt.kotlindemo20220713.ExampleUnitTest$test retry 2 operator$1.invokeSuspend(ExampleUnitTest.kt:928)
Caused by: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 2500 ms
at app//kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:184)
at app//kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:154)
at app//kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:508)
at app//kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:284)
at app//kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:108)
at java.base@11.0.13/java.lang.Thread.run(Thread.java:834)
5、辅助操作符
5.1、onXXX
onXXX 的方法包含
onCompletion 流完成时调用
onStart 流开始时调用
onEach 元素下发时调用,每次下发都调用
对比rxjava 中:
onCompletion == doOnComplete
onStart == doOnSubscribe 或者 doOnLifecycle
onEach == doNext
@Test
fun `test do operator`() = runBlocking<Unit> {
(1..5).asFlow()
.onCompletion { println(" onCompletion == $it ") }
.onStart { println(" onStart ") }
.onEach { println(" onEach == $it ") }
.collect {
println("value -> $it")
}
}
5.2、delay (延时)
延时
private fun events() = (1..3)
.asFlow()
.onEach { delay(100) }
.flowOn(Dispatchers.Default)
5.3、measureTimeMillis (计时)
测量代码用时
@Test
fun `test flow back pressure`() = runBlocking<Unit> {
val time = measureTimeMillis {
simpleFlow8()
.collect { value ->
delay(200) //处理这个元素消耗 200ms
println("Collected $value ${Thread.currentThread().name}")
}
}
println("Collected in $time ms")
}
参考地址
笔记大部分内容来自动脑学院的文章和视频
动脑学院
:https://blog.csdn.net/qq_30382601/article/details/121825461
Kotlin 之 协程(三)Flow异步流
:https://blog.csdn.net/zx_android/article/details/122744370
Android Kotlin之Flow数据流:https://blog.csdn.net/u013700502/article/details/120526170