一、概念
对于每一个由协程构建器开启的协程,都会返回一个 Job 实例用来管理协程的生命周期。launch()直接返回 Job实现,async() 返回的 Deferred 实现了 Job接口。
Job | public fun start(): Boolean |
public fun cancel(cause: CancellationException? = null) 取消 Job 会抛异常,默认可空,也可以自定义,job.cancel(CancellationException("取消"))。它不会立马就被取消,先进入 cancelling。协程作用域和协程上下文的扩展函数cancel()底层都是调用的它。 | |
public suspend fun join() 挂起当前协程,直到 Job 完成。 | |
public suspend fun Job.cancelAndJoin() 挂起当前协程,直到 Job 取消完成。 | |
public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle Job结束后调用该回调,不管是cancelled还是competed都会回调。 | |
Deferred | public suspend fun await(): T 挂起当前协程,直到 Deferred 完成。 |
Await.kt
joinAll( ) | public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() } 挂起当前协程,直到传入的 Job 都执行完。 |
public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() } 挂起当前协程,直到集合中的 Job 都执行完。 | |
awaitAll( ) | public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> 挂起当前协程,直到传入的 Deferred 都执行完。 |
public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> 挂起当前协程,直到集合中的 Deferred 都执行完。 |
二、生命周期
如果Job是通过协程构建器创建的,Active就是协程主体运行时的状态,在这个状态下我们可以启动子协程。一般协程都是在Active状态下启动,只有那些延迟启动的才会以New状态启动。
当Job完成时,会进入Completing状态等待所有子协程完成,然后进入Compelted状态。
如果Job在Active或Completing状态下取消或者异常,会进入到Cancelling状态供我们做一些资源释放等工作,然后进入到Cancelled状态。
没有直接的生命周期函数可供调用,而是使用以下三个属性去做判断:
Job的状态/函数判断 | isActive | isCompleted | isCancelled |
New 新创建(optional initial state) | false | false | false |
Active 活跃(default initial state) | true | false | false |
Completing 完成中(transient state) | true | false | false |
Cancelling 取消中(transient state) | false | false | true |
Cancelled 已取消(final state) | false | true | true |
Compeleted 已完成(final state) | false | true | false |
三、协程的取消
Java 线程其实没有提供任何机制来安全地终止线程,Thread 类提供了一个 interrupt() 方法用于中断线程的执行,并不意味着立即停止目标线程正在进行的工作,而只是传递了请求中断的消息,然后由线程在下一个合适的时机中断自己。
仅仅终止线程是一个糟糕的方案,协程提供了一个 cancel() 函数来取消Job,但并不是一定能取消。协程的取消是协作的,一段协程代码必须协作才能被取消。所有 kotlinx.coroutines 中的挂起函数都是可被取消的,它们检查协程的取消,并在取消时抛出 CancellationException。如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的。
3.1 取消的原理
public interface Job : CoroutineContext.Element {
//通过序列保存了所有子Job的引用,所以具有父子层级结构
public val children: Sequence<Job>
}
//子Job接口
public interface ChildJob : Job {
//提供了父Job取消自己的函数
public fun parentCancelled(parentJob: ParentJob)
}
//父Job接口
public interface ParentJob : Job {
//提供了获取子Job被取消原因的函数
public fun getChildJobCancellationCause(): CancellationException
}
//Job的实现类,同时实现了ChildJob和ParentJob,说明一个Job对象既可以是父Job也可以是子Job
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
internal var parentHandle: ChildHandle? //当前协程的父协程帮助类,当前协程作为子协程
}
3.2 取消的状态
挂起函数(挂起点)会在执行的时候检查协程的状态,其它情况需要手动调用 job.isAlive 或 job.ensureActive() 来检查。
- 运行出错或者调用cancel()后该Job会在遇到第一个挂起点开始取消并抛出CancellationException异常:
- 先处于Cancelling状态,没有挂起点或检查措施便不会响应取消操作直至代码块执行完毕。
才能继续执行其它,否则会存在其它协程并发执行。- 手动调用join()或遇到代码中的第一个挂起点,协程才会真正被取消,再处于Cancelled状态。推荐使用cancelAndJoin()简化调用。
- 一旦该Job被取消,该Job下的子job也会一并取消,但父Job和兄弟Job不受影响,该Job不能再用作任何新Job的父Job(不能开启新协程)。
3.3 取消的异常处理
协程通过抛出一个 CancellationException异常 来取消 Job。cancel() 可以传参使用不同的异常来指定原因,需要是 CancellationException 的子类才能取消协程。该异常不会导致父协程或兄弟协程的取消,可以使用 try-catch-finally 去捕获处理释放资源,推荐使用标准函数 use() 会自动关闭资源。
suspend fun main() = runBlocking {
//没有继承父协程的上下文,有自己的作用域,因此 runBlocking 不会等待 GlobalScope 执行完再结束。
val job = GlobalScope.launch {
try {
//耗时操作
}catch (e:Exception){
//处理异常
}finally{
//释放资源
}
}
delay(1000) //让job运行一下再取消
// job.cancel() //抛异常 JobCancellationException
// job.join() //挂起函数,这样就会等 GlobalScope 取消完再继续执行
job.cancelAndJoin() //简写
}
3.4 无法直接取消的情况(CPU密集型、没有挂起点)
由于调用cancel()操作后Job会处于Cancelling状态,此时只需判断Job是否处于活跃状态于便可以响应cancel()操作。
- CPU密集型任务无法直接被cancel()取消,因为直接取消会丢失临时计算数据。可以通过对Job状态的判断来响应cancel()操作。
- Job的取消发生在挂起点上,没有挂起点便不会响应cancel()操作,当我们使用协程却没有调用任何挂起函数的时候(做阻塞操作、神经网络学习)便会发生这种情况。
isActive 加在判断里 | public val CoroutineScope.isActive: Boolean 判断Job是否处于活跃状态(尚未完成且尚未取消)。 |
ensureActive() 写在函数里 | public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive() 返回coroutineContext扩展函数,调用Job的函数,最终调用的是 !isActive,Job处于非活跃状态就报错CancelllationException。 |
yield() 不至于抢占太多线程让其它协程拿不到执行权 | public suspend fun yield(): Unit 会检查所在协程的状态,如果已经取消则报错 CancellationException,此外会尝试让出线程执行权。 |
suspend fun main() = runBlocking {
val job = launch(Dispatchers.Default) { //该协程中无挂起点
while (isActive) { //判断出false便会取消
ensureActive() //检测出false便会取消
yield() //不至于因为任务太抢占资源导致其它协程拿不到线程执行权
println("CPU密集任务")
}
}
delay(1000) //让job运行一会儿后再取消
println("等完")
job.cancelAndJoin() //cancel()操作会将 isActive = false
println("结束")
}
3.5 一定无法取消的情况
由于我们可以捕获CancellationException异常,在 Job 真正结束前可以做一些事情,由于 Job 响应 cancel() 后已经处于 Cancelling状态,此时启动一个新协程(会被忽略)或者调用挂起函数(会抛异常CancellationException)是无法被执行的。
- 方式①:指定协程上下文为NonCancellable来得到一个常驻Job不响应 cancel()操作。
- 方式②:使用invokeOnCompletion()函数,当 Job 处于Cancelled状态或Compeleted状态时会执行回调。形参it是一个异常,没有异常值为null,协程被取消值为 CancellationException。
withContext(NonCancellable){
//不会响应取消
}
job.invodeOnCompletion{
//回调代码
}
3.6 自定义挂起函数定义取消的回调
详见回调函数改挂起函数
//定义
suspend fun getResource():StudentBean = suspendCancellableCoroutine{ continuation ->
request(object : ICallBack{
override fun onSuccess(data:String){
continuation.resume(data)
}
override fun onFailure(exception:Throwable){
continuation.resumeWithException(exception)
}
})
//定义协程取消时应该做的操作
continuation.invokeOnCancellation{ //TODO... }
}
//使用
suspend main() = runBlocking{
try{
viewModelScope.launch{
val bean = getResource()
}
}catch(e : Exception){
e.printStackTrace()
}
}
四、自定义Job
协程构建器基于其父Job构建自己的Job,如果自定义了Job便不再适用父子关系,失去了结构化并发(父协程不会等待子协程完成)。
fun main(): Unit = runBlocking {
val scope = CoroutineScope(Job())
// test1(scope) //打印:测试1---子协程1
test2(scope) //打印:测试2---子协程1 、测试2---子协程2
delay(1000)
}
fun test1(scope: CoroutineScope) {
scope.launch {
launch {
println("测试1---子协程1")
scope.cancel()
}
launch {
println("测试1---子协程2")
}
}
}
fun test2(scope: CoroutineScope) {
scope.launch {
launch(Job()) { //此处添加了一个job参数
println("测试2---子协程1")
scope.cancel()
}
launch {
println("测试2---子协程2")
}
}
}