Kotlin协程笔记:CoroutineScope管理协程

news2025/1/21 0:53:52

         CoroutineScope 是实现协程结构化并发的关键。使用 CoroutineScope,可以批量管理同一个作用域下面所有的协程。

 

        CoroutineScope 与 结构化并发

        launch、async 被定义成了 CoroutineScope 扩展函数。在调用 launch 之前,必须先获取 CoroutineScope。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

        为何要设计成扩展方法?

        

fun main() {
    testCoroutinueScope()
}

private fun showScopeLog(any: Any?) {
    println("""$any Thread:${Thread.currentThread().name}""".trimIndent())
}

private fun testCoroutinueScope() {
    val scope = CoroutineScope(Job())
    scope.launch {
        launch {
            delay(1000000L)
            showScopeLog("Inner")
        }
        showScopeLog("Hello")
        delay(1000000L)
        showScopeLog("World") //不执行
    }

    scope.launch {
        launch {
            delay(1000000L)
            showScopeLog("Inner!!!")
        }
        showScopeLog("Hello!!!")
        delay(1000000L)
        showScopeLog("World!!!") //不执行
    }
    Thread.sleep(500L)
    scope.cancel()
}



Log:

Hello Thread:DefaultDispatcher-worker-1 @coroutine#1
Hello!!! Thread:DefaultDispatcher-worker-3 @coroutine#2

Process finished with exit code 0

 scope 创建了两个顶层的协程,接着,在协程的内部我们使用 launch 又创建了一个子协程。最后,在协程的外部等待了 500 毫秒,并且调用了 scope.cancel()。结果前面创建的 4 个协程就全部都取消了。

         父协程是属于 Scope 的,子协程是属于父协程的,只要调用了 scope.cancel(),这 4 个协程都会被取消。

        

        CoroutineScope 管理协程的能力,源自于 Job。

        父子协程关系如何建立?-CoroutineScope 如何通过 Job 来管理协程。

        CoroutineScope 是一个接口,为什么可以调用其构造函数,来创建 CoroutineScope 对象?不应该使用 object 关键字创建匿名内部类吗?

        调用 CoroutineScope() 并不是构造函数,而是一个顶层函数。

private fun testCoroutinueScope() {
    val scope = CoroutineScope(Job())
    scope.launch {
        launch {
            delay(1000000L)
            showScopeLog("Inner")
        }
        showScopeLog("Hello")
        delay(1000000L)
        showScopeLog("World") //不执行
    }

    scope.launch {
        launch {
            delay(1000000L)
            showScopeLog("Inner!!!")
        }
        showScopeLog("Hello!!!")
        delay(1000000L)
        showScopeLog("World!!!") //不执行
    }
    Thread.sleep(500L)
    scope.cancel()
}

public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
    ContextScope(if (context[Job] != null) context else context + Job())
internal class ContextScope(context: CoroutineContext) : CoroutineScope {
    override val coroutineContext: CoroutineContext = context
    // CoroutineScope is used intentionally for user-friendly representation
    override fun toString(): String = "CoroutineScope(coroutineContext=$coroutineContext)"
}
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

         Kotlin 当中的函数名称,在大部分情况下都是遵循“驼峰命名法”的,而在一些特殊情况下则不遵循这种命名法。上面的顶层函数 CoroutineScope(),其实就属于特殊的情况,因为它虽然是一个普通的顶层函数,但它发挥的作用却是“构造函数”。类似的用法,还有 Job() 这个顶层函数。

        在 Kotlin 当中,当顶层函数作为构造函数使用的时候,首字母是要大写的。

        创建 CoroutineScope 的时候,如果传入的 Context 是包含 Job 的,那就直接用;如果是不包含 Job 的,就会创建一个新的 Job。这就意味着,每一个 CoroutineScope 对象,它的 Context 当中必定存在一个 Job 对象。代码中的 CoroutineScope(Job()),改成 CoroutineScope() 也是完全没问题的。

        

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}



private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}

private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    private val continuation = block.createCoroutineUnintercepted(this, this)

    override fun onStart() {
        continuation.startCoroutineCancellable(this)
    }
}

StandaloneCoroutine 是 AbstractCoroutine 的子类,AbstractCoroutine 代表了协程的抽象类。另外这里有一个 initParentJob 参数,它是 true,代表了协程创建了以后,需要初始化协程的父子关系。而 LazyStandaloneCoroutine 则是 StandaloneCoroutine 的子类,它的 active 参数是 false,代表了以懒加载的方式创建协程。

public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    ... ...
 init {
        /*
         * Setup parent-child relationship between the parent in the context and the current coroutine.
         * It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
         * It is dangerous to install parent-child relationship here if the coroutine class
         * operates its state from within onCancelled or onCancelling
         * (with exceptions for rx integrations that can't have any parent)
         */
        if (initParentJob) initParentJob(parentContext[Job])
    }
}

