Kotlin协程-async分析

news2024/11/24 2:03:00

概述

本章讲解协程中asyncawait的原理。前提条件是知道父子协程是如何关联的,可以看这篇协程之间父子关系1-Job如何关联的了解。

这里简单讲一下原理:使用await方法,这是一个挂载方法,协程执行到这里就会挂载(就是退出invokeSuspend方法),什么时候才能继续执行呢?

使用await方法的协程会在这个协程的state链表中添加一个ResumeAwaitOnCompletion节点,当这个协程完成时,就会执行这个ResumeAwaitOnCompletion节点的invoke方法,就会唤醒之前挂载在这里的协程,顺便也把结果带回来了。

在之前先看如下例子:你猜最后parentJob的状态是什么样的?以及isActive是true还是false?如果知道这个就跳过这节。


private fun logX(any: Any?) {
    println("[Time:${LocalTime.now()} Thread:${Thread.currentThread().name}] $any ".trimIndent())
}

fun main() {
    // 指定到单线程调度器
    val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
    // 顶层Job
    val parentJob = Job()
    // 使用CoroutineScope管理协程
    val scope = CoroutineScope(dispatcher + parentJob)
    
    val job = scope.launch {
        logX("launch start")
        logX("launch end")
    }

    while (parentJob.isActive)
        Thread.yield()

    logX("main end")
}

首先执行到launch后,parentJobstate的值会从EMPTY_ACTIVE变成ChildHandleNode。不熟悉的可以看协程之间父子关系1-Job如何关联的。

因为job代表的协程中没有挂载点,也没有子协程,就是一个很简单的协程,所以jobstate的值一直都是EMPTY_ACTIVE

现在协程执行完了,我们看下结束的逻辑,走resumeWith方法

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    // This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        // 
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                   ....
                } else {
                    // top-level completion reached -- invoke and return
                    // 没有返回值,outcome就是Unit
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

invokeSuspend就是在执行我们代码中协程代码块,这个方法结束要么是遇到挂载点,要么就是真的结束了。所以结束就会执行resumeWith方法

public final override fun resumeWith(result: Result<T>) {
    // 没有返回值,outcome就是Unit
    val state = makeCompletingOnce(result.toState())
    if (state === COMPLETING_WAITING_CHILDREN) return
    afterResume(state)
}
 internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
        loopOnState { state ->
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY ->
                    throw IllegalStateException(
                        "Job $this is already complete or completing, " +
                            "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                    )
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
            }
        }
    }
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    	// state的值是EMPTY_ACTIVE
        if (state !is Incomplete)
            return COMPLETING_ALREADY

        if ((state is Empty || state is JobNode) && state !is ChildHandleNode && proposedUpdate !is CompletedExceptionally) {
            // 走这里
            if (tryFinalizeSimpleState(state, proposedUpdate)) {
                // Completed successfully on fast path -- return updated state
                return proposedUpdate
            }
            return COMPLETING_RETRY
        }
        // The separate slow-path function to simplify profiling
        return tryMakeCompletingSlowPath(state, proposedUpdate)
    }
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
  	....
    // 协程state的值从EMPTY_ACTIVE变成了Unit
    if (!_state.compareAndSet(state, update.boxIncomplete())) return false
    onCancelling(null) // simple state is not a failure
    onCompletionInternal(update)
    // 前面2个方法只是回调一下
    completeStateFinalization(state, update)
    return true
}
private fun completeStateFinalization(state: Incomplete, update: Any?) {
    	// parentHandle的值就是ChildHandleNode,同时父Job中有这个句柄
        parentHandle?.let {
            // parentJob就会删除ChildHandleNode,即删除了子节点
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        val cause = (update as? CompletedExceptionally)?.cause
        /*
         * 2) Invoke completion handlers: .join(), callbacks etc.
         *    It's important to invoke them only AFTER exception handling and everything else, see #208
         */
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
            ....
        } else {
            // 如果还有监听,就通知,比如使用invokeOnCompletion方法注册
            state.list?.notifyCompletion(cause)
        }
    }
