Overview
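This chapter explains how async and await work in coroutines. It assumes you already know how parent and child coroutines are linked; see the article 协程之间父子关系1-Job如何关联的 for background.
In short: await is a suspending function. When a coroutine reaches it, the coroutine suspends (that is, it returns from invokeSuspend). When does it get to continue?
Calling await adds a ResumeAwaitOnCompletion node to the state list of the coroutine being awaited. When that coroutine completes, the node's invoke method runs, which resumes the coroutine that was suspended at await and hands it the result.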
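The idea can be sketched with the Kotlin standard library alone. This is only a toy model, not how kotlinx.coroutines is implemented: ToyDeferred and runToyAwait are hypothetical names, the handler list stands in for the job's node list, and thread safety, cancellation and failures are all ignored.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical toy type; not part of kotlinx.coroutines and not thread-safe.
class ToyDeferred<T> {
    private var completed = false
    private var value: T? = null

    // Stands in for the list of completion nodes kept in the job state.
    private val handlers = mutableListOf<(T) -> Unit>()

    // Stands in for the awaited coroutine finishing and notifying its nodes.
    fun complete(result: T) {
        completed = true
        value = result
        handlers.forEach { it(result) }
        handlers.clear()
    }

    suspend fun await(): T {
        // Fast path: already complete, return without suspending.
        if (completed) {
            @Suppress("UNCHECKED_CAST")
            val done = value as T
            return done
        }
        // Slow path: suspend and register a handler that resumes the caller.
        return suspendCoroutine { cont ->
            handlers.add { result -> cont.resume(result) }
        }
    }
}

fun runToyAwait(): List<String> {
    val deferred = ToyDeferred<String>()
    val log = mutableListOf<String>()
    val waiter: suspend () -> Unit = {
        log.add("before await")
        log.add("got " + deferred.await())
    }
    // Runs the block on this thread until it suspends inside await().
    waiter.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { })
    log.add("waiter suspended")
    // Completing the toy deferred invokes the handler, which resumes the waiter.
    deferred.complete("hello")
    return log
}

fun main() {
    println(runToyAwait())
}
```

The waiter only continues past await once complete is called, which is the same ordering the rest of this article traces through the real implementation.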
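Before going further, look at the following example. What do you think the final state of parentJob is, and is isActive true or false? If you already know the answer, skip this section.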
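import java.time.LocalTime
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch

private fun logX(any: Any?) {
    println("[Time:${LocalTime.now()} Thread:${Thread.currentThread().name}] $any ".trimIndent())
}

fun main() {
    // Use a single-threaded dispatcher
    val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
    // Top-level Job
    val parentJob = Job()
    // Manage coroutines through a CoroutineScope
    val scope = CoroutineScope(dispatcher + parentJob)
    val job = scope.launch {
        logX("launch start")
        logX("launch end")
    }
    // Spins for as long as parentJob is active
    while (parentJob.isActive)
        Thread.yield()
    logX("main end")
}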
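Once launch has run, the state of parentJob changes from EMPTY_ACTIVE to a ChildHandleNode. If this is unfamiliar, see 协程之间父子关系1-Job如何关联的.
The coroutine represented by job has no suspension points and no child coroutines. It is about as simple as a coroutine gets, so the state of job stays EMPTY_ACTIVE the whole time.
Now that the coroutine body has finished, let's look at the completion logic, which goes through the resumeWith method: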
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
//
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
....
} else {
// top-level completion reached -- invoke and return
// the block returns nothing, so outcome is Unit
completion.resumeWith(outcome)
return
}
}
}
}
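The "top-level completion reached" branch can be observed with the standard library alone. The following is a small sketch under that assumption: runToCompletion is a hypothetical helper, and the completion object here is a plain Continuation rather than the AbstractCoroutine that launch would supply.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical helper: runs a block that has no suspension points and returns
// whatever the completion continuation received.
fun runToCompletion(block: suspend () -> String): String? {
    var received: String? = null
    val completion = Continuation<String>(EmptyCoroutineContext) { result ->
        // Invoked once the block's state machine has finished.
        received = result.getOrNull()
    }
    block.startCoroutine(completion)
    return received
}

fun main() {
    println(runToCompletion { "done" })
}
```

Because the block never suspends, the completion is resumed before startCoroutine returns, so the value is already available to the caller.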
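invokeSuspend executes the body of our coroutine block. It returns either because it reached a suspension point or because the body has really finished. In the latter case the completion's resumeWith method is called: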
public final override fun resumeWith(result: Result<T>) {
// the block returns nothing, so the result is Unit
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
// state is EMPTY_ACTIVE
if (state !is Incomplete)
return COMPLETING_ALREADY
if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
// this branch is taken
if (tryFinalizeSimpleState(state, proposedUpdate)) {
// Completed successfully on fast path -- return updated state
return proposedUpdate
}
return COMPLETING_RETRY
}
// The separate slow-path function to simplify profiling
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
....
// the coroutine's state changes from EMPTY_ACTIVE to Unit
if (!_state.compareAndSet(state, update.boxIncomplete())) return false
onCancelling(null) // simple state is not a failure
onCompletionInternal(update)
// the two calls above are only callbacks
completeStateFinalization(state, update)
return true
}
private fun completeStateFinalization(state: Incomplete, update: Any?) {
// parentHandle is the ChildHandleNode; the parent Job holds the same node
parentHandle?.let {
// parentJob removes the ChildHandleNode, i.e. removes this child
it.dispose() // volatile read parentHandle _after_ state was updated
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
val cause = (update as? CompletedExceptionally)?.cause
/*
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
....
} else {
// notify any remaining listeners, e.g. ones registered with invokeOnCompletion
state.list?.notifyCompletion(cause)
}
}
dispose
The dispose method of ChildHandleNode simply calls removeNode on parentJob. At this point the state of parentJob is still the ChildHandleNode.
internal fun removeNode(node: JobNode) {
// remove logic depends on the state of the job
loopOnState { state ->
when (state) {
is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
if (state !== node) return // a different job node --> we were already removed
// try remove and revert back to empty state
if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
}
is Incomplete -> { // may have a list of completion handlers
// remove node from the list if there is a list
if (state.list != null) node.remove()
return
}
else -> return // it is complete and does not have any completion handlers
}
}
}
In the end the state of parentJob goes back to EMPTY_ACTIVE. So in the code above isActive stays true forever, unless parentJob.complete() is called manually.
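This behaviour can be checked with public API only. The sketch below is an illustration, not the article's example: parentJobStates is a hypothetical helper, and Dispatchers.Default is used instead of the single-threaded executor so that nothing has to be shut down.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns (isActive after the child finished, isCompleted after complete()).
fun parentJobStates(): Pair<Boolean, Boolean> = runBlocking {
    val parentJob = Job()
    val scope = CoroutineScope(Dispatchers.Default + parentJob)
    val child = scope.launch { }
    // Wait for the only child to finish.
    child.join()
    // A Job created with Job() does not complete by itself when its children finish.
    val activeAfterChild = parentJob.isActive
    // Completing it explicitly moves it to the completed state.
    parentJob.complete()
    activeAfterChild to parentJob.isCompleted
}

fun main() {
    println(parentJobStates())
}
```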
Example
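import java.time.LocalTime
import java.util.concurrent.Executors
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

private fun logX(any: Any?) {
    println("[Time:${LocalTime.now()} Thread:${Thread.currentThread().name}] $any ".trimIndent())
}

fun main() {
    // Use a single-threaded dispatcher
    val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
    // Top-level Job
    val parentJob = Job()
    // Manage coroutines through a CoroutineScope
    val scope = CoroutineScope(dispatcher + parentJob)
    // Coroutine 1
    val job = scope.launch {
        logX("launch start")
        // Coroutine 2
        val deferred = async {
            logX("async start")
            delay(1000)
            logX("async end")
            "hello"
        }
        logX("before await")
        val result = deferred.await()
        logX(result)
        logX("launch end")
    }
    job.invokeOnCompletion {
        logX("Job Completion")
        // Manually move parentJob to the completed state
        parentJob.complete()
        // Shut down the thread pool
        dispatcher.close()
    }
    parentJob.invokeOnCompletion {
        // This listener fires when parentJob reaches the completed state
        logX("parentJob Completion")
    }
    while (parentJob.isActive)
        Thread.yield()
    logX("main end")
}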
[Figure: the state of parentJob when execution reaches the while loop]
async
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
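async creates a coroutine and returns a DeferredCoroutine. getCompleted() returns the result directly without suspending: if the coroutine has not completed yet it throws an exception, and if it has completed it returns the result.
await() waits for the coroutine to finish and then returns its result. How it does that is what this article is about.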
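The difference between the two calls can be seen in a short sketch. getCompletedVsAwait is a hypothetical helper; it assumes the async body has not finished yet at the moment of the first getCompleted() call, which holds here because the body contains a delay.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun getCompletedVsAwait(): List<String> = runBlocking {
    val deferred = async {
        delay(100)
        "hello"
    }
    val log = mutableListOf<String>()
    // Not complete yet: getCompleted() throws instead of suspending.
    val early = runCatching { deferred.getCompleted() }
    log.add("early failed: ${early.exceptionOrNull() is IllegalStateException}")
    // await() suspends until the result is available.
    log.add("await: ${deferred.await()}")
    // Complete now: getCompleted() returns the stored result.
    log.add("late: ${deferred.getCompleted()}")
    log
}

fun main() {
    println(getCompletedVsAwait())
}
```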
DeferredCoroutine
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = onAwaitInternal as SelectClause1<T>
}
awaitInternal
protected suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while (true) { // lock-free loop on state
// check the coroutine's state
val state = this.state
if (state !is Incomplete) {
// the coroutine has already completed
// already complete -- just return result
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
// it finished with an exception, so throw that exception
recoverAndThrow(state.cause)
}
// it finished normally, so return the result
return state.unboxState()
}
// the coroutine has not completed; make sure it has been started
if (startInternal(state) >= 0) break // break unless needs to retry
}
return awaitSuspend() // slow-path
}
// returns: RETRY/FALSE/TRUE:
// FALSE when not new,
// TRUE when started
// RETRY when need to retry
private fun startInternal(state: Any?): Int {
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return FALSE // already active
if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
onStart()
return TRUE
}
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
if (!_state.compareAndSet(state, state.list)) return RETRY
onStart()
return TRUE
}
else -> return FALSE // not a new state
}
}
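The startInternal call matters for lazily started coroutines: await has to start them before it can wait. The sketch below illustrates this with public API; lazyAwaitDemo is a hypothetical helper and the 50 ms pause is only there to show that nothing runs before await.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun lazyAwaitDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    // A lazy coroutine stays in the "new" state until something starts it.
    val lazy = async(start = CoroutineStart.LAZY) {
        log.add("body ran")
        "hello"
    }
    delay(50)
    log.add("active before await: ${lazy.isActive}")
    // await() starts the coroutine first and then suspends until it completes.
    log.add("result: ${lazy.await()}")
    log
}

fun main() {
    println(lazyAwaitDemo())
}
```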
awaitSuspend
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
// uCont is the continuation of coroutine 1
// uCont.intercepted() returns a DispatchedContinuation
// this is the DeferredCoroutine of coroutine 2
val cont = AwaitContinuation(uCont.intercepted(), this)
// we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
cont.initCancellability()
// register a ResumeAwaitOnCompletion that wraps the AwaitContinuation
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))
cont.getResult()
}
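uCont is the continuation of coroutine 1, and uCont.intercepted() returns a DispatchedContinuation, which is the object through which coroutine 1 is dispatched when it resumes. initCancellability registers a ChildContinuation listener on the Job found in the continuation's context (coroutine 1), so that if that job is cancelled or fails, the waiting continuation is notified as well.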
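The same shape can be written with public API, which may make the roles easier to see. This is a sketch and not the library's code: awaitViaHandler and awaitViaHandlerDemo are hypothetical names, suspendCancellableCoroutine plays the part of AwaitContinuation plus initCancellability, the completion handler plays the part of ResumeAwaitOnCompletion, and invokeOnCancellation plays the part of disposeOnCancellation.

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// Hypothetical helper mirroring the shape of awaitSuspend with public API only.
@OptIn(ExperimentalCoroutinesApi::class)
suspend fun <T> Deferred<T>.awaitViaHandler(): T = suspendCancellableCoroutine { cont ->
    // Register a completion handler on the awaited job.
    val handle = invokeOnCompletion { cause ->
        if (cause != null) cont.resumeWithException(cause)
        else cont.resume(getCompleted())
    }
    // If the waiter is cancelled first, unregister the handler.
    cont.invokeOnCancellation { handle.dispose() }
}

fun awaitViaHandlerDemo(): String = runBlocking {
    val deferred = async {
        delay(50)
        "hello"
    }
    deferred.awaitViaHandler()
}

fun main() {
    println(awaitViaHandlerDemo())
}
```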
[Figure: the state of coroutine 1 after awaitSuspend has run]
AwaitContinuation extends CancellableContinuationImpl:
private class AwaitContinuation<T>(
delegate: Continuation<T>,
private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
override fun getContinuationCancellationCause(parent: Job): Throwable {
val state = job.state
/*
* When the job we are waiting for had already completely completed exceptionally or
* is failing, we shall use its root/completion cause for await's result.
*/
if (state is Finishing) state.rootCause?.let { return it }
if (state is CompletedExceptionally) return state.cause
return parent.getCancellationException()
}
....
}
public override fun initCancellability() {
// link this continuation to coroutine 1
val handle = installParentHandle()
?: return // fast path -- don't do anything without parent
if (isCompleted) {
// Can be invoked concurrently in 'parentCancelled', no problems here
handle.dispose()
_parentHandle.value = NonDisposableHandle
}
}
private fun installParentHandle(): DisposableHandle? {
// parent is the StandaloneCoroutine of coroutine 1
val parent = context[Job] ?: return null // don't do anything without a parent
// Install the handle
// register a completion listener on coroutine 1
val handle = parent.invokeOnCompletion(handler = ChildContinuation(this))
_parentHandle.compareAndSet(null, handle)
return handle
}
When coroutine 1 completes, the listeners registered on it are invoked.
internal fun Job.invokeOnCompletion(
invokeImmediately: Boolean = true,
handler: JobNode,
): DisposableHandle = when (this) {
is JobSupport -> invokeOnCompletionInternal(invokeImmediately, handler)
else -> invokeOnCompletion(handler.onCancelling, invokeImmediately, handler::invoke)
}
internal fun invokeOnCompletionInternal(
invokeImmediately: Boolean,
node: JobNode // ChildContinuation
): DisposableHandle {
//
node.job = this
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
// the state of coroutine 1 is currently a NodeList; add one more ChildContinuation node to it
val added = tryPutNodeIntoList(node) { state, list ->
if (node.onCancelling) {
// onCancelling says whether the node reacts to cancellation; ChildContinuation exists to react to it, so this is true
val rootCause = (state as? Finishing)?.rootCause
if (rootCause == null) {
// add the node to the list
list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION)
} else {
.....
}
} else {
......
}
}
when {
added -> return node
invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause)
}
return NonDisposableHandle
}
AwaitContinuation
cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))
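invokeOnCompletion was covered above: it registers a completion callback on the current coroutine. Here that adds a ResumeAwaitOnCompletion node to the state list of coroutine 2.
Calling delay inside coroutine 2 also adds a ChildContinuation node to coroutine 2.
At the same time, the state of the AwaitContinuation object becomes DisposeOnCancel: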
public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
invokeOnCancellation(handler = DisposeOnCancel(handle))
internal fun <T> CancellableContinuation<T>.invokeOnCancellation(handler: CancelHandler) = when (this) {
is CancellableContinuationImpl -> invokeOnCancellationInternal(handler)
else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
}
internal fun invokeOnCancellationInternal(handler: CancelHandler) = invokeOnCancellationImpl(handler)
private fun invokeOnCancellationImpl(handler: Any) {
assert { handler is CancelHandler || handler is Segment<*> }
_state.loop { state ->
when (state) {
is Active -> {
if (_state.compareAndSet(state, handler)) return // quit on cas success
}
....
}
}
}
Completion of coroutine 2
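Once coroutine 1 reaches await it is suspended, waiting for coroutine 2 to complete.
The state of coroutine 2 changes as follows: EMPTY_ACTIVE --> ResumeAwaitOnCompletion --> NodeList(ResumeAwaitOnCompletion) --> NodeList(ResumeAwaitOnCompletion, ChildContinuation).
After the one-second delay in coroutine 2 has elapsed, coroutine 2 is scheduled again: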
internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
.....
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
ResumeUndispatchedRunnable(this, continuation),
continuation.context,
timeMillis
)
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
// continuation is a CancellableContinuationImpl
// store a CancelFutureOnCancel handler in the state of the CancellableContinuationImpl
continuation.invokeOnCancellation(CancelFutureOnCancel(future))
return
}
// Otherwise fallback to default executor
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}
.....
}
scheduleResumeAfterDelay registers a CancelFutureOnCancel on the CancellableContinuationImpl. Its main purpose is to remove the scheduled task from the thread pool if the coroutine is cancelled.
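The same pattern, scheduling the resume and cancelling the future if the coroutine is cancelled first, can be sketched with public API. toyDelay and toyDelayDemo are hypothetical names; the library's own delay goes through the dispatcher's Delay implementation as shown above, not through this helper.

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: a delay built directly on a ScheduledExecutorService.
suspend fun toyDelay(executor: ScheduledExecutorService, millis: Long): Unit =
    suspendCancellableCoroutine { cont ->
        // Schedule the resume on the executor.
        val future = executor.schedule(Runnable { cont.resume(Unit) }, millis, TimeUnit.MILLISECONDS)
        // If the coroutine is cancelled first, drop the pending task.
        cont.invokeOnCancellation { future.cancel(false) }
    }

// Returns (short delay finished, long delay was cut off by the timeout).
fun toyDelayDemo(): Pair<Boolean, Boolean> {
    val executor = Executors.newSingleThreadScheduledExecutor()
    try {
        return runBlocking {
            val finished = withTimeoutOrNull(5_000) { toyDelay(executor, 50); true } ?: false
            val timedOut = withTimeoutOrNull(50) { toyDelay(executor, 60_000); true } == null
            finished to timedOut
        }
    } finally {
        executor.shutdownNow()
    }
}

fun main() {
    println(toyDelayDemo())
}
```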
When the time is up, the ResumeUndispatchedRunnable runs.
private class ResumeUndispatchedRunnable(
private val dispatcher: CoroutineDispatcher,
// CancellableContinuationImpl
private val continuation: CancellableContinuation<Unit>
) : Runnable {
override fun run() {
with(continuation) { dispatcher.resumeUndispatched(Unit) }
}
}
The continuation in ResumeUndispatchedRunnable is the CancellableContinuationImpl:
override fun CoroutineDispatcher.resumeUndispatched(value: T) {
val dc = delegate as? DispatchedContinuation
resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
internal fun <R> resumeImpl(
proposedUpdate: R,
resumeMode: Int,
onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null
) {
_state.loop { state ->
// state is already CancelFutureOnCancel
when (state) {
is NotCompleted -> {
val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
detachChildIfNonResuable()
dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
return // done
}
.....
}
alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
}
}
// Unregister from parent job
private fun detachChildIfNonResuable() {
// If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
if (!isReusable()) detachChild()
}
internal fun detachChild() {
// parentHandle is a ChildContinuation
val handle = parentHandle ?: return
// call dispose() on the ChildContinuation, which removes it from the parent job
// i.e. coroutine 2 removes this ChildContinuation node from its list
handle.dispose()
_parentHandle.value = NonDisposableHandle
}
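Coroutine 2 now has only the ResumeAwaitOnCompletion node left. It resumes from the delay suspension point, and when its body finishes, execution eventually reaches completeStateFinalization. Here parentHandle is the ChildHandleNode that links coroutine 2 to coroutine 1.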
private fun completeStateFinalization(state: Incomplete, update: Any?) {
//
parentHandle?.let {
// remove this ChildHandleNode from coroutine 1, i.e. detach coroutine 2
it.dispose() // volatile read parentHandle _after_ state was updated
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
val cause = (update as? CompletedExceptionally)?.cause
/*
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
.....
} else {
// the list still contains one ResumeAwaitOnCompletion
state.list?.notifyCompletion(cause)
}
}
notifyCompletion calls the invoke method of ResumeAwaitOnCompletion:
private class ResumeAwaitOnCompletion<T>(
// continuation is the AwaitContinuation
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override val onCancelling get() = false
override fun invoke(cause: Throwable?) {
// job is the job this node was registered on, i.e. coroutine 2 (the DeferredCoroutine)
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
// Resume with with the corresponding exception to preserve it
continuation.resumeWithException(state.cause)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state.unboxState() as T)
}
}
}
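When coroutine 2 completes, it notifies its listeners by calling their invoke methods. The AwaitContinuation wraps a dispatched continuation, so calling its resume method dispatches coroutine 1 again and delivers the result to it.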
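The exceptional branch of invoke is what makes await rethrow a failure of the awaited coroutine. A minimal sketch of that behaviour follows; awaitFailureDemo is a hypothetical helper, and supervisorScope is an assumption added here so that the failing child does not cancel the caller before the exception can be caught.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

fun awaitFailureDemo(): String = runBlocking {
    supervisorScope {
        // The awaited coroutine completes exceptionally.
        val failing = async<String> { throw IllegalStateException("boom") }
        try {
            failing.await()
        } catch (e: IllegalStateException) {
            // await() resumed the caller with the exception instead of a value.
            "caught: ${e.message}"
        }
    }
}

fun main() {
    println(awaitFailureDemo())
}
```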
Completion of coroutine 1
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
....
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
}
....
}
The async block returned the string "hello", so result here is "hello".
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
// state is a NodeList
loopOnState { state ->
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
// state is a NodeList
// proposedUpdate is "hello"
....
// The separate slow-path function to simplify profiling
return tryMakeCompletingSlowPath(state, proposedUpdate)
}
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
// get state's list or else promote to list to correctly operate on child lists
// state is already a NodeList, so the list obtained here is non-null
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
// promote to Finishing state if we are not in it yet
// This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
// atomically transition to finishing & completing state
val finishing = state as? Finishing ?: Finishing(list, false, null)
// must synchronize updates to finishing state
val notifyRootCause: Throwable?
synchronized(finishing) {
// check if this state is already completing
if (finishing.isCompleting) return COMPLETING_ALREADY
// mark as completing
finishing.isCompleting = true
// if we need to promote to finishing, then atomically do it here.
// We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
// (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
if (finishing !== state) {
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
}
// state has changed from NodeList to Finishing
// ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
assert { !finishing.isSealed } // cannot be sealed
// add new proposed exception to the finishing state
// there is no exception, so isCancelling is false
val wasCancelling = finishing.isCancelling
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
// If it just becomes cancelling --> must process cancelling notifications
// notifyRootCause is null
notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!)
// child coroutines are only notified when there is an exception
notifyRootCause?.let { notifyCancelling(list, it) }
// now wait for children
// we can't close the list yet: while there are active children, adding new ones is still allowed.
// this coroutine has no child coroutines, so there is nothing to wait for
val child = list.nextChild()
if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
// turns out, there are no children to await, so we close the list.
list.close(LIST_CHILD_PERMISSION)
// some children could have sneaked into the list, so we try waiting for them again.
// it would be more correct to re-open the list (otherwise, we get non-linearizable behavior),
// but it's too difficult with the current lock-free list implementation.
val anotherChild = list.nextChild()
if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
return COMPLETING_WAITING_CHILDREN
// otherwise -- we have not children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
// proposedUpdate is "hello"
.....
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
// Create the final exception and seal the state so that no more exceptions can be added
val wasCancelling: Boolean
val finalException = synchronized(state) {
wasCancelling = state.isCancelling
val exceptions = state.sealLocked(proposedException)
val finalCause = getFinalRootCause(state, exceptions)
if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
finalCause
}
// Create the final state object
val finalState = when {
// was not cancelled (no exception) -> use proposed update value
finalException == null -> proposedUpdate
// small optimization when we can used proposeUpdate object as is on cancellation
finalException === proposedException -> proposedUpdate
// cancelled job final state
else -> CompletedExceptionally(finalException)
}
// Now handle the final exception
if (finalException != null) {
// if this coroutine failed with an exception, notify its parent
val handled = cancelParent(finalException) || handleJobException(finalException)
if (handled) (finalState as CompletedExceptionally).makeHandled()
}
// Process state updates for the final state before the state of the Job is actually set to the final state
// to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
if (!wasCancelling) onCancelling(finalException)
onCompletionInternal(finalState)
// Then CAS to completed state -> it must succeed
// finally the coroutine's state becomes "hello"
val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
assert { casSuccess }
// And process all post-completion actions
completeStateFinalization(state, finalState)
return finalState
}
private fun completeStateFinalization(state: Incomplete, update: Any?) {
//
parentHandle?.let {
it.dispose() // volatile read parentHandle _after_ state was updated
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
val cause = (update as? CompletedExceptionally)?.cause
/*
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
.....
} else {
state.list?.notifyCompletion(cause)
}
}
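The COMPLETING_WAITING_CHILDREN return value in tryMakeCompletingSlowPath corresponds to something visible from the outside: a coroutine whose body has finished is not completed until its children are. The sketch below illustrates that case with public API; waitingForChildren is a hypothetical helper and the delays are arbitrary values chosen so that the sample is taken while the child is still running.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns (isActive, isCompleted) sampled while the child is still running,
// followed by isCompleted after join().
fun waitingForChildren(): Triple<Boolean, Boolean, Boolean> = runBlocking {
    val parent = launch {
        launch { delay(500) }
        // The parent body ends here, but the child is still running.
    }
    delay(100)
    val activeWhileWaiting = parent.isActive
    val completedWhileWaiting = parent.isCompleted
    parent.join()
    Triple(activeWhileWaiting, completedWhileWaiting, parent.isCompleted)
}

fun main() {
    println(waitingForChildren())
}
```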
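parentHandle is the handle registered in the parent coroutine. Here it is the ChildHandleNode; calling its dispose method removes this ChildHandleNode from coroutine 1.
state.list contains two nodes: one is an InvokeOnCompletion and the other is a ResumeAwaitOnCompletion.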
private fun NodeList.notifyCompletion(cause: Throwable?) {
close(LIST_ON_COMPLETION_PERMISSION)
notifyHandlers(this, cause) { true }
}
private class ResumeAwaitOnCompletion<T>(
private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
override val onCancelling get() = false
override fun invoke(cause: Throwable?) {
val state = job.state
assert { state !is Incomplete }
if (state is CompletedExceptionally) {
// Resume with with the corresponding exception to preserve it
continuation.resumeWithException(state.cause)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state.unboxState() as T)
}
}
}