AbstractCoroutine 是 JobSupport 的子类,在 init{} 代码块当中,根据 initParentJob 参数,判断是否需要初始化协程的父子关系。initParentJob 是 true,所以这里的 initParentJob() 方法一定会执行,而它的参数 parentContext[Job]取出来的 Job,其实就是在 Scope 当中的 Job。

 initParentJob() 方法,是它的父类 JobSupport 当中的方法。




public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob, SelectClause0 {
    final override val key: CoroutineContext.Key<*> get() = Job

    protected fun initParentJob(parent: Job?) {
        assert { parentHandle == null }
        
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        
        parent.start()
        @Suppress("DEPRECATION")
        
        val handle = parent.attachChild(this)
        parentHandle = handle

        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle 
        }
    }
}


public interface Job : CoroutineContext.Element {
    public val children: Sequence<Job>   
    public fun attachChild(child: ChildJob): ChildHandle
}

上面的代码一共有三个地方需要注意,我们来分析一下:

首先判断传入的 parent 是否为空,如果 parent 为空,说明当前的协程不存在父 Job,就不需要创建协程父子关系。

然后确保 parent 对应的 Job 启动了。

parent.attachChild(this)它会将当前的 Job,添加为 parent 的子 Job。这里其实就是建立协程父子关系的关键代码。

 协程是如何“结构化取消”的?

 协程的结构化取消,本质上是事件的传递。

public fun CoroutineScope.cancel(cause: CancellationException? = null) {
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)
}

         CoroutineScope 的 cancel() 方法,本质上是调用了它当中的 Job.cancel()。而这个方法的具体实现在 JobSupport 当中




public override fun cancel(cause: CancellationException?) {
    cancelInternal(cause ?: defaultCancellationException())
}

public open fun cancelInternal(cause: Throwable) {
    cancelImpl(cause)
}

internal fun cancelImpl(cause: Any?): Boolean {
    var finalState: Any? = COMPLETING_ALREADY
    if (onCancelComplete) {
        finalState = cancelMakeCompleting(cause)
        if (finalState === COMPLETING_WAITING_CHILDREN) return true
    }
    if (finalState === COMPLETING_ALREADY) {
        finalState = makeCancelling(cause)
    }
    return when {
        finalState === COMPLETING_ALREADY -> true
        finalState === COMPLETING_WAITING_CHILDREN -> true
        finalState === TOO_LATE_TO_CANCEL -> false
        else -> {
            afterCompletion(finalState)
            true
        }
    }
}
if (onCancelComplete) {        
        finalState = cancelMakeCompleting(cause)        
        if (finalState === COMPLETING_WAITING_CHILDREN) 
        return true   
}

job.cancel() 最终会调用 JobSupport 的 cancelImpl() 方法。上面的代码中onCancelComplete 是 Boolean 类型的成员属性。代表了当前的 Job,是否有协程体需要执行。另外,由于 CoroutineScope 当中的 Job 是手动创建的,并不需要执行任何协程代码,所以,它会是 true。继续分析 cancelMakeCompleting() 方法:

private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { state ->
        
        val finalState = tryMakeCompleting(state, proposedUpdate)
        if (finalState !== COMPLETING_RETRY) return finalState
    }
}

private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete)
        return COMPLETING_ALREADY

       
        return COMPLETING_RETRY
    }

    return tryMakeCompletingSlowPath(state, proposedUpdate)
}

private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
    
    notifyRootCause?.let { notifyCancelling(list, it) }

    return finalizeFinishingState(finishing, proposedUpdate)
}

 cancelMakeCompleting() 会调用 tryMakeCompleting() 方法,最终则会调用 tryMakeCompletingSlowPath() 当中的 notifyCancelling() 方法。所以,它才是最关键的代码。

private fun notifyCancelling(list: NodeList, cause: Throwable) {
    onCancelling(cause)
    notifyHandlers<JobCancellingNode>(list, cause)
    cancelParent(cause)
}
  •  通知子Job
notifyHandlers<JobCancellingNode>(list, cause)
  • 通知父Job
 cancelParent(cause)

 

通知子Job流程:

private inline fun <reified T: JobNode> notifyHandlers(list: NodeList, cause: Throwable?) {
    var exception: Throwable? = null
    list.forEach<T> { node ->
        try {
            node.invoke(cause)
        } catch (ex: Throwable) {
            exception?.apply { addSuppressedThrowable(ex) } ?: run {
                exception =  CompletionHandlerException("Exception in completion handler $node for $this", ex)
            }
        }
    }
    exception?.let { handleOnCompletionException(it) }
}

 遍历当前 Job 的子 Job,并将取消的 cause 传递过去,这里的 invoke() 最终会调用 ChildHandleNode 的 invoke() 方法:


internal class ChildHandleNode(
    @JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
    override val parent: Job get() = job
    override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
    override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}

public final override fun parentCancelled(parentJob: ParentJob) {
    cancelImpl(parentJob)
}

 ChildHandleNode 的 invoke() 方法会调用 parentCancelled() 方法,而它最终会调用 cancelImpl() 方法。 Job 取消的入口函数。这实际上就相当于在做递归调用。

通知父 Job 的流程:


private fun cancelParent(cause: Throwable): Boolean {
    if (isScopedCoroutine) return true

    val isCancellation = cause is CancellationException
    val parent = parentHandle

    if (parent === null || parent === NonDisposableHandle) {
        return isCancellation
    }
    
    return parent.childCancelled(cause) || isCancellation
}

 这个函数的返回值返回 true 代表父协程处理了异常,而返回 false,代表父协程没有处理异常。这种类似责任链的设计模式。


public open fun childCancelled(cause: Throwable): Boolean {
    if (cause is CancellationException) return true
    return cancelImpl(cause) && handlesException
}

 当异常是 CancellationException 的时候,协程是会进行特殊处理的。一般来说,父协程会忽略子协程的取消异常。而如果是其他的异常,那么父协程就会响应子协程的取消了。代码又会继续递归调用cancelImpl() 方法了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/105872.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

高级又高效的属性表编辑技术,你值得拥有!

当你要按关键词选择内容,无从下手? 当你要修改字段的部分内容,不知所措? 当你要提取出字段的某些内容,毫无头绪? 当你要按照位数来提取字符内容,力不从心? 当你要为字段补充一些新的内容,目瞪口呆? 当你要把一个字段分割成多个字段,抓耳挠腮? ...... 属性表的…

CUDA ~ WarpReduce

又是一篇关于cuda的 要好好学学哦, CUDA 编程进阶分享&#xff0c;一些 warp 的使用 如何实现一个高效的Softmax CUDA kernel&#xff1f;多少还是有些细节没有理解&#xff0c;恰好最近要做一个类似的 ReduceScale Kernel&#xff0c;原理机制还是比较相似的&#xff0c;所以…

CSS -- 10. 移动WEB开发之rem布局

文章目录移动WEB开发之rem布局1 rem基础2 媒体查询2.1 什么是媒体查询2.2 语法规范2.3 mediatype 查询类型2.4 关键字2.5 媒体特性2.6 案例&#xff1a;根据页面宽度改变背景颜色2.7 媒体查询rem实现元素动态大小变化2.8 针对不同的屏幕尺寸引入不同的样式文件3 Less基础3.1 维…

8000字详解Thread Pool Executor

摘要&#xff1a;Java是如何实现和管理线程池的?本文分享自华为云社区《JUC线程池: ThreadPoolExecutor详解》&#xff0c;作者&#xff1a;龙哥手记 。 带着大厂的面试问题去理解 提示 请带着这些问题继续后文&#xff0c;会很大程度上帮助你更好的理解相关知识点。pdai …

数据泄露成数据安全最大风险,企业如何预防呢?

据《中国政企机构数据安全风险分析报告》显示&#xff0c;2022年1月——2022年10月&#xff0c;安全内参共收录全球政企机构重大数据安全报道180起&#xff0c;其中数据泄露相关安全事件高达93起&#xff0c;占51.7%。与近三年平均每月公开报道频次相比&#xff0c;2022年相较前…

如何在3DMAX中不使用Maxscript或插件破碎物体对象?

在3DMAX中破碎物体我们通常会借助Maxscript或者插件&#xff0c;其实&#xff0c;不借助任何其他工具&#xff0c;3DMAX也可以实现对物体的破碎&#xff0c;下面就给大家介绍一种方法&#xff1a; 1.首先&#xff0c;创建一个破碎对象&#xff0c;比如一个石块&#xff08;或者…

AI趋势下,小布助手的进化论

“要构建人工智能等高精尖产业的新增长引擎”&#xff0c;随着人工智能在未来全球科技经济中的重要作用愈加凸显&#xff0c;当前产业已然获得了有史以来最强的政策建构力量。 随着政策的利好&#xff0c;中国人工智能进入一个前所未有的快速发展阶段。企查查数据显示&#xf…

疫情下的在线教学数据观

由于新型冠状病毒感染的肺炎疫情影响&#xff0c;剧烈增长的市场需求助推了在线教育的发展&#xff0c;同时也暴露了一些问题。 最近我们被客户要求撰写关于疫情的研究报告&#xff0c;包括一些图形和统计输出。 在本文中&#xff0c;我们结合了对100多个高中学生进行的在线教…