dispose

ChildHandleNode的dispose方法谁就是 执行parentJob的removeNode方法,此时parentJob的state值还是ChildHandleNode

internal fun removeNode(node: JobNode) {
        // remove logic depends on the state of the job
        loopOnState { state ->
            when (state) {
                is JobNode -> { // SINGE/SINGLE+ state -- one completion handler
                    if (state !== node) return // a different job node --> we were already removed
                    // try remove and revert back to empty state
                    if (_state.compareAndSet(state, EMPTY_ACTIVE)) return
                }
                is Incomplete -> { // may have a list of completion handlers
                    // remove node from the list if there is a list
                    if (state.list != null) node.remove()
                    return
                }
                else -> return // it is complete and does not have any completion handlers
            }
        }
    }

最终parentJob的state值变成了EMPTY_ACTIVE。所以上面的代码中,isActive一直是true。除非手动调用 parentJob.complete()

示例


private fun logX(any: Any?) {
    println("[Time:${LocalTime.now()} Thread:${Thread.currentThread().name}] $any ".trimIndent())
}

fun main() {
    // 指定到单线程调度器
    val dispatcher = Executors.newSingleThreadScheduledExecutor().asCoroutineDispatcher()
    // 顶层Job
    val parentJob = Job()
    // 使用CoroutineScope管理协程
    val scope = CoroutineScope(dispatcher + parentJob)
    // 协程1
    val job = scope.launch {
        logX("launch start")
        // 协程2
        val deferred = async {
            logX("async start")
            delay(1000)
            logX("async end")
            "hello"
        }

        logX("before await")
        val result = deferred.await()
        logX(result)
        logX("launch end")
    }
	
    job.invokeOnCompletion {
        logX("Job Completion")
        // 手动使parentJob变成完成态
        parentJob.complete()
        // 关闭线程池
        dispatcher.close()
    }
    
    parentJob.invokeOnCompletion {
        // parentJob变成完成态时会触发这个监听
        logX("parentJob Completion")
    }

    while (parentJob.isActive)
        Thread.yield()

    logX("main end")
}

执行到while是parentJob的state值

在这里插入图片描述

async

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

使用async创建协程,返回值是DeferredCoroutinegetCompleted()方法就是不会挂起等待获取结果,就是直接拿到结果,如果协程不是完成状态,就会抛异常,完成状态直接获取结果。

await()是等待协程运行结束返回结果,我们这篇文章就是讲这个是怎么操作的。

DeferredCoroutine

private open class DeferredCoroutine<T>(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T> {
    override fun getCompleted(): T = getCompletedInternal() as T
    override suspend fun await(): T = awaitInternal() as T
    override val onAwait: SelectClause1<T> get() = onAwaitInternal as SelectClause1<T>
}

awaitInternal

rotected suspend fun awaitInternal(): Any? {
    // fast-path -- check state (avoid extra object creation)
    while (true) { // lock-free loop on state
        // 检查协程的状态
        val state = this.state
        if (state !is Incomplete) {
            // 协程已经是完成状态
            // already complete -- just return result
            if (state is CompletedExceptionally) { // Slow path to recover stacktrace
                // 是因为异常结束,就抛出异常
                recoverAndThrow(state.cause)
            }
            // 正常结束,返回结果
            return state.unboxState()

        }
        // 协程不是完成状态,确保协程已经被启动
        if (startInternal(state) >= 0) break // break unless needs to retry
    }
    return awaitSuspend() // slow-path
}

// returns: RETRY/FALSE/TRUE:
    //   FALSE when not new,
    //   TRUE  when started
    //   RETRY when need to retry
    private fun startInternal(state: Any?): Int {
        when (state) {
            is Empty -> { // EMPTY_X state -- no completion handlers
                if (state.isActive) return FALSE // already active
                if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
                onStart()
                return TRUE
            }
            is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
                if (!_state.compareAndSet(state, state.list)) return RETRY
                onStart()
                return TRUE
            }
            else -> return FALSE // not a new state
        }
    }

