Job 是协程上下文CoroutineContext的实现之一,通过它我们可以对协程的生命周期进行一些控制操作。
Job 是协程的
句柄。使用
launch 或
async 创建的每个协程都会返回一个
Job 实例对象,该实例是相应协程的唯一标识并管理其生命周期。还可以将
Job 传递给
CoroutineScope 以进一步管理其生命周期,如以下示例所示:
class ExampleClass {
...
fun exampleMethod() {
// Handle to the coroutine, you can control its lifecycle
val job = scope.launch {
// New coroutine
}
if (...) {
// Cancel the coroutine started above, this doesn't affect the scope
// this coroutine was launched in
job.cancel()
}
}
}
我们通过Job可以获取当前协程的运行状态,还可以随时取消协程。
协程的状态查询:
-
isActive 活跃
-
isCompleted 已完成
-
isCancelled 已取消
如果协程处于活跃状态,协程运行出错或者调用
job.cancel() 都会将当前任务置为取消状态 (
isActive = false, isCancelled = true)。当所有的子协程都完成后,协程会进入已完成状态,此时
isCompleted = true。
常用的协程操作:
-
cancel 用于Job的取消,取消协程
-
start 用于启动一个协程,让其到达 Active状态
-
invokeOnCompletion 添加一个监听,当工作完成或者异常时会调用
-
join 阻塞并等候当前协程完成
start
协程不是默认创建就启动了吗? 怎么还有一个
start 方法 。
-
这个方法主要是针对通过懒加载 Lazy模式创建的协程,需要进行手动 start才能启动协程。
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
println("执行在协程中...")
delay(1000L)
println("执行完毕...")
}
job.start()
cancel
协程的取消,我们之前也讲到过,一般我们可以手动调用
cancel 或者在
onDestory的时候调用
cancel:
var job = GlobalScope.launch {
println("执行在协程中...")
delay(1000L)
println("执行完毕...")
}
...
override fun onDestroy() {
job.cancel()
super.onDestroy()
}
invokeOnCompletion
协程执行完的回调
invokeOnCompletion 也是我们常用的监听,在正常执行完毕,或者异常执行完毕都会回调这个方法。
val exceptionHandler = CoroutineExceptionHandler { coroutineContext, throwable ->
YYLogUtils.e(throwable.message ?: "Unkown Error")
}
val job = GlobalScope.launch(Dispatchers.Main + exceptionHandler) {
println("执行在另一个协程中...")
delay(1000L)
val num = 9/0
println("另一个协程执行完毕...")
}
job.invokeOnCompletion {
println("完成或异常的回调")
}
没有异常的回调:
加入9/0的异常代码之后的回调:
join
join()方法会暂停所处的协程,直到Job中的代码执行完毕再继续,一般用来等待某个协程执行完毕,它是一个挂起函数。(跟
Deferred.await()方法的调用有异曲同工之处,
await() 会等待
async{ } 中的代码返回结果后再继续。)
fun main() = runBlocking {
val job = launch {
delay(100)
println("hello")
delay(300)
println("world")
}
println("test1")
job.join()
println("test2")
}
输出:
cancelAndJoin
suspend fun main() {
runBlocking {
val job = launch {
repeat(1000) { i ->
println("job: test $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: job.cancel() called!")
// job.cancel() // 取消该作业
// job.join() // 等待作业执行结束
job.cancelAndJoin() // 等价以上两行,取消一个作业并等待它结束
println("main: job is canceled.")
}
}
输出:
问题: 如果先调用 job.join() 后调用 job.cancel() 会是什么情况?
-
协程中的代码不会被取消,会全部执行完。
协程的取消是协作的
协程并不是一定能被取消的,协程的取消是协作的。一段协程代码必须协作才能被取消。所有
kotlinx.coroutines 中的挂起函数都是可被取消的 。它们检查协程的取消, 并在取消时抛出
CancellationException。
如果协程正在执行不可中断的计算任务,并且没有检查取消的话,那么它是不能被取消的。这一点上跟Java的线程取消有点类似。
suspend fun main() {
runBlocking {
val job = launch(Dispatchers.Default) {
var nextPrintTime = System.currentTimeMillis()
var i = 0
while (i < 100) { // 一个执行计算的循环,只是为了占用 CPU
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: hello ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
println("main: job.cancel() called!")
job.cancelAndJoin() // 取消一个作业并且等待它结束
println("main: Now job is canceled.")
}
}
输出:
可以看到上面代码中当调用 job.cancelAndJoin() 之后,协程并没有被立即取消掉,而是继续执行完while循环所有的计算工作后才自动停止。也就是说上面代码跟先调用 job.join() 后调用 job.cancel() 的效果是一样的。
检查 Job 状态
解决上面代码问题的第一种办法是修改
while循环的判断条件,添加对
Job状态的检查,例如
isActive或isCancelled:
while (i < 100 && isActive)
while (i < 100 && coroutineContext[Job]?.isActive == true)
while (i < 100 && coroutineContext[Job]?.isCancelled == false)
以上三种写法都可以。
另一种方法使用协程标准库中的函数
ensureActive() :
while (i < 100) {
ensureActive()
...
}
ensureActive() 的实现是这样的:
public fun Job.ensureActive(): Unit {
if (!isActive) throw getCancellationException()
}
其实跟第一种原理是一样的,也是调用了isActive来判断的。
第三种办法是
使用
yield() 支持可响应取消 :
while (i < 100) {
yield()
...
}
yield()方法是一个可取消的挂起函数,它的作用是:如果当前协程的Job在调用此挂起函数时被取消或完成,或者在此函数等待分派时,它将使用CancellationException作为结果来恢复协程。因此它可保证立即取消协程。 yield() 应该在定时检查中最先被调用。
检查协程是否为已取消状态,在业务开发中是非常有必要的,例如,如果您要从磁盘读取多个文件,请先检查协程是否已取消,然后再开始读取每个文件。
someScope.launch {
for(file in files) {
ensureActive() // Check for cancellation
readFile(file)
}
}
Deferred(带结果的Job)
Deferred 是
Job 的子类,它是一种可以返回结果的
Job,可以类比 Java 中的
Future 对象,在 Java 中
Future.get() 可以等待异步线程任务的执行结果,而
Deferred.await() 主要用来等待
async{ } 启动的协程代码块的返回结果:
fun main() = runBlocking {
val deferred = async {
...
delay(5000)
10
}
val result = deferred.await()
}
deferred 也是可以取消的,对于已经取消的
deferred 调用
await() 方法,会抛出
JobCancellationException 异常。
同理,在
deferred.await() 之后调用
deferred.cancel(), 那么什么都不会发生,因为任务已经结束了。
async 可以通过将
start 参数设置为
CoroutineStart.LAZY 变成惰性的。在这个模式下,调用
await 获取结果时,最好调用
Job 的
start 方法启动协程。
fun main() {
runBlocking {
val time = measureTimeMillis {
val deferred1 = async(Dispatchers.IO, CoroutineStart.LAZY) {
printInfo()
delay(1000) // 模拟耗时操作
1
}
val deferred2 = async(Dispatchers.IO, CoroutineStart.LAZY) {
printInfo()
delay(3000) // 模拟耗时操作
2
}
deferred1.start()
deferred2.start()
printInfo("${deferred1.await() + deferred2.await()}")
printInfo("end")
}
printInfo("time: $time")
}
}
fun printInfo(msg : String? = null) {
println("${Thread.currentThread().name}: ------> ${msg ?: ""}")
}
输出:
问题:如果不调用 start() 情况会怎样?
上面代码如果不调用
deferred1.start() 和
deferred2.start() 方法,输出如下:
虽然也正确输出结果了,但是会发现
总耗时是两个async块中耗时的累加。
也就是说调用
start之后两个
async块中的代码是
并发执行的,所以最终耗时是二者中耗时最长的那个任务的时间。如果不调用
start,则由于
LAZY启动模式的特点,只有在被需要的时候才会去启动这个协程,也就是说当执行到
deferred1.await() 时,才会去执行第一个
async{} 的代码,等这个执行完毕后,再继续执行
deferred2.await() 时,才会去执行第二个
async{} 的代码,所以最终耗时是二者的累加。