快讯 | 嘉为蓝鲸受邀出席汽车新智造数字行业峰会,助力构建数字时代竞争力!

12月9日&#xff0c;第五届GADI汽车新智造数字创新行业峰会暨年度评选盛典于上海圆满落幕&#xff0c;嘉为蓝鲸受邀出席。本届大会以“数智创新 赋能破局”为主题&#xff0c;多方面切入解读新能源汽车的数字化发展趋势&#xff0c;助力车企构建数字时代竞争力。 01 研运一体&a…

数据通信基础 - 信道特性(奈奎斯特定理、香农定理 )

文章目录1 概述1.1 通信系统模型图2 信道特性2.1 信道带宽 W2.2 奈奎斯特定理 - 无噪音2.3 香农定理 - 有噪音2.4 带宽、码元速率、数据速率 关系梳理3 网工软考真题1 概述 1.1 通信系统模型图 通信的目的&#xff1a;传递信息 2 信道特性 2.1 信道带宽 W 模拟信道&#…

数据中台选型必读(六):说说数据服务的七大核心功能

在前面的文章中&#xff0c;我们介绍了数据中台的元数据中心、指标字典与指标体系、数据模型设计、数据质量评估等内容&#xff0c;这些都是One Data理念下数据中台架构的重要部分。 我们今天要讲的One Service——统一数据服务&#xff0c;指的是由数据中台提供统一的数据接入…

搭建自动发卡网站搭建教程(独角数卡)保姆级教程,支付 + 图文

自动发卡网站 程序是开源的独角数卡 我搭建了一个这样的 wooknow自动销售发卡http://ok.54ndd.com/ 一个在线销售虚拟产品的平台。你应该见过这样的发卡平台。一些虚拟产品&#xff0c;如软件、激活码和会员可以放在上面出售。我在这里使用的发卡项目是一个开源的单字符数字…

Matplotlib怎么创建 axes 对象?

在 matplotlib 中&#xff0c;有几种常见的方法来创建 axes 对象&#xff1a; 1.使用 subplots 函数&#xff1a; import matplotlib.pyplot as pltfig, ax plt.subplots()subplots 函数会创建一个新的图形&#xff08;figure&#xff09;并返回一个包含单个子区域&#xff…

二肽Ala-Pro,13485-59-1

Substrate for skin fibroblast prolidase.皮肤成纤维细胞prolida酶的底物。 编号: 199181中文名称: 二肽Ala-Pro英文名: Ala-ProCAS号: 13485-59-1单字母: H2N-AP-OH三字母: H2N-Ala-Pro-COOH氨基酸个数: 2分子式: C8H14N2O3平均分子量: 186.21精确分子量: 186.1等电点(PI): 6…

【git 提交、撤销、回退代码】

git 提交、撤销、回退代码git push后 发现提交分支错误 --> 回退代码git 未push、取消commit(保留代码&#xff09;git 未push、取消commit(不保留代码&#xff09;git push后 发现提交分支错误 --> 回退代码 首先 git log 查看提交记录&#xff0c; 找到需要回退到哪次…

CSRF实战案例—绕过referer值验证

在一个添加管理员的界面引起了我的注意 尝试添加一个管理员,如下添加成功,我们可以观察其请求包中并未存在token字段,可能存在csrf漏洞。但是存在“Referer”和“Origin”字段 我们把referer字段删了只剩origin,查看是否可以请求成功,发现可以请求成功 两个值都删了,请求…

PGL 系列(四)词向量 CBOW

环境 python 3.6.8paddlepaddle-gpu 2.3.0numpy 1.19.5一、CBOW 概念 CBOW:通过上下文的词向量推理中心词 在CBOW中,先在句子中选定一个中心词,并把其它词作为这个中心词的上下文。如 上图 CBOW所示,把“spiked”作为中心词,把“Pineapples、are、and、yellow”作为中心词…

【车载开发系列】UDS诊断---控制DTC设置($0x85)

【车载开发系列】UDS诊断—控制DTC设置&#xff08;$0x85&#xff09; UDS诊断---控制DTC设置&#xff08;$0x85&#xff09; 【车载开发系列】UDS诊断---控制DTC设置&#xff08;$0x85&#xff09;一.概念定义常见汽车故障二.子功能三.报文格式1&#xff09;报文请求2&#xf…

索引的底层实现原理是什么?

索引存储在内存中&#xff0c;为服务器存储引擎为了快速找到记录的一种数据结构。索引的主要作用是加快数据查找速度&#xff0c;提高数据库的性能。 索引的分类 (1) 普通索引&#xff1a;最基本的索引&#xff0c;它没有任何限制。 (2) 唯一索引&#xff1a;与普通索引类似…

计算机毕设Python+Vue研究生培养过程管理系统(程序+LW+部署)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…