awaitSuspend

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
        // uCont就是我们协程1对象
        // uCont.intercepted()返回的是DispatchedContinuation    
        // this就是协程2中的DeferredCoroutine对象                                                                        
        val cont = AwaitContinuation(uCont.intercepted(), this)
        // we are mimicking suspendCancellableCoroutine here and call initCancellability, too.
        cont.initCancellability()
        // 在AwaitContinuation对象中注册一个ResumeAwaitOnCompletion                                                                    
        cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))
        cont.getResult()
    }

uCont就是我们协程1对象,uCont.intercepted()返回的是DispatchedContinuation,协程2就在这个对象中进行调度的。initCancellability方法会在当前协程的父协程上注册一个ChildContinuation监听,万一协程发生异常,这个是用来通知子协程

在这里插入图片描述

经过awaitSuspend方法后,协程1的state如下图

在这里插入图片描述

AwaitContinuation继承自CancellableContinuationImpl

private class AwaitContinuation<T>(
    delegate: Continuation<T>,
    private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
    override fun getContinuationCancellationCause(parent: Job): Throwable {
        val state = job.state
        /*
             * When the job we are waiting for had already completely completed exceptionally or
             * is failing, we shall use its root/completion cause for await's result.
             */
        if (state is Finishing) state.rootCause?.let { return it }
        if (state is CompletedExceptionally) return state.cause
        return parent.getCancellationException()
    }

    ....
}
public override fun initCancellability() {
     
    	// 和协程1进行关联
        val handle = installParentHandle()
            ?: return // fast path -- don't do anything without parent
    
        if (isCompleted) {
            // Can be invoked concurrently in 'parentCancelled', no problems here
            handle.dispose()
            _parentHandle.value = NonDisposableHandle
        }
    }
private fun installParentHandle(): DisposableHandle? {
    // parent就是协程1的 StandaloneCoroutine
    val parent = context[Job] ?: return null // don't do anything without a parent
    // Install the handle
    // 在协程1中注册完成时回调监听
    val handle = parent.invokeOnCompletion(handler = ChildContinuation(this))
    _parentHandle.compareAndSet(null, handle)
    return handle
}

当协程1完成时,会执行注册过的监听。

internal fun Job.invokeOnCompletion(
    invokeImmediately: Boolean = true,
    handler: JobNode,
): DisposableHandle = when (this) {
    is JobSupport -> invokeOnCompletionInternal(invokeImmediately, handler)
    else -> invokeOnCompletion(handler.onCancelling, invokeImmediately, handler::invoke)
}
internal fun invokeOnCompletionInternal(
        invokeImmediately: Boolean,
        node: JobNode // ChildContinuation
    ): DisposableHandle {
    	// 
        node.job = this
        // Create node upfront -- for common cases it just initializes JobNode.job field,
        // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
    	//当前协程1的state的值是NodeList,再添加一个ChildContinuation节点
        val added = tryPutNodeIntoList(node) { state, list ->
            if (node.onCancelling) {
                // onCancelling表示能否响应取消事件,ChildContinuation就是为了响应取消,所以为true
                
                val rootCause = (state as? Finishing)?.rootCause
                if (rootCause == null) {
                    // 添加到链表中
                    list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION)
                } else {
                   .....
                }
            } else {
               ......
            }
        }
        when {
            added -> return node
            invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause)
        }
        return NonDisposableHandle
    }

AwaitContinuation

cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont)))

invokeOnCompletion方法前面已经讲过了,就是往当前协程注册一个完成回调,这样协程2的state的链表上再添加一个ResumeAwaitOnCompletion节点。

协程2中执行delay也会在协程2中添加ChildContinuation节点

在这里插入图片描述

同时AwaitContinuation对象的state的值夜变成了DisposeOnCancel

