数据流以协程为基础构建,与仅返回单个值的挂起函数相反,数据流可按顺序发出多个值。从概念上来讲,数据流是可通过异步方式进行计算处理的一组数据序列。所发出值的类型必须相同。
数据流包含三个实体:
- 提供方会生成添加到数据流中的数据。得益于协程,数据流还可以异步生成数据。
- (可选)中介可以修改发送到数据流的值,或修正数据流本身。
- 使用方则使用数据流中的值。
官方文档:戳一下
中文文档:戳一下
下面介绍流的使用说明:
- 一、异步流
- 二、流的构建器
- 三、冷流
- 四、流的连续性
- 五、流的上下文
- 六、在指定协程中收集流(launchIn)
- 七、启动流
- 八、流的取消
- 九、流的取消检测
- 十、背压处理
- 十一、流的操作符
- 十二、流的异常处理
- 十三、流的完成
- 十四、StateFlow跟SharedFlow使用
一、异步流
如果,我们知道,挂起函数可以异步的返回单个值,想要异步返回多个值,我们应该怎么做呢?
我们先使用List看下,
- List
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
打印结果
1
2
3
List#forEach可以返回多个值,但是不是异步的
- 序列
我们延迟100毫秒,用于表示计算消耗时间,
fun simple(): Sequence<Int> = sequence { // 序列构建器
for (i in 1..3) {
Thread.sleep(100) // 假装我们正在计算
yield(i) // 产生下一个值
}
}
fun main() {
simple().forEach { value -> println(value) }
}
上面代码,在每次打印之前,都会延迟100毫秒。但是,Thread.sleep也是阻塞的,不是异步的。
- 挂起函数
上面这些,计算过程会阻塞运行该代码的主线程。 当这些值由异步代码计算时,我们可以使用 suspend 修饰符标记函数 simple, 这样它就可以在不阻塞的情况下执行其工作并将结果作为列表返回:
suspend fun simple(): List<Int> {
delay(1000) // 假装我们在这里做了一些异步的事情
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
上述代码,会在挂起后,异步返回多个值。但是,是直接返回所有的数字,并不是每延迟100毫秒,返回一个,跟我们的预期不同。
- 流
使用上述代码,我们只能做到一次返回所有的值。为了表示异步计算的值流(stream),我们可以通过Flow类型来实现
fun simple(): Flow<Int> = flow { // 流构建器
for (i in 1..3) {
delay(100) // 假装我们在这里做了一些有用的事情
emit(i) // 发送下一个值
}
}
fun main() = runBlocking<Unit> {
// 启动并发的协程以验证主线程并未阻塞
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// 收集这个流
simple().collect { value -> println(value) }
}
上述代码,可以在不阻塞主线程的情况爱。每等待 100 毫秒打印一个数字,以达到预期
打印结果
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
通过流可以异步返回多个值,我们能立刻想到一个经典的使用场景:文件下载,获取下载进度。
使用Flow跟之前其他方式区别
- 名为flow的Flow类型构建器函数
- flow{}构建块中的代码可以挂起
- 函数simple不再标有suspend修饰符
- 流使用emit()函数发射值
- 流使用collect()函数收集值
二、流的构建器
流通过flowof或asFlow扩展函数来构建;通过collect函数来收集。
流的构建方式:
- flowOf构建器定义了一个发射固定值集的流
- 使用.asFlow()扩展函数,可以将各种集合与序列转换成流
示例代码
fun flowFive() {
launch {
flowOf("flowOne", "flowTwo", "flowThree")
.onEach { delay(1000) }
.collect { printLog("$it") }
(1..5).asFlow().collect {
printLog("$it")
}
}
}
三、冷流
Flow是一种类似于序列的冷流,flow构建器中的代码直到流被收集(collect)的时候才运行
看下面代码
suspend fun simpleFlow() = flow<Int> {
printLog("flow 开始创建")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun flowThree() {
launch {
val flow = simpleFlow()
printLog("开始逻辑")
flow.collect {
printLog("$it")
}
}
}
打印结果:
开始逻辑
flow 开始创建
1
2
3
四、流的连续性
流的连续性:
- 流的每次单独收集都是按顺序执行的,除非使用特殊操作符
- 从上游到下游每个过渡操作符都会处理每个发射出的值然后再交给末端操作符。
示例代码
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
执行结果
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
五、流的上下文
- 流的收集总是在调用协程的上下文中发生,流的该属性称为上下文保存
- flow{}构建器中的代码必须遵循上下文保存属性,并且不允许从其他上下文中发射(emit)
- flowOn操作符,该函数用于更改流发生的上下文
通过flowOn改变线程
suspend fun simpleFlow2() = flow<Int> {
printLog("flow 线程:${Thread.currentThread().name}")
for (i in 1..3) {
delay(1000)
emit(i)
}
}.flowOn(Dispatchers.Default)
fun flowSix() {
launch {
simpleFlow2().collect {
printLog("$it,线程:${Thread.currentThread().name}")
}
}
}
六、在指定协程中收集流(launchIn)
使用launchIn替换collect,我们可以在单独的协程中启动流的收集。指定作用域
fun simpleFlow3() = (1..3).asFlow().onEach { delay(1000) }
.flowOn(Dispatchers.Default)
fun flowSeven() {
launch {
val job = simpleFlow3().onEach { printLog("value:$it,Thread:${Thread.currentThread().name}") }
.launchIn(CoroutineScope(SupervisorJob() + Dispatchers.IO))
//这里可以取消
job.cancel()
}
}
七、启动流
使用流表示来自一些源的异步事件是很简单的。
在这个案例中,我们需要一个类似 监听(addEventListener) 的函数,该函数注册一段响应的代码,来处理即将到来的事件,并继续进行进一步的处理。onEach 操作符可以担任该角色。 然而,onEach 是一个过渡操作符。我们也需要一个末端操作符来收集流。 否则仅调用 onEach 是无效的。
如果我们在 onEach 之后使用 collect 末端操作符,那么后面的代码会一直等待直至流被收集:
// 模仿事件流
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- 等待流收集
println("Done")
}
打印结果
Event: 1
Event: 2
Event: 3
Done
八、流的取消
流采用与协程同样的协作取消。像往常一样,流的收集可以是当流在一个可取消的挂起函数(如,delay)中挂起的时候取消
在协程中启动流,协程取消了,流也就取消了
suspend fun simpleFlow4() = flow<Int> {
printLog("flow 开始创建")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun flowEight() {
launch {
//在超时操作时,取消
withTimeoutOrNull(2100){
simpleFlow4().collect{
printLog("value: $it")
}
}
printLog("执行完成")
}
}
九、流的取消检测
- 为方便起见,流构建器对每个发射值执行附加的ensureActive检测以进行取消,这意味着从flow{}发出的频繁循环是可以取消的。
- 处于性能原因,大多数其他流操作不会自行执行其他取消检测,在协程处于繁忙循环的情况下,必须明确检测是否取消
- 通过cancellable操作符来执行此操作
suspend fun simpleFlow5() = flow<Int> {
printLog("flow 开始创建")
for (i in 1..3) {
delay(1000)
emit(i)
}
}
fun flowNine() {
launch {
simpleFlow5().collect {
printLog("value: $it")
if (it == 2) {
cancel()
}
}
}
}
繁忙的时候,取消不成功
fun flow10() {
launch {
(1..9).asFlow().collect {
printLog("value: $it")
if (it == 5) {
cancel()
}
}
}
}
如果,我们在这种情况也需要检测是否取消的话。需要使用cancellable
fun flow10() {
launch {
(1..9).asFlow().cancellable().collect {
printLog("value: $it")
if (it == 5) {
cancel()
}
}
}
}
十、处理背压
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
- buffer(),并发运行流中的发射元素
- conflate(),合并发射项,不对每个值进行处理(优化消费者)
- collectLatest(),取消并重新发射最后一个值
- 当必须更改CoroutineDispatcher时,flowOn操作符使用了相同的缓冲机制,但是buffer()函数显式地请求缓冲而不改变执行的上下文(线程)
生产者速度 > 消费者时,产生背压,看代码
suspend fun simpleFlow6() = flow<Int> {
printLog("flow 开始创建,Thread:${Thread.currentThread().name}")
for (i in 1..3) {
//生产出来,需要100
delay(100)
emit(i)
}
}
fun flow11() {
launch {
val time = measureTimeMillis {
simpleFlow6().collect {
//消耗需要200。这样就产生了背压
delay(200)
printLog("value: $it,Thread:${Thread.currentThread().name}")
}
}
printLog("总耗时:$time")
}
}
上面这种,生产者,每过100毫秒就会发射一次数据。消费者(collect),需要200毫秒,才能消费一次数据。这样就差生了背压。
- 使用缓冲,优化背压
fun flow11() {
launch {
val time = measureTimeMillis {
simpleFlow6().buffer(10).collect {
//消耗需要200毫秒。这样就产生了背压
delay(200)
printLog("value: $it,Thread:${Thread.currentThread().name}")
}
}
printLog("总耗时:$time")
}
}
- 使用flowOn切换线程,处理
fun flow11() {
launch {
val time = measureTimeMillis {
simpleFlow6().flowOn(Dispatchers.Default).collect {
//消耗需要200。这样就产生了背压
delay(200)
printLog("value: $it,Thread:${Thread.currentThread().name}")
}
}
printLog("总耗时:$time")
}
}
- 使用conflate(),合并发射项,不对每项值进行处理
fun flow11() {
launch {
val time = measureTimeMillis {
simpleFlow6()
// .buffer(20)
// .flowOn(Dispatchers.Default)
.conflate()
.collect {
//消耗需要200。这样就产生了背压
delay(200)
printLog("value: $it,Thread:${Thread.currentThread().name}")
}
}
printLog("总耗时:$time")
}
}
这种方式,可能会丢失中间的值,直接使用最新的值。
- 使用collectLatest处理
fun flow11() {
launch {
val time = measureTimeMillis {
simpleFlow6()
// .buffer(20)
// .flowOn(Dispatchers.Default)
.conflate()
.collectLatest {
//消耗需要200。这样就产生了背压
delay(200)
printLog("value: $it,Thread:${Thread.currentThread().name}")
}
}
printLog("总耗时:$time")
}
}
只使用最新的值
十一、流操作符
11.1 转换操作符
- map操作符
https://kotlinlang.org/docs/flow.html#intermediate-flow-operators
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
打印结果
response 1
response 2
response 3
- Transform 操作符
在流转换转换符中,最通用的就是Transform。它可以用于模拟简单的变换,如映射和过滤器,以及实现更复杂的变换。使用Transform操作符,我们可以发出任意次数的任意值。
代码示例
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
打印结果
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
11.2 限制大小操作符
当达到相应的限制时,大小限制中间运算符(如take)将取消流的执行。协程中的取消总是通过抛出异常来执行,以便在取消的情况下,所有资源管理函数(如try{…}finally{…}块)都正常运行:
- take操作符
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
//打印结果
1
2
Finally in numbers
11.3 末端操作符
末端操作符是在流上用于启动流收集的挂起函数。 collect 是最基础的末端操作符,但是还有另外一些更方便使用的末端操作符:
- 转化为各种集合,例如 toList 与 toSet。
- 获取第一个(first)值与确保流发射单个(single)值的操作符。
- 使用 reduce 与 fold 将流规约到单个值。
代码示例
val sum = (1..5).asFlow()
.map { it * it } // 数字 1 至 5 的平方
.reduce { a, b -> a + b } // 求和(末端操作符)
println(sum)
打印结果
55
11.4 组合操作符
- Zip
就像 Kotlin 标准库中的 Sequence.zip 扩展函数一样, 流拥有一个 zip 操作符用于组合两个流中的相关值:
val nums = (1..3).asFlow() // 数字 1..3
val strs = flowOf("one", "two", "three") // 字符串
nums.zip(strs) { a, b -> "$a -> $b" } // 组合单个字符串
.collect { println(it) } // 收集并打印
打印结果
1 -> one
2 -> two
3 -> three
- Combine
当流表示一个变量或操作的最新值时,可能需要执行计算,这依赖于相应流的最新值,并且每当上游流产生值的时候都需要重新计算。这种相应的操作符就是combine
例如,先前示例中的数字如果每 300 毫秒更新一次,但字符串每 400 毫秒更新一次, 然后使用 zip 操作符合并它们,但仍会产生相同的结果, 尽管每 400 毫秒打印一次结果:
我们在本示例中使用 onEach 过渡操作符来延时每次元素发射并使该流更具说明性以及更简洁。
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
下面,我们用combine代替zip
val nums = (1..3).asFlow().onEach { delay(300) } // 发射数字 1..3,间隔 300 毫秒
val strs = flowOf("one", "two", "three").onEach { delay(400) } // 每 400 毫秒发射一次字符串
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" } // 使用“combine”组合单个字符串
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
我们得到了完全不同的输出,其中,nums 或 strs 流中的每次发射都会打印一行:
1 -> one at 452 ms from start
2 -> one at 651 ms from start
2 -> two at 854 ms from start
3 -> two at 952 ms from start
3 -> three at 1256 ms from start
11.5 展平操作符
主要有:
- flatMapConcat
- flatMapMerge
- flatMapLatest
流表示异步接收的值序列,所以很容易遇到这样的情况: 每个值都会触发对另一个值序列的请求。比如说,我们可以拥有下面这样一个返回间隔 500 毫秒的两个字符串流的函数:
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
现在,如果我们有一个包含三个整数的流,并为每个整数调用 requestFlow,如下所示
(1..3).asFlow().map { requestFlow(it) }
然后我们得到了一个包含流的流(Flow<Flow>),需要将其进行展平为单个流以进行下一步处理。集合与序列都拥有 flatten 与 flatMap 操作符来做这件事。然而,由于流具有异步的性质,因此需要不同的展平模式, 为此,存在一系列的流展平操作符。
flatMapConcat
连接模式由 flatMapConcat 与 flattenConcat 操作符实现。它们是相应序列操作符最相近的类似物。它们在等待内部流完成之前开始收集下一个值
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
打印结果
1: First at 121 ms from start
1: Second at 622 ms from start
2: First at 727 ms from start
2: Second at 1227 ms from start
3: First at 1328 ms from start
3: Second at 1829 ms from start
flatMapMerge
另一种展平模式是并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。 它由 flatMapMerge 与 flattenMerge 操作符实现。他们都接收可选的用于限制并发收集的流的个数的 concurrency 参数(默认情况下,它等于 DEFAULT_CONCURRENCY)。
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
打印结果
1: First at 136 ms from start
2: First at 231 ms from start
3: First at 333 ms from start
1: Second at 639 ms from start
2: Second at 732 ms from start
3: Second at 833 ms from start
flatMapLatest
与 collectLatest 操作符类似,也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。 这由 flatMapLatest 操作符来实现。
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
打印结果
1: First at 142 ms from start
2: First at 322 ms from start
3: First at 425 ms from start
3: Second at 931 ms from start
注意,flatMapLatest 在一个新值到来时取消了块中的所有代码 (本示例中的 { requestFlow(it) })。 这在该特定示例中不会有什么区别,由于调用 requestFlow 自身的速度是很快的,不会发生挂起, 所以不会被取消。然而,如果我们要在块中调用诸如 delay 之类的挂起函数,这将会被表现出来。
十二、流的异常处理
当运算符中的发射器或代码抛出异常时的处理方式:
- try/catch处理
- catch函数处理
try/catch块是我们Kotlin中常用的异常捕获功能。在流的处理中我们也可以通过专门的catch()函数来处理流在发射到收集整个过程中出现的异常。
try/catch处理
在收集器的时候出现异常的处理。首先,我们可以使用Kotlin中的try/catch来处理
suspend fun simpleFlow7() = flow<Int> {
for (i in 1..3) {
//生产出来,需要100
delay(100)
emit(i)
}
}
fun flow12() {
launch {
try {
simpleFlow6()
.collect {
//消耗需要200。这样就产生了背压
printLog("value: $it")
//如果值小于1,抛出异常
check(it <= 1) { "result: $it" }
}
} catch (e: Throwable) {
printLog("Exception:$e")
}
}
}
这里可以看到,异常被捕获了
但是,流必须对异常透明,即在 flow { … } 构建器内部的 try/catch 块中发射值是违反异常透明性的。
catch函数处理
发射器可以使用 catch 操作符来保留此异常的透明性并允许封装它的异常处理。catch 操作符的代码块可以分析异常并根据捕获到的异常以不同的方式对其做出反应:
- 可以使用 throw 重新抛出异常。
- 可以使用 catch 代码块中的 emit 将异常转换为值发射出去。
- 可以将异常忽略,或用日志打印,或使用一些其他代码处理它。
代码示例
fun flow13() {
launch {
flow<Int> {
emit(2)
//主动抛出异常
throw NullPointerException("数据异常")
}.catch {_ -> emit(-1) } //出现异常后,重新发送一个数据过去
.flowOn(Dispatchers.IO)
.collect { printLog("$it") }
}
}
这里,主动抛出异常,通过catch函数捕获后,重新发送一个数据到末端。
透明捕获
catch 过渡操作符遵循异常透明性,仅捕获上游异常(catch 操作符上游的异常,但是它下面的不是)。 如果 collect { … } 块(位于 catch 之下)抛出一个异常,那么异常会逃逸
代码示例
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // 不会捕获下游异常
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
上面的代码尽管有catch函数。但是,因为异常是在collect末端出现的(catch函数之后出现的),所以,这个异常也是捕获不了的。
解决方案: 声明式捕获
我们可以将 catch 操作符的声明性与处理所有异常的期望相结合,将 末端(collect) 操作符的代码块移动到 onEach 中,并将其放到 catch 操作符之前。收集该流必须由调用无参的 collect() 来触发
简单说就是如果,我们想让catch函数处理所有的异常。我们需要:
- 我们就需要把collect()函数中的代码,放到onEach中(catch函数之前)执行
- 调用无参的collect()函数来收集该流
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
打印结果
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
这样,就可以在没有显示使用try/catch块的情况下捕获所有的异常了。
十三、流的完成
当流收集完成时(普通情况或异常情况),它可能需要执行一个动作。
- 命令式finally块,处理
- 声明式处理onCompletion函数
命令式finally块
除了 try/catch 之外,收集器还能使用 finally 块在 collect 完成时执行一个动作。
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
这段代码,在打印simple流后,会打印finally里面的done
1
2
3
Done
声明式处理onCompletion函数
流拥有 onCompletion 操作符,它会在流完全收集时调用
示例代码
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
这个打印结果跟上面是一样的
onCompletion 的主要优点是其 lambda 表达式的可空参数 Throwable 可以用于确定流收集是正常完成还是有异常发生。
下面,我们使用异常完成来演示
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
打印结果
1
Flow completed exceptionally
Caught exception
这里,可以看到 onCompletion函数的代码执行了。
onCompletion 操作符与 catch 不同,它不处理异常。我们可以看到前面的示例代码,异常仍然流向下游。它将被提供给后面的 onCompletion 操作符,并可以由 catch 操作符处理。
与 catch 操作符的另一个不同点是 onCompletion 能观察到所有异常并且仅在上游流成功完成(没有取消或失败)的情况下接收一个 null 异常。
示例代码
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
打印结果
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
我们可以看到完成时 cause 不为空,因为流由于下游异常而中止
十四、StateFlow跟SharedFlow
StateFlow 和 SharedFlow 是 Flow API,允许数据流以最优方式发出状态更新并向多个使用方发出值。
官方地址:戳一下
StateFlow
StateFlow是一个状态容器式的可观察数据流,可以向收集器发送当前的状态更新和新状态更新。还可以通过其value属性读取当前状态值。
它只能有一个观察者可以获取到数据。
与使用 flow 构建器构建的冷数据流不同,StateFlow 是热数据流:从数据流收集数据不会触发任何提供方代码。StateFlow 始终处于活跃状态并存于内存中,而且只有在垃圾回收根中未涉及对它的其他引用时,它才符合垃圾回收条件。
我们创建一个页面,里面2个按钮,通过"+“,”-"来修改textView的值
示例代码:
Activity
class FlowTestActivity : AppCompatActivity() {
private val textView by lazy {
findViewById<TextView>(R.id.tv_content)
}
private val viewModel by viewModels<FlowTestViewModel>()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_flow_test)
//启动协程
lifecycleScope.launchWhenCreated {
//通过flow来收集数据
viewModel.flowNumber.collect {
textView.text = "$it"
}
}
}
//点击按钮+
fun onNumberPlus(_: View) {
viewModel.numPlus()
}
fun onNumberMinus(_: View) {
viewModel.numMinus()
}
}
ViewModel
class FlowTestViewModel : ViewModel() {
val flowNumber = MutableStateFlow(0)
fun numPlus() {
flowNumber.value++
}
fun numMinus() {
flowNumber.value--
}
}
点击按钮,我们发现,实现了功能。
如果使用过LiveData的话。到这里,我们发现StateFlow跟LiveData的使用非常的相似。那么它们有什么区别呢。
StateFlow 和 LiveData
StateFlow 和 LiveData 具有相似之处。两者都是可观察的数据容器类。
他们的不同之处:
- StateFlow 需要将初始状态传递给构造函数,而 LiveData 不需要。
- 当 View 进入 STOPPED 状态时,LiveData.observe() 会自动取消注册使用方,而从 StateFlow 或任何其他数据流收集数据的操作并不会自动停止。如需实现相同的行为,您需要从 Lifecycle.repeatOnLifecycle 块收集数据流。
repeatOnLifecycle API 仅在 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 库及更高版本中提供。
冷流转热流(ShareIn)
StateFlow 是热数据流,只要该数据流被收集,或对它的任何其他引用在垃圾回收根中存在,该数据流就会一直存于内存中。您可以使用 shareIn 运算符将冷数据流变为热数据流。
冷流转热流需要满足一下条件:
- 用于共享数据流的 CoroutineScope。此作用域函数的生命周期应长于任何使用方,使共享数据流在足够长的时间内保持活跃状态。
- 要重放 (replay) 至每个新收集器的数据项数量。
- “启动”行为政策。
代码示例
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed()
)
}
在此示例中,latestNews 数据流将上次发出的数据项重放至新收集器,只要 externalScope 处于活跃状态并且存在活跃收集器,它就会一直处于活跃状态。当存在活跃订阅者时,SharingStarted.WhileSubscribed()“启动”政策将使上游提供方保持活跃状态。可使用其他启动政策,例如使用 SharingStarted.Eagerly 可立即启动提供方,使用 SharingStarted.Lazily 可在第一个订阅者出现后开始共享数据,并使数据流永远保持活跃状态。
SharedFlow
SharedFlow会向从其中收集值的所有使用方发出数据。跟BroadcastChannel(广播通道)特别相似,属于1对多关系。
看到这个,我们首先想到的场景:是不是非常适合ViewPager+Fragment共享数据
为了减少代码量,我们就在一个页面中,通过3个TextView来实现该功能。
Activity
class FlowTestActivity : AppCompatActivity() {
private val textView by lazy {
findViewById<TextView>(R.id.tv_content)
}
private val textView2 by lazy {
findViewById<TextView>(R.id.tv_content_2)
}
private val textView3 by lazy {
findViewById<TextView>(R.id.tv_content_3)
}
private val viewModel by viewModels<FlowTestViewModel>()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_flow_test)
lifecycleScope.launchWhenCreated {
ShareFlowNumEvent.shareEvent.collect {
textView.text = "${it.num}"
}
}
lifecycleScope.launchWhenCreated {
ShareFlowNumEvent.shareEvent.collect {
textView2.text = "${it.num}"
}
}
lifecycleScope.launchWhenCreated {
ShareFlowNumEvent.shareEvent.collect {
textView3.text = "${it.num}"
}
}
}
fun onNumberPlus(view: View) {
viewModel.sharePlus()
}
fun onNumberMinus(view: View) {
viewModel.shareMinus()
}
}
这里,还使用上面例子的2个按钮,使用3个textView。
下面创建sharedFlow,我们使用单例,毕竟是共享的嘛
object ShareFlowNumEvent {
val shareEvent = MutableSharedFlow<NumData>()
suspend fun shareData(data: NumData) {
shareEvent.emit(data)
}
}
data class NumData(val num: Int = 0)
创建一个单例及一个数据的封装类
ViewModel
class FlowTestViewModel : ViewModel() {
private var job: Job? = null
fun sharePlus() {
job = viewModelScope.launch(Dispatchers.IO) {
ShareFlowNumEvent.shareData(NumData(Random.nextInt(10)))
}
}
fun shareMinus() {
job?.cancel()
}
}
通过点击页面按钮,调用sharePlus ,通过ShareFlowNumEvent.shareData来发送(emit)数据流。
UI页面,通过collect来收集流。
通过下图,我们可以知道,3个TextView都收到了数据
参考资料:
https://developer.android.google.cn/kotlin/flow?hl=zh-cn
https://kotlinlang.org/docs/flow.html#terminal-flow-operators
https://www.kotlincn.net/docs/reference/coroutines/flow.html
https://juejin.cn/post/7007602776502960165#comment