RxJava一直是我长久以来的救星。它提供了丰富的功能,让我在Android编程中更加注重响应式思维。我的代码中到处都是Single
、Subject
和Completable
。
而现在,协程成为了备受赞誉和推崇的选择,许多演讲和会议都推荐使用。于是我开始学习它。
为了展示我目前的学习成果,我将尝试比较RxJava
和协程在解决一些常见问题时的差异。我使用的库和设计模式有:
- MVVM与Android Architecture Component
- Repository模式
- Retrofit
加载并显示
打开菜单片段并显示从服务器获取的咖啡列表
RxJava
让我们从ViewModel
的角度开始。在MenuViewModel
中,我调用menuRepository.getMenu()
以获取Single
的实例。
class MenuViewModel @Inject constructor(
private val menuRepository: MenuRepository)
: ViewModel() {
val coffeeList: LiveData<List<Coffee>>
get() = _coffeeList
private val _coffeeList = MutableLiveData<List<Coffee>>()
private val disposeBag = CompositeDisposable()
override fun onCleared() {
super.onCleared()
disposeBag.dispose()
}
fun loadMenu() {
val disposable = menuRepository.getMenu()
.subscribe { list: List<Coffee> ->
_coffeeList.value = list
}
disposeBag.add(disposable)
}
}
请注意,忽略了错误处理。
为了避免内存泄漏,在onCleared()
期间我们必须记得处理可释放的资源。
repository代码如下:
class MenuRepository @Inject constructor(
private val menuApi: MenuApi
) {
fun getMenu(): Single<List<Coffee>> {
return menuApi.getMenu()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
}
}
interface MenuApi {
@GET("menu")
fun getMenu(): Single<List<Coffee>>
}
我们正在调用subscribeOn
和observeOn
来确保网络调用不在主线程上执行。
Coroutines
class MenuViewModel @Inject constructor(
private val menuRepository: MenuRepository)
: ViewModel() {
val coffeeList: LiveData<List<Coffee>> = liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {
emit(menuRepository.getMenu())
}
}
class MenuRepository @Inject constructor(
private val menuApi: MenuApi
) {
suspend fun getMenu(): List<Coffee> {
return menuApi.getMenu()
}
}
interface MenuApi {
@GET("menu")
suspend fun getMenu(): List<Coffee>
}
正如您在这里所看到的,借助协程和KTX扩展,代码变得简单得多。生命周期和线程管理只需向liveData块传递一个参数即可:
context = viewModelScope.coroutineContext + Dispatchers.IO
将视图A从视图B更新
当用户在菜单类型Fragment中选择菜单类型时,更新咖啡菜单。
更具体地说,菜单类型Fragment放置在菜单Fragment上方。当用户在此处更改菜单类型时,我们通过所选的菜单类型更新下方的菜单。
为了分离关注点并解耦组件,这两个片段将不直接相互通信。我们将利用MenuRepository
作为中间人,并让两个片段共享相同的实例。这可以通过依赖注入框架如Dagger来实现。这里将忽略其实际实现。
RxJava
class MenuViewModel {
fun loadMenu() {
menuRepository.menu
.subscribe { list: List<Coffee> ->
_coffeeList.value = list
}
menuRepository.refreshMenu(MenuType.DEFAULT).subscribe()
}
}
class MenuRepository {
private val menuSubject = BehaviorSubject.create<List<Coffee>>()
val menu: Observable<List<Observable>>
get() = menuSubject
fun refreshMenu(menuType: MenuType): Completable {
return menuApi.getMenu(menuType)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.doOnSuccess { menuSubject.onNext(it) }
.ignoreElement()
}
}
在MenuRepository
中,getMenu()
被替换为一个菜单属性和一个refreshMenu()
方法。我使用副作用运算符doOnSuccess()
将接收到的数据传递给Subject
,它是一个BehaviorSubject
,用于保存最新的菜单数据。在MenuFragment
中,getMethod().subscribe {}
被拆分为两个调用,menu.subscribe {}
和refreshMenu().subscribe()
。
通过这个变化,我们可以在MenuTypeFragment
中简单地这样做。
class MenuTypeViewModel @Inject constructor(
private val menuRepo: MenuRepository
): ViewModel() {
fun onMenuTypeChanged() {
menuRepo.refreshMenu(MenuType.TODAY_SPECIAL).subscribe()
}
}
请注意,我忽略了一次性处理。为了避免任何内存泄漏,您仍需执行前一个用例中展示的相同处理。此外,如果API调用失败,每次调用menuRepo.refreshMenu()
都必须进行错误处理。
Coroutines
在这种情况下,协程并没有像示例3中那样简化。接下来您将看到原因所在。
class MenuViewModel {
val coffeeList: LiveData<List<Coffee>> = liveData(context = viewModelScope.coroutineContext + Dispatchers.IO) {
// if the data needs to be processed before displaying,
// this is where I usually do it
menuRepo.menu.collect(::emit)
}
}
class MenuRepository {
private val menuChannel = BroadcastChannel<List<Coffee>>(CONFLATED)
val menu: Flow<List<Coffee>>
get() = channel.asFlow()
suspend fun refreshMenu(menuType: MenuType) {
menuChannel.send(menuApi.getMenu(menuType))
}
}
interface MenuApi {
@GET("menu")
suspend fun getMenu(@Query("type") menuType: MenuType): List<Coffee>
}
在MenuRepository
中,我们仍然需要创建与BehaviorSubject
类似的东西。在协程世界中,它被称为BroadcastChannel(CONFLATED)
。这是一个工厂方法,它创建了ConflatedBroadcastChannel
的实例。就像在RxJava中有其他类型的Subject
一样,这里也有其他类型的BroadcastChannel
。
为了能够观察到热数据流,我们使用asFlow
将BroadcastChannel
转换为Flow
。然后我们可以在ViewModel
中调用collect()
来持续接收新数据。
这个概念更接近于RxJava世界而不是协程世界。collect()
类似于subscribe()
,Flow
类似于Observable
。如果你看一下Flow
,会发现有很多内置的操作符,比如retryWhen
或debounce
,这些在RxJava中已经存在并且我们都喜欢。
然后在MenuTypeFragment
中代码如下:
class MenuTypeViewModel @Inject constructor(
private val menuRepo: MenuRepository
): ViewModel() {
fun onMenuTypeChanged() {
viewModelScope.launch {
// try-catch-finally is how we usually handle
// error in Coroutines world
try {
menuRepo.refreshMenu(MenuType.TODAY_SPECIAL)
} catch (e: Exception) {
// e.g.
// notify user
// record exception to your exception monitoring service
} finally {
// e.g.
// dismiss loading indicator
// enable views
}
}
}
}
由于我们使用了ConflatedBroadcastChannel
,因此refreshMenu()
内部的send()
将永远不会挂起。这意味着如果有很多地方使用refreshMenu()
,将不会有很多挂起的协程持有您的资源。
随着Kotlin协程1.4.0的发布,ConflatedBroadcastChannel
已被弃用。StateFlow
和ShareFlow
被引入。您可以在官方公告中了解更多信息。
使用哪个调度器?
在代码片段6中,我们直接在collect中调用emit
。
menuRepo.menu.collect(::emit)
但是要设置值给LiveData使用了哪个调度器?为了找到答案,我打印了一些日志。
menuRepo.menu.collect {
Log.d("MenuViewModel", Thread.currentThread().name)
emit(it)
}
//打印结果: D/MenuViewModel: DefaultDispatcher-worker-1
这看起来不对劲,在我没有在主线程中设置LiveData值的情况下,为什么这不会导致应用崩溃?
原来,KTX库已经为我们解决了这个问题。在liveData { }的emit()内部,它执行了以下操作:
override suspend fun emit(value: T) = withContext(coroutineContext) {
// target: LiveData
target.clearSource()
target.value = value
}
而你可能猜到了,这就是coroutineContext:
private val coroutineContext = context + Dispatchers.Main.immediate
这里的Dispatchers.Main
表示我们Android的主线程。
订阅的取消处理?
那么menuRepo.menu.collect()
的取消处理呢?当LiveData
变为非活跃状态时,订阅会自动取消吗?还是我们仍然需要在RxJava中执行所有的步骤?
答案是是的,它会自动取消。在KTX库内部,liveData { }
生成的LiveData
在LiveData
变为非活跃状态时会取消正在运行的协程。
结论
当您有一个需要等待其结果的阻塞耗时任务时,协程本身可能是一个很不错的选择。如果您使用Android KTX库,您将发现自己编写了最高效的代码。
然而,当您想使用协程来实现观察者设计模式时,与RxJava相比,它并没有提升多少生产力。最终,协程本身并不是专门为此而构建的。
RxJava仍然是我工具箱中最强大的工具。我不建议在所有情况下都迁移到协程。协程中仍然存在很多实验性API,包括我示例中的BroadcastChannel
。协程真正发挥作用还需要时间。