public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit =
    invokeOnCancellation(handler = DisposeOnCancel(handle))
internal fun <T> CancellableContinuation<T>.invokeOnCancellation(handler: CancelHandler) = when (this) {
    is CancellableContinuationImpl -> invokeOnCancellationInternal(handler)
    else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported")
}
internal fun invokeOnCancellationInternal(handler: CancelHandler) = invokeOnCancellationImpl(handler)

    private fun invokeOnCancellationImpl(handler: Any) {
        assert { handler is CancelHandler || handler is Segment<*> }
        _state.loop { state ->
            when (state) {
                is Active -> {
                    if (_state.compareAndSet(state, handler)) return // quit on cas success
                }
        
                ....
              
       
            }
        }
    }

协程2执行完成过程

协程1执行的到await方法后就已经挂起了,等待执行完成。

协程2的state的变化是EMPTY_ACTIVE – > ResumeAwaitOnCompletion -->NodeList(ResumeAwaitOnCompletion) -->NodeList(ResumeAwaitOnCompletion,ChildContinuation)

等协程2的delay执行1s后,协程2会被重新调度

internal class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcher(), Delay {
	.....
    
	override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val future = (executor as? ScheduledExecutorService)?.scheduleBlock(
            ResumeUndispatchedRunnable(this, continuation),
            continuation.context,
            timeMillis
        )
        // If everything went fine and the scheduling attempt was not rejected -- use it
        if (future != null) {
            // continuation就是 CancellableContinuationImpl
            // 往CancellableContinuationImpl对象的state中添加CancelFutureOnCancel节点
            continuation.invokeOnCancellation(CancelFutureOnCancel(future))
            return
        }
        // Otherwise fallback to default executor
        DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
    }
	
	.....
}

scheduleResumeAfterDelay方法会在CancellableContinuationImpl注册一个CancelFutureOnCancel,万一协程被取消了,这个主要是用来删除线程池中的任务的。

时间到了会执行ResumeUndispatchedRunnable

private class ResumeUndispatchedRunnable(
    private val dispatcher: CoroutineDispatcher,
    // CancellableContinuationImpl
    private val continuation: CancellableContinuation<Unit>
) : Runnable {
    override fun run() {
        with(continuation) { dispatcher.resumeUndispatched(Unit) }
    }
}

ResumeUndispatchedRunnable中的continuation就是CancellableContinuationImpl

