Kotlin 的Flow可以对数据流进行建模,类似LiveData、RxJava的数据流。
Flow也是用观察者模式实现的。
观察者模式包括了可观察对象(Observable,生产者、发射者、源这些称呼都是指可观察对象,可以被观察)、观察对象(Observers,订阅者、收集者、接收者这些称呼都是指观察对象,可以观察Observable)。当有什么状态(数据)变化时,Observable会自动通知Observers。
Observable(可观察者)可以是hot(热)或者是cold(冷)的。
- Hot Observables 可以在没有Observers(观察者)观察它们时,照常发送(发射)数据。
- Cold Observables只可以在有Observers(观察者)观察它们时,才发送(发射)数据。
而Flow是cold(冷)的,而它发送数据的时机是当有Terminal operator(末端操作符)应用到这个Flow时。
创建Flow
- flow操作符
fun fibonacci(): Flow<BigInteger> = flow {
var x = BigInteger.ZERO
var y = BigInteger.ONE
while (true) {
emit(x)
x = y.also {
y += x
}
}
}
上面的代码是不会使用flow发射数据的,因为flow是冷流,需要加上末端操作符才能使flow发射数据,如collect是最常见的末端操作符:
fibonacci().take(100).collect { println(it) }
flow操作符的源码是:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
由此可知flow操作符返回的是SafeFlow,意安全的流。因为flow支持coroutine,所以在flow操作符中不能使用其他coroutine context,在编译时,它会做这个检查。下面这个例子就是在flow内使用了不同的协程上下文,导致报错:
Caused by: java.lang.IllegalStateException: Flow invariant is violated:
flow {
emit(1) // Ok
withContext(IO) {
emit(2) // 报错
}
}.collect {
println(it)
}
如果想在改变coroutine context,可以使用flowOn操作符:
withContext(Dispatchers.Main) {
val singleValue = flow {
this.emit(1)
} // 如果之间没有指定它的上下文那么就用下面指定的IO上下文
.map { ... } // 在 IO中执行
.flowOn(Dispatchers.IO) // 它指定的上下文只会作用在前面没有指定上下文的操作符上,对后面的操作符没有影响
.filter { ... } // 在Default上下文中运行,因为下面的flowOn指定了
.flowOn(Dispatchers.Default)
.single() // 在 Main上下文中执行
}
这个coroutine context是flow执行的上下文。flowOn实际上是这样实现的,所以它只是把flow里lamda部分的代码放到一个新的context里去执行,方式如下:
- flowOf操作符
这个操作符其实只是在flow操作符基础上帮我们做了数据发射的工作:
但是跟flow操作符不同的是,flowOf操作符不会检查执行的上下文,flow则会检查。
例子:
flowOf(1, 2, 3).collect{
println(it)
}
- asFlow操作符,与flowOf是一样的,只是对列表操作方便一些而已:
listOf(1, 2, 3, 5).asFlow().collect{
println(it)
}
flow是冷的
所以我们要让它流动起来,就要使用一个末端操作符,最常见的就是collect,应用到这个flow上。 上面的例子我们已有展示。除了collect还有一些其他末端操作符。
toCollection()
这个操作符接受一个MutableCollection
实例,Kotlin官方提供了两个toList和toSet。所以参考它们的使用,不难得出toCollection()的使用,即flow发射出来的数据都收集起来,然后一起返回。
例子:
val myList = flow{
emit(1)
emit(2)
emit(5)
}.toList()
println(myList)
first()和last()
它们分别是取第一个和取后一个。例子:
val firstItem = flow{
emit(1)
emit(2)
emit(5)
}.first()
val lastItem = flow{
emit(1)
emit(2)
emit(5)
}.last()
single()
这个操作符,有点特别,它只希望flow只发射一个数据,发多一个就报错。
val singleValue = flow {
this.emit(1)
}
.map {
it + 2
}
.flowOn(IO)
.filter { it > 0 }
.flowOn(Default)
.single()
println(singleValue)
reduce() 和 fold()
这两个操作符都是将flow处理成一个值。
val singleValue = flow {
this.emit(1)
this.emit(2)
this.emit(3)
}
.reduce { accumulator, value ->
accumulator + value
}
println(singleValue) // 6
fold()有一个初始值,而reduce()则没有:
val singleValue = flow {
this.emit(1)
this.emit(2)
this.emit(3)
}
.fold(4) { acc: Int, value: Int ->
acc + value
}
println(singleValue) // 10
还记得我们说过,flow是在协程的基础上的技术,所以这些操作符都是是suspend函数,是可以被暂时挂起的,必须在协程中调用这些操作符。
launchIn()
这个末端操作符通常与onEach
, onCompletion
, catch
操作符一起使用。
使用这个操作符的效果与下面的代码是等价的:
scope.launch { flow.collect() }
这个操作符会返回一个Job实例,意味着我们可以使用这个job来取消当前flow的数据采集工作,避免它工作太长时间,把当前的coroutine scope挂起了。
例子:
val job = flow {
this.emit(1)
this.emit(2)
kotlinx.coroutines.delay(3000)
this.emit(3)
}
.onEach { value -> println("$value ***") }
.onCompletion { cause -> println(cause) }
.catch { cause -> println("Exception: $cause") }
.launchIn(this)
job.cancel()