- Flow深入浅出系列之在ViewModels中使用Kotlin Flows
- Flow深入浅出系列之更聪明的分享 Kotlin Flows
- Flow深入浅出系列之使用Kotlin Flow自动刷新Android数据的策略
Flow深入浅出系列之更聪明的分享 Kotlin Flows
使生命周期对上游流有效,以跳过不必要的工作。这是一系列有关在Android中使用Kotlin Flow的文章的第二部分。
在第一部分中,我们描述了当在ViewModel类中使用SharedFlow或StateFlow时,Kotlin Flow的主要限制:
当用户导航回Activity或Fragment后重新收集使用SharingStarted.WhileSubscribed()
策略的SharedFlow或StateFlow时,其源上游流将始终从头重新启动,有时会导致执行不必要的工作,即先前缓存的数据仍然有效。
在上面的示例中,即使queryFlow的最新值在此期间未更改,仍将再次执行repository.search()
。这意味着可能会进行不必要的网络请求或数据库查询。
LiveData不会遇到这个问题,因为它的观察者不需要在变得不活动时取消订阅:LiveData是生命周期感知的,并将推迟新结果的传递,直到它变得活动为止,同时还确保相同的结果永远不会被传递给相同的观察者两次(即使它再次变为活动状态)。有关更多详细信息,请参阅完整文章。
在第一部分的结尾处,我们得出结论,在仅依赖于标准的shareIn()
或stateIn()
操作时,没有简单和正确的方法可避免执行此不必要的工作。
在第二部分中,我们将通过设计一个新的Flow运算符来解决这个效率问题,以使SharedFlows更好地与生命周期集成。
与Lifecycle同步
LiveData
LiveData比Flow更好地处理生命周期的核心原因是,生命周期状态会自动传播到所有LiveData实例的上游,从而它们可以在保留其状态的同时协作地暂停。确切地说,传播的信息是一个简化和聚合的生命周期状态,称为活动状态:
当LiveData实例的观察者中至少有一个进入STARTED生命周期状态时,LiveData实例变为活动状态。当订阅的观察者进入STARTED状态或新的已启动观察者订阅时,可能会发生这种情况;
当LiveData实例的没有观察者处于STARTED生命周期状态时,LiveData实例变为非活动状态。可能会发生这种情况,当已订阅的观察者从STARTED状态回退到CREATED状态或已启动的观察者取消订阅时。
当应用LiveData变换(如map()或switchMap())时,上游LiveData实例会自动继承下游LiveData实例的活动状态。
当LiveData实例处于非活动状态时,它们会自动推迟将新结果传递给下游观察者。此外,LiveData实现可能会通知以停止主动工作。例如,LiveData协程构建器将在其变为非活动状态时取消其协程块。
SharedFlow
SharedFlow(及其子类StateFlow)是一种特殊类型的Flow,它能够将单个源发出的值广播到多个观察者(称为收集器)并对新观察者进行最新值重播,就像LiveData一样。 SharedFlow实例既是值的收集器又是发射器,而上游值由SharedFlow独立于下游收集器收集。
Flow和SharedFlow不直接支持Android Lifecycle类,但在UI层必须使用AndroidX Lifecycle API(如Lifecycle.repeatOnLifecycle()
)来同步Flow收集与Android Lifecycle:Flow必须仅在UI处于STARTED生命周期状态或更高级别时收集,并且在UI不可见时应尽快取消收集协程。
像MutableSharedFlow这样的SharedFlow实现也能够公开当前已订阅收集器的数量,当与上述Lifecycle同步API结合使用时,可以用于推断与LiveData等效的活动状态:
- 当第一个收集器订阅时,SharedFlow实例变为活动状态;
- 当最后一个收集器取消订阅时,SharedFlow实例变为非活动状态。
shareIn()
和stateIn()
扩展功能是从源上游Flow创建SharedFlow实例的最简单方法。它们接受一个SharingStarted参数,该参数是响应“活动状态”变化时用于启动和停止收集上游Flow(从单独的协程)的策略。
总之,SharedFlow可以使生命周期感知,但是当使用这些有限API时,对于活动状态更改所能做的唯一事情是启动或取消收集整个上游Flow的协程。上游Flow不能暂停并在离开时恢复:取消协程意味着失去当前收集的整个内部状态,并且必须在下一次收集期间重新启动上游Flow。
这使得在上游Flow中使用像distinctUntilChanged()
这样的运算符变得不切实际,因为一旦SharedFlow策略取消收集Flow的协程,最后发出的值就会被遗忘。Flow操作符不适用于跨多个收集周期的工作。
另一种方法
我们需要找到一个权衡。我们应该始终保持收集整个上游Flow的协程处于活动状态,而不是取消它,但将上游Flow分成两部分:
- 上游流的上部分应与生命周期同步,并且在生命周期无效时应停止被收集;
- 上游流的下部分应始终被收集,但在生命周期无效时将暂时不接收来自上部分的新值。
这样,就可以在下部分中使用像distinctUntilChanged()
这样的运算符,在重新开始上部分时成功过滤掉重新发出的值,并避免在链下面进行不必要的工作。
val results: StateFlow<Result> =
someTriggerFlow() // 上部分
.someOperator(someFormOfLifecycle)
.distinctUntilChanged() // 下部分
.map { someExpensiveLoadingOperation(it) }
.stateIn(
scope = viewModelScope,
started = SharingStarted.Eagerly,
initialValue = Result.empty()
)
这种方法是由 @hicham.boushaba 在他的文章“使冷流具备生命周期感知能力”中提出的,该文章为本文提供了起点。
要使Flow的上部具备生命周期感知能力,我们需要使用一个操作符,以某种形式的生命周期作为输入,当生命周期处于活动状态时,启动一个子协程来收集它,并在非活动状态下取消协程,同时将值传递给下部。
在ViewModel中注入Android Lifecycle?
Boushaba先生建议使用一个名为whenAtLeast()
的新操作符,与自定义的Android Lifecycle-aware ViewModel结合使用。另一种选择是在标准ViewModel中使用由AndroidX Lifecycle库提供的flowWithLifecycle()操作符。这两个解决方案都要求在ViewModel类中保留对Android Lifecycle的引用。我认为出于以下不同的原因应该避免这样做:
- 它使ViewModel更难测试,因为它增加了对Android特定Lifecycle类的额外依赖;
- 它需要额外的复杂代码来在LifecycleOwner销毁后清除ViewModel中的Lifecycle引用,以防止内存泄漏;
- 它没有考虑到一个单独的ViewModel可能同时被多个具有不同Lifecycle的组件观察。例如,常常使用ViewModel来实现Fragment与Activity或另一个Fragment之间的通信。支持这些用例需要重新实现LiveData已经支持的很多功能;
有一个更简单的解决方案。
引入flowWhileShared()
操作符
事实证明,我们在ViewModel中根本不需要处理Android的Lifecycle,因为MutableSharedFlow已经提供了类似的功能。在前面的部分中,我们提到它通过subscriptionCount属性跟踪和公开当前订阅收集器的数量,并且这可以用来推断所需的活动状态。它在shareIn()和stateIn()内部使用,但也可以作为一个称为flowWhileShared()的自定义Flow操作符的输入值使用:
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when (it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}
这个实现很简单,依赖于两个现有的Flow构建块:
使用SharingStarted来发射START和STOP命令的Flow,以响应subscriptionCount的变化。在99%的情况下,我们将使用的实现是SharingStarted.WhileSubscribed()
,这样当subscriptionCount > 0
时,会发射START命令,当subscriptionCount == 0
时,会发射STOP命令,以匹配LiveData的行为;
使用flatMapLatest()
操作符在上游Flow(this)
和空Flow之间切换。在内部,此操作符创建一个子协程来收集这些Flow的值,并在每个新的命令上取消它。
注意:为了简单起见,特殊的“停止并重置重放缓存”命令(作用于父SharedFlow的缓存)被忽略,并被视为常规的“停止”命令。
与SharedFlow或StateFlow的使用
不幸的是,这个新操作符不能与shareIn()
或stateIn()
结合使用,因为它们没有公开MutableSharedFlow的subscriptionCount
字段。相反,我们需要创建和管理自己的MutableSharedFlow或MutableStateFlow的实例。
为了减少样板代码并避免重复,我们可以创建一个自定义的StateFlow工厂函数:
fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
scope.launch {
producer(state.subscriptionCount).collect(state)
}
return state.asStateFlow()
}
这个函数立即从提供的作用域启动一个协程,并将subscriptionCount
作为参数传递给上游Flow的生产者lambda表达式。然后,MutableStateFlow
(实现了FlowCollector<T>
)收集生成的Flow,最后将MutableStateFlow
作为只读StateFlow返回。
在本文的末尾,您将找到一个优化版本的此函数的源代码(最小化编译器生成的类的数量)和SharedFlow的一个变体。
当然,只有在所有SharedFlow和StateFlow都是通过使用上面提到的AndroidX Lifecycle API从UI层收集时,整个系统才能按预期工作。例如:
viewLifecycleOwner.lifecycleScope.launch {
viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.results.collect { data ->
displayResult(data)
}
}
}
用例
让我们看一些如何结合使用这些新函数的实际示例。
响应位置更新加载数据
假设我们有一个类似于本文中描述的 locationFlow() 函数,允许以冷的 Flow 形式接收位置更新,同时主动使用设备的 GPS。我们想要从 API 加载当前位置周围的商店列表,并在设备每次移动至少 30 米时更新该列表,以避免不必要的 API 调用。同时,我们希望确保在屏幕不可见时释放 GPS 资源。
ViewModel 将包含与此类似的 StateFlow builder:
@OptIn(ExperimentalCoroutinesApi::class)
val location: StateFlow<List<Shop>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
locationClient.locationFlow(
interval = 20000L,
smallestDisplacement = 30f
)
.flowWhileShared(
subscriptionCount,
SharingStarted.WhileSubscribed(1000L)
)
.distinctUntilChanged { old, new ->
old.distanceTo(new) < 30f
}
.mapLatest { location ->
shopsRepository.getShopsAtLocation(location)
}
.catch { emit(emptyList()) }
}
(为了演示简化了错误处理)
stateFlow()
工厂函数以返回上游 Flow 的生产者 lambda 作为最后一个参数。该 lambda 允许在上游 Flow 中使用 subscriptionCount
。
上游 Flow 的主要部分将在 ViewModel 保持在内存中时无中断地被收集。但是,在 flowWhileShared()
之前的部分(在本例中为 locationFlow()
)仅在 StateFlow 处于活动状态(至少有一个订阅)时才会被收集,因此当它变为非活动时,GPS 将停止。由于 SharingStarted.WhileSubscribed(1000L)
策略,一旦 UI 隐藏且订阅数达到 0,集合就会在一秒钟后取消。这个额外的延迟可以避免在配置更改期间重新启动位置更新的成本(当新的 UI 实例几乎立即恢复 Flow 集合时)。
当用户离开屏幕并稍后回到屏幕时,locationFlow()
的收集将重新启动,并且仍将被无中断地收集的 distinctUntilChanged() 将处理过滤掉新的位置,这些位置与先前发布的位置太接近。这样,如果设备自上次 UI 可见以来移动的距离不足 30 米,我们就可以避免再次调用 API 进行不必要的工作。
最新结果仍将由 StateFlow
缓存,并在开始收集它时自动重放到当前 UI。
我们成功创建了最有效的实现,解决了该问题,而无需依赖 LiveData。
在无效时重新加载数据
尽可能应用程序应向用户展示最新版本的数据,而不需要手动刷新。为了实现这一点,存储库可以在每次完成写操作(创建/更新/删除)后向数据的观察者发送无效信号,最终触发刷新。
为了实现最大效率,只有在以下情况下才应从存储或网络重新加载数据:
- 屏幕当前可见(在后台没有早期刷新);
- 自上次加载以来数据确实发生了变化。
表示可以无效的数据的自然方法是从存储库返回结果的 Flow,而不是单个结果。
但是,由于我们希望在屏幕不可见时避免刷新数据,因此我们需要取消该 Flow 的收集。由于每个 Flow 集合都是自包含的,因此它始终会从相同屏幕的先前收集开始加载数据,而不管其是否已更改。
例如,Room Jetpack 库创建这种类型的 Flow,以提供对 DAO 中可观测数据集的支持。 Room 实现在 Flow 收集开始时执行数据库查询,然后每次更新相关表时执行。这意味着每次屏幕再次变为可见并且数据集在此期间没有更改时,都会执行不必要的数据库查询。
相比之下,Room 生成的 LiveData 实现更高效,因为只有在表自上次活动的 LiveData 以来发生更改时才会执行数据库查询。
为了使 Kotlin Flow 代码更高效,我们需要使用版本控制。
数据集的版本应该以 StateFlow<Int>
的形式在存储库中公开,同时还应提供获取当前数据快照的函数。例如:
interface CustomerRepository {
val customersVersion: StateFlow<Int>
suspend fun getAllCustomers(): List<Customer>
suspend fun getActiveCustomers(): List<Customer>
}
存储库可以由远程 API 或本地存储(如数据库)支持;这并不重要。
每当成功更新数据集时,版本号必须以原子方式递增:
private val _customersVersion = MutableStateFlow(0)
suspend fun updateCustomerName(customerId: Long, newName: String) {
// 在此处插入一些代码来更新数据
_customersVersion.update { it + 1 }
}
提示:如果存储库由 Room 数据库支持,您不需要手动跟踪更改,可以使用 Room 的
InvalidationTracker
监视一组表:
fun RoomDatabase.createVersionFlow(vararg tables: String): StateFlow<Int> {
val stateFlow = MutableStateFlow(0)
invalidationTracker.addObserver(object : InvalidationTracker.Observer(tables) {
override fun onInvalidated(tables: Set<String>) {
stateFlow.update { it + 1 }
}
})
return stateFlow.asStateFlow()
}
最后,在 ViewModel 中将所有拼图放在一起:
@OptIn(ExperimentalCoroutinesApi::class)
val activeCustomers: Flow<List<Customer>> =
stateFlow(viewModelScope, emptyList()) { subscriptionCount ->
customersRepository.customersVersion
.flowWhileShared(
subscriptionCount, SharingStarted.WhileSubscribed()
)
.distinctUntilChanged()
.mapLatest {
customersRepository.getActiveCustomers()
}
}
再次感谢 distinctUntilChanged()
operator,一旦 StateFlow 变为活动状态,当前版本号就会与先前版本号(如果有)进行比较,只有在版本号更改时才会加载数据。问题解决。
最后的建议
对于许多情况,标准的 shareIn()
和 stateIn()
operator 仍然应优先于flowWhileShared()
的复杂性。特别是:
- 用于一次性数据加载;
- 当上游 Flow 的源是屏幕上的 UI 元素(例如刷新按钮)时,因此在屏幕不可见时永远不会发出新值,并且无需取消上游 Flow 集合;
- 当从存储库重新加载相同数据的成本很低时,例如因为实现支持共享缓存。在屏幕可见时每次重新加载数据是可以接受的。
结论
使用新的 flowWhileShared()
operator,我们成功将上游 Flow 的一部分与下游 SharedFlow 或 StateFlow 的生命周期同步,借助 distinctUntilChanged()
等 operator 跳过不必要的工作。所有这些都不需要在 Android 应用程序的 ViewModel 中包含任何 LiveData 或 Android 特定的 Lifecycle 代码。
完整源代码
//FlowExt.kt
package be.digitalia.flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingCommand
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.launch
inline fun <T> stateFlow(
scope: CoroutineScope,
initialValue: T,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): StateFlow<T> {
val state = MutableStateFlow(initialValue)
producer(state.subscriptionCount).launchIn(scope, state)
return state.asStateFlow()
}
inline fun <T> sharedFlow(
scope: CoroutineScope,
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
producer: (subscriptionCount: StateFlow<Int>) -> Flow<T>
): SharedFlow<T> {
val shared = MutableSharedFlow<T>(replay, extraBufferCapacity, onBufferOverflow)
producer(shared.subscriptionCount).launchIn(scope, shared)
return shared.asSharedFlow()
}
fun <T> Flow<T>.launchIn(scope: CoroutineScope, collector: FlowCollector<T>): Job = scope.launch {
collect(collector)
}
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> Flow<T>.flowWhileShared(
subscriptionCount: StateFlow<Int>,
started: SharingStarted
): Flow<T> {
return started.command(subscriptionCount)
.distinctUntilChanged()
.flatMapLatest {
when (it) {
SharingCommand.START -> this
SharingCommand.STOP,
SharingCommand.STOP_AND_RESET_REPLAY_CACHE -> emptyFlow()
}
}
}