override fun CoroutineDispatcher.resumeUndispatched(value: T) {
    val dc = delegate as? DispatchedContinuation
    resumeImpl(value, if (dc?.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}
internal fun <R> resumeImpl(
        proposedUpdate: R,
        resumeMode: Int,
        onCancellation: ((cause: Throwable, value: R, context: CoroutineContext) -> Unit)? = null
    ) {
        _state.loop { state ->
            // state已经是CancelFutureOnCancel         
            when (state) {
               
                is NotCompleted -> {
                    val update = resumedState(state, proposedUpdate, resumeMode, onCancellation, idempotent = null)
                  
                    if (!_state.compareAndSet(state, update)) return@loop // retry on cas failure
                    detachChildIfNonResuable()
                    dispatchResume(resumeMode) // dispatch resume, but it might get cancelled in process
                    return // done
                }

                .....
            }
            alreadyResumedError(proposedUpdate) // otherwise, an error (second resume attempt)
        }
    }
// Unregister from parent job
private fun detachChildIfNonResuable() {
    // If instance is reusable, do not detach on every reuse, #releaseInterceptedContinuation will do it for us in the end
    if (!isReusable()) detachChild()
}

internal fun detachChild() {
    // parentHandle是ChildContinuation类型
    val handle = parentHandle ?: return
    // 调用ChildContinuation的dispose()方法,即把这个ChildContinuation从父job中删除
    // 即协程2会从链表中删除这个ChildContinuation节点
    handle.dispose()
    _parentHandle.value = NonDisposableHandle
}

现在协程2中还剩下一个ResumeAwaitOnCompletion节点。协程2从delay这个挂载点恢复执行了,当协程2执行完成时,最终会执行到completeStateFinalization方法。parentHandle就是ChildContinuation

private fun completeStateFinalization(state: Incomplete, update: Any?) {
        // 
        parentHandle?.let {
            // 从协程1中删除这个ChildContinuation,即删除协程2
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        val cause = (update as? CompletedExceptionally)?.cause
        /*
         * 2) Invoke completion handlers: .join(), callbacks etc.
         *    It's important to invoke them only AFTER exception handling and everything else, see #208
         */
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
           .....
        } else {
            // list中还有一个ResumeAwaitOnCompletion
            state.list?.notifyCompletion(cause)
        }
    }

notifyCompletion方法会执行ResumeAwaitOnCompletioninvoke方法。

private class ResumeAwaitOnCompletion<T>(
    // continuation就是AwaitContinuation
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override val onCancelling get() = false
    override fun invoke(cause: Throwable?) {
        // job既是协程1
        val state = job.state
        assert { state !is Incomplete }
        if (state is CompletedExceptionally) {
            // Resume with with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state.unboxState() as T)
        }
    }
}

协程2完成后,会对监听器发出通知,执行它们的invkoke方法,AwaitContinuation含有调度器,会重新执行它的resume方法,将结果给协程1。

协程1执行完成过程

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
	
    ....
    
    public final override fun resumeWith(result: Result<T>) {
        val state = makeCompletingOnce(result.toState())
        if (state === COMPLETING_WAITING_CHILDREN) return
        afterResume(state)
    }
    
    ....
}

协程返回了一个字符串结果hello。所以result就是hello

internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
    	// stated的值是NodeList
        loopOnState { state ->
            val finalState = tryMakeCompleting(state, proposedUpdate)
            when {
                finalState === COMPLETING_ALREADY ->
                    throw IllegalStateException(
                        "Job $this is already complete or completing, " +
                            "but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
                    )
                finalState === COMPLETING_RETRY -> return@loopOnState
                else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
            }
        }
    }
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    	// stated的值是NodeList
        // proposedUpdates是"hello"
        ....
        // The separate slow-path function to simplify profiling
        return tryMakeCompletingSlowPath(state, proposedUpdate)
    }
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
        // get state's list or else promote to list to correctly operate on child lists
    	// state已经是NodeList了,所以可以拿到不为null的list
        val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
        // promote to Finishing state if we are not in it yet
        // This promotion has to be atomic w.r.t to state change, so that a coroutine that is not active yet
        // atomically transition to finishing & completing state
        val finishing = state as? Finishing ?: Finishing(list, false, null)
        // must synchronize updates to finishing state
        val notifyRootCause: Throwable?
        synchronized(finishing) {
            // check if this state is already completing
            if (finishing.isCompleting) return COMPLETING_ALREADY
            // mark as completing
            // 标记正在完成
            finishing.isCompleting = true
            // if we need to promote to finishing, then atomically do it here.
            // We do it as early is possible while still holding the lock. This ensures that we cancelImpl asap
            // (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
            if (finishing !== state) {
                if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
            }
            // state从NodeList被修改成Finishing
            // ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
            assert { !finishing.isSealed } // cannot be sealed
            // add new proposed exception to the finishing state
            // 没有异常,所以isCancelling为false
            val wasCancelling = finishing.isCancelling
            (proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
            // If it just becomes cancelling --> must process cancelling notifications
            // notifyRootCause的值就是null
            notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
        }
        // process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!)
    	// 有异常才会通知它的子协程
        notifyRootCause?.let { notifyCancelling(list, it) }
        // now wait for children
        // we can't close the list yet: while there are active children, adding new ones is still allowed.
    
    	// job协程没有子协程,也就不需要等子协程完成
        val child = list.nextChild()
        if (child != null && tryWaitForChild(finishing, child, proposedUpdate))
            return COMPLETING_WAITING_CHILDREN
        // turns out, there are no children to await, so we close the list.
        list.close(LIST_CHILD_PERMISSION)
        // some children could have sneaked into the list, so we try waiting for them again.
        // it would be more correct to re-open the list (otherwise, we get non-linearizable behavior),
        // but it's too difficult with the current lock-free list implementation.
        val anotherChild = list.nextChild()
        if (anotherChild != null && tryWaitForChild(finishing, anotherChild, proposedUpdate))
            return COMPLETING_WAITING_CHILDREN
    
        // otherwise -- we have not children left (all were already cancelled?)
        return finalizeFinishingState(finishing, proposedUpdate)
    }
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
       // proposedUpdate是"hello"
        .....
        val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
        // Create the final exception and seal the state so that no more exceptions can be added
        val wasCancelling: Boolean
        val finalException = synchronized(state) {
            wasCancelling = state.isCancelling
            val exceptions = state.sealLocked(proposedException)
            val finalCause = getFinalRootCause(state, exceptions)
            if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
            finalCause
        }
        // Create the final state object
        val finalState = when {
            // was not cancelled (no exception) -> use proposed update value
            finalException == null -> proposedUpdate
            // small optimization when we can used proposeUpdate object as is on cancellation
            finalException === proposedException -> proposedUpdate
            // cancelled job final state
            else -> CompletedExceptionally(finalException)
        }
        // Now handle the final exception
        if (finalException != null) {
            // 如果子协程发生异常,就通知它的父协程
            val handled = cancelParent(finalException) || handleJobException(finalException)
            if (handled) (finalState as CompletedExceptionally).makeHandled()
        }
        // Process state updates for the final state before the state of the Job is actually set to the final state
        // to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
        if (!wasCancelling) onCancelling(finalException)
        onCompletionInternal(finalState)
        // Then CAS to completed state -> it must succeed
    	// 最后协程的state的值变成了"hello"
        val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
        assert { casSuccess }
        // And process all post-completion actions
        completeStateFinalization(state, finalState)
        return finalState
    }
private fun completeStateFinalization(state: Incomplete, update: Any?) {
        // 
        parentHandle?.let {
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        val cause = (update as? CompletedExceptionally)?.cause
        /*
         * 2) Invoke completion handlers: .join(), callbacks etc.
         *    It's important to invoke them only AFTER exception handling and everything else, see #208
         */
        if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
           .....
        } else {
            state.list?.notifyCompletion(cause)
        }
    }

parentHandle意思就是父协程中的Handle,这里的值就是ChildHandleNode,执行它的dispose方法。就会将这个ChildHandleNode从协程1中删除。

state.list中有2个节点,一个是InvokeOnCompletion,另一个是ResumeAwaitOnCompletion

private fun NodeList.notifyCompletion(cause: Throwable?) {
    close(LIST_ON_COMPLETION_PERMISSION)
    notifyHandlers(this, cause) { true }
}
private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override val onCancelling get() = false
    override fun invoke(cause: Throwable?) {
        val state = job.state
        assert { state !is Incomplete }
        if (state is CompletedExceptionally) {
            // Resume with with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state.unboxState() as T)
        }
    }
}

tlin
private fun NodeList.notifyCompletion(cause: Throwable?) {
close(LIST_ON_COMPLETION_PERMISSION)
notifyHandlers(this, cause) { true }
}


```kotlin
private class ResumeAwaitOnCompletion<T>(
    private val continuation: CancellableContinuationImpl<T>
) : JobNode() {
    override val onCancelling get() = false
    override fun invoke(cause: Throwable?) {
        val state = job.state
        assert { state !is Incomplete }
        if (state is CompletedExceptionally) {
            // Resume with with the corresponding exception to preserve it
            continuation.resumeWithException(state.cause)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state.unboxState() as T)
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2229932.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

推荐一款功能强大的AI实时变声器:FliFlik Voice Changer

FliFlik VoiCE Changer是一款专注于声音变换与音频处理的创新软件&#xff0c;旨在满足从日常娱乐、游戏直播到播客制作、专业音频编辑的多种应用场景需求。无论是想在游戏中变换声音逗乐队友&#xff0c;还是在播客中塑造个性化的音效&#xff0c;这款软件都能提供灵活而强大的…

LeetCode总结-链表

一、遍历链表 1290.二进制链表转整数 2058.找出临界点之间的最小和最大距离 2181.合并零之间的节点 二、删除节点 问&#xff1a;为什么没有修改 dummy&#xff0c;但 dummy.next 却是新链表的头节点&#xff1f;如果删除了 head&#xff0c;那么最后返回的是不是原链表的头…

Apache Dubbo (RPC框架)

本文参考官方文档&#xff1a;Apache Dubbo 1. Dubbo 简介与核心功能 Apache Dubbo 是一个高性能、轻量级的开源Java RPC框架&#xff0c;用于快速开发高性能的服务。它提供了服务的注册、发现、调用、监控等核心功能&#xff0c;以及负载均衡、流量控制、服务降级等高级功能。…

【Flask】二、Flask 路由机制

目录 什么是路由&#xff1f; Flask中的路由 基本路由 动态路由 路由中的HTTP方法 路由函数返回 在Web开发中&#xff0c;路由是将URL映射到相应的处理函数的过程。Flask是一个轻量级的Web应用框架&#xff0c;提供了简单而强大的路由机制&#xff0c;使得开发者能够轻松…

强势改进!TCN-Transformer时间序列预测

强势改进&#xff01;TCN-Transformer时间序列预测 目录 强势改进&#xff01;TCN-Transformer时间序列预测预测效果基本介绍程序设计参考资料 预测效果 基本介绍 1.Matlab实现TCN-Transformer时间序列预测&#xff1b; 2.运行环境为Matlab2023b&#xff1b; 3.单个变量时间序…

六西格玛项目助力,手术机器人零部件国产化稳中求胜——张驰咨询

项目背景 XR-1000型腔镜手术机器人是某头部手术机器人企业推出的高端手术设备&#xff0c;专注于微创手术领域&#xff0c;具有高度的精确性和稳定性。而XR-1000型机器人使用的部分核心零部件长期依赖进口&#xff0c;特别是高精度电机、关节执行机构和视觉系统等&#xff0c;…

C++ 优先算法——复写零(双指针)

目录 题目&#xff1a;复写零 1. 题目解析 2. 算法原理 一. 先找到最后一个“复写”数 处理边界情况 二. 复写操作 3. 代码实现 题目&#xff1a;复写零 1. 题目解析 题目截图&#xff1a; 该题目要求的与移动零相似&#xff0c;都要在一个数组上进行操作&#xff0c;…

掌握DFMEA,让潜在设计缺陷无处遁形!

一个微小的设计缺陷&#xff0c;就可能让一款产品从市场宠儿变成过客。那么&#xff0c;如何在设计初期就精准识别并扼杀这些潜在威胁呢&#xff1f;答案就是——巧妙运用DFMEA&#xff08;设计失效模式与效应分析&#xff09;。本文&#xff0c;天行健企业管理咨询公司将详细阐…

时间序列预测(十)——长短期记忆网络(LSTM)

目录 一、LSTM结构 二、LSTM 核心思想 三、LSTM分步演练 &#xff08;一&#xff09;初始化 1、权重和偏置初始化 2、初始细胞状态和隐藏状态初始化 &#xff08;二&#xff09;前向传播 1、遗忘门计算&#xff08;决定从上一时刻隐状态中丢弃多少信息&#xff09; 2、…

FlaskFastAPIgunicornunicorn并发调用

Flask VS. FastAPI Flask和FastAPI是Python中两种流行的Web框架&#xff0c;它们各自具有不同的特点和适用场景。以下是它们之间的一些主要区别&#xff1a; 1. 框架类型 Flask&#xff1a;Flask是一个轻量级的微框架&#xff0c;适合构建小型到中型的Web应用。它灵活且易于扩展…

安装scrcpy-client模块av模块异常,环境问题解决方案

背景 使用 pip install scrcpy-client命令出现以下报错 performance hint: av\logging.pyx:232:5: Exception check on log_callback will always require the GIL to be acquired. Possible solutions: 1. Declare log_callback as noexcept if you control the definition …

Linux——常见指令及其权限理解(正在更新中)

1.指令 1.1 快速了解指令 pwd 首次登录&#xff0c;默认所处的路径 whoami 当前所用的用户的名称 ls 显示当前路径下&#xff0c;文件名称 mkdir 在当前目录下&#xff0c;创建一个文件夹/目录 cd 进入一个目录 touch 新建一个文…

胡壮麟《语言学教程》第五版PDF英文版+中文版翻译

胡壮麟《语言学教程》中文版&#xff1a;https://pan.quark.cn/s/9491130ec572 《语言学教程》&#xff08;英文版&#xff09;是一部经典的语言学教材&#xff0c;自 1988 年面世以来&#xff0c;被众多高校广泛采用&#xff0c;长销不衰。该教材自出版以来不断修订&#xff…

项目模块十二:TcpServer模块

一、模块设计思路 1、目的 对所有模块整合&#xff0c;实现一个服务器模块供外部快速搭建服务器。 2、管理 监听套接字 主 Reactor&#xff0c;创建 EventLoop _baseloop 对象&#xff0c;进行对监听套接字的管理 哈希表管理所有新连接的 Channel 创建线程池进行连接的事…

【Spring源码核心篇-01】精通Spring的bean的生命周期

Spring源码核心篇整体栏目 内容链接地址【一】Spring的bean的生命周期https://zhenghuisheng.blog.csdn.net/article/details/143441012 spring的bean的生命周期 一&#xff0c;spring中bean的生命周期1&#xff0c;生成BeanDefinition1.1&#xff0c;初始化context和BeanFacto…

IAR出现,Error [e12].Unable to open file "xxxx:_app.xcl"怎么办?

编译时出现&#xff0c;一般是拷贝过来出现这个问题。解决方法: 1&#xff0c;点击到最左边“code -Debug”上 2&#xff0c;点Project>>>options for node &#xff02;code&#xff02; 3&#xff0c;选项卡“linker”>>&#xff02;linker configuration fil…

C#与C++交互开发系列(十一):委托和函数指针传递

前言 在C#与C的互操作中&#xff0c;委托&#xff08;delegate&#xff09;和函数指针的传递是一个复杂但非常强大的功能。这可以实现从C回调C#方法&#xff0c;或者在C#中调用C函数指针的能力。无论是跨语言调用回调函数&#xff0c;还是在多线程、异步任务中使用委托&#x…

SpringBoot国际化:创建多语言支持的Web应用

SpringBoot国际化&#xff1a;创建多语言支持的Web应用 介绍 SpringBoot作为一个强大的框架&#xff0c;提供了便捷的国际化支持&#xff0c;使开发者能够轻松创建多语言支持的Web应用。通过使用SpringBoot的MessageSource&#xff0c;开发者可以在应用中实现动态的语言切换。…

如何快速搭建一个3D虚拟展厅?

随着元宇宙概念的兴起&#xff0c;一个全新的虚拟、立体数字空间正逐步成为我们生活的一部分。在这个空间里&#xff0c;用户可以沉浸其中&#xff0c;进行丰富的交互操作&#xff0c;体验前所未有的无限可能。而如何快速搭建一个属于自己的元宇宙3D虚拟展厅&#xff0c;正成为…

blender 小车建模 建模 学习笔记

一、学习blender视频教程链接 案例4&#xff1a;狂奔的小车_建模_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1Bt4y1E7qn?p14&spm_id_from333.788.videopod.episodes&vd_sourced0ea58f1127eed138a4ba5421c577eb1 二、开始建模 &#xff08;1&#xff09;创…