Kotlin 协程官方文档知识汇总(一)

news2025/3/30 7:09:18

1、协程基础

Kotlin 是一门仅在标准库中提供最基本底层 API 以便其他库能够利用协程的语言。与许多其他具有类似功能的语言不同,asyncawait 在 Kotlin 中并不是关键字,甚至都不是标准库的一部分。此外,Kotlin 的挂起函数概念为异步操作提供了比 future 与 promise 更安全、更不易出错的抽象。

kotlinx.coroutines 是由 JetBrains 开发的功能丰富的协程库。使用协程需要添加对 kotlinx-coroutines-core 模块的依赖,如项目的 readme 文件所述。

1.1 第一个协程

协程是一个可以挂起(suspend)的计算实例。在概念上它类似于线程,因为它需要运行一段与其他代码并发的代码块。但是,协程不绑定到任何特定的线程。它可以在一个线程中暂停执行,然后在另一个线程中恢复执行。

协程可以被视为轻量级线程,但有很多重要的差异使得它们在实际使用中与线程很不同。

运行以下代码获取你的第一个工作协程:

fun main() = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello") // main coroutine continues while a previous one is delayed
}

// 输出结果:
Hello
World!

launch 是一个协程构建器。它启动一个与其他代码并发的新协程,该协程继续独立工作。

delay 是一个特殊的挂起函数,它会协程挂起一定的时间。挂起协程不会阻塞底层线程,而是允许其他协程运行并使用底层线程执行它们的代码。

runBlocking 也是一个协程构建器,它将一个常规的 main() 的非协程世界与 runBlocking {…} 大括号内的协程代码连接起来。在 IDE 中,在 runBlocking {…} 的起始括号后面会有 this:CoroutineScope 的提示,如果你删除或者忘记在上述代码中添加 runBlocking {…},那么 launch 的调用会收到错误,因为它只能在 CoroutineScope 内使用。

runBlocking 这个名字表示运行它的线程(在这个例子中是主线程)在被调用期间会被阻塞,直到 runBlocking 内的所有协程执行完毕。你经常会在应用程序的最顶层看到这样使用 runBlocking,而在真正的代码之内则很少如此,因为线程是代价高昂的资源,阻塞线程是比较低效的,我们通常并不希望阻塞线程。

结构化并发

协程遵循结构化并发,意思是说新的协程只能在一个指定的 CoroutineScope 内启动,CoroutineScope 界定了协程的生命周期。上面的例子展示了 runBlocking 建立了相应的作用域(Scope)。

在真实的应用程序中,你会启动很多协程。结构化的并发保证协程不会丢失或泄露。直到所有子协程结束之前,外层的作用范围不会结束。结构化的并发还保证代码中的任何错误都会正确的向外报告,不会丢失。

1.2 提取函数重构

假如你想要将 launch 代码块内的代码抽取到一个单独的函数中,当你在 IDE 中点击 “Extract function” 选项时,你会得到一个 suspend 修饰的函数,即挂起函数。挂起函数在协程内部可以被当作普通函数使用,它额外的功能是,调用其他挂起函数(就像上例中的 delay 那样)以挂起协程的执行。

1.3 作用域构建器

除了由不同的构建器提供协程作用域之外,还可以使用 coroutineScope 构建器声明自己的作用域。它创建一个协程作用域,并且不会在所有已启动的子协程执行完毕之前结束。

runBlocking 和 coroutineScope 看起来很相似,都会等待其协程体以及所有子协程结束。主要区别在于,runBlocking 会阻塞当前线程来等待,而 coroutineScope 只是挂起,会释放底层线程用于其他用途。由于存在这点差异,runBlocking 是常规函数,而 coroutineScope 是挂起函数。

你可以在挂起函数中使用 coroutineScope。例如,将并发打印 Hello 和 World 的操作移入 suspend fun doWorld() 函数中:

fun main() = runBlocking {
    doWorld()
}

suspend fun doWorld() = coroutineScope {  // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}

// 输出结果:
Hello
World!

1.4 作用域构建器与并发

一个 coroutineScope 构建器可以在任意挂起函数内使用以进行多个并发操作。在挂起函数 doWorld 内启动两个并发的协程:

// Sequentially executes doWorld followed by "Done"
fun main() = runBlocking {
    doWorld()
    println("Done")
}

// Concurrently executes both sections
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}

// 输出结果:
Hello
World 1
World 2
Done

1.5 显式任务

launch 这个协程构建器会返回一个 Job 对象,它是被启动的协程的处理器,可以用来显式地等待协程结束。比如,你可以等待子协程结束后再输出 Done:

val job = launch { // launch a new coroutine and keep a reference to its Job
    delay(1000L)
    println("World!")
}
println("Hello")
job.join() // wait until child coroutine completes
println("Done") 

// 输出结果:
Hello
World!
Done

1.6 协程是轻量的

与 JVM 线程相比,协程消耗更少的资源。有些代码使用线程时会耗尽 JVM 的可用内存,如果用协程来表达,则不会达到资源上限。比如,以下代码启动 50000 个不同的协程,每个协程等待 5 秒,然后打印一个点号(‘.’),但只消耗非常少的内存:

fun main() = runBlocking {
    repeat(50_000) { // launch a lot of coroutines
        launch {
            delay(5000L)
            print(".")
        }
    }
}

如果使用线程来实现相同功能(删除 runBlocking,将 launch 替换为 thread,将 delay 替换为 Thread.sleep),程序可能会消耗过多内存,抛出内存不足(out-of-memory)的错误或者启动线程缓慢。

2、取消与超时

2.1 取消协程执行

在一个长时间运行的应用程序中,你也许需要对你的后台协程进行细粒度的控制。 比如说,用户关闭了一个启动了协程的界面,那么现在不再需要协程的执行结果,这时,它应该是可以被取消的。 launch 会返回一个 Job,可以用来取消运行中的协程:

val job = launch {
    repeat(1000) { i ->
        println("job: I'm sleeping $i ...")
        delay(500L)
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion 
println("main: Now I can quit.")

输出结果如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

调用了 job.cancel() 后,该协程会被取消。这里也可以使用 Job 的挂起函数 cancelAndJoin,它合并了对 cancel() 和 join() 的调用。

2.2 取消是协作式的

协程取消是协作式的。协程代码必须进行协作才能被取消。在 kotlinx.coroutines 中的所有挂起函数都是可取消的。它们会检查协程是否已取消,并在取消时抛出 CancellationException 异常。然而,如果协程正在进行计算并且没有检查取消,则它无法被取消,就像以下示例所示:

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    // 循环中并没有检查协程是否取消的代码,因此它无法在进入循环后被取消
    while (i < 5) { // computation loop, just wastes CPU
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

输出结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

查看结果发现在取消协程之后仍在继续打印,直到迭代完成。

同样的问题可以通过捕获 CancellationException 并不重新抛出它来观察到:

val job = launch(Dispatchers.Default) {
    repeat(5) { i ->
        try {
            // print a message twice a second
            println("job: I'm sleeping $i ...")
            delay(500)
        } catch (e: Exception) {
            // log the exception
            println(e)
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

// 输出结果:
job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@3151f9b5
job: I'm sleeping 3 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@3151f9b5
job: I'm sleeping 4 ...
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutine{Cancelling}@3151f9b5
main: Now I can quit.

虽然捕获 Exception 是一种反模式,但这个问题可能以更微妙的方式浮现,例如在使用 runCatching 函数时,它不会重新抛出 CancellationException。

2.3 使计算代码可取消

确保计算代码可取消的方法有两种:一是定期调用一个挂起函数来检查取消状态,yield 函数是一个很好的选择;二是显式地检查取消状态。我们来试试后者:

val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
    var nextPrintTime = startTime
    var i = 0
    while (isActive) { // cancellable computation loop
        // print a message twice a second
        if (System.currentTimeMillis() >= nextPrintTime) {
            println("job: I'm sleeping ${i++} ...")
            nextPrintTime += 500L
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

输出结果:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
main: Now I can quit.

如你所见,现在这个循环被取消了。isActive 是通过 CoroutineScope 对象在协程内部可用的扩展属性。

2.4 在 finally 中关闭资源

可取消的挂起函数在取消时会抛出 CancellationException 异常,我们可以像处理其他异常一样来处理它。例如,可以使用 try {...} finally {...} 表达式或 Kotlin 的 use 函数来执行终止操作:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        println("job: I'm running finally")
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

joincancelAndJoin 函数都会等待所有终止操作完成,因此上面的示例将产生以下输出:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
main: Now I can quit.

2.5 运行不能取消的代码块

在前面的示例中,在 finally 块中使用挂起函数会导致 CancellationException 异常,因为协程已被取消。通常情况下,这不是问题,因为所有良好行为的关闭操作(如关闭文件、取消作业或关闭任何类型的通信通道)通常都是非阻塞的,不涉及任何挂起函数。然而,在极少数情况下,如果你需要在已取消的协程中挂起,则可以使用 withContext(NonCancellable) 来包装相应的代码。下面的示例展示了如何使用 withContext 函数和 NonCancellable 上下文:

val job = launch {
    try {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    } finally {
        withContext(NonCancellable) {
            println("job: I'm running finally")
            delay(1000L)
            println("job: And I've just delayed for 1 sec because I'm non-cancellable")
        }
    }
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")

输出如下:

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm running finally
job: And I've just delayed for 1 sec because I'm non-cancellable
main: Now I can quit.

2.6 超时

取消协程执行的最明显的实际原因是因为其执行时间已经超过了某个超时时间。虽然你可以手动跟踪对应的 Job 引用,并启动一个单独的协程来延迟取消跟踪的协程,但是有一个名为 withTimeout 的函数可以直接使用。以下是一个示例:

withTimeout(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
}

输出如下:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

TimeoutCancellationExceptionCancellationException 的子类。我们之前没有在控制台上看到它的堆栈跟踪,是因为在已取消的协程中,CancellationException 被认为是协程完成的正常原因。然而,在这个示例中,我们在 main 函数中直接使用了 withTimeout 函数,因此在主函数中抛出的 TimeoutCancellationException 将被打印到控制台上。

由于取消只是一个异常,所有的资源都可以按照通常的方式关闭。如果你需要在任何类型的超时上执行一些额外的操作,可以将包含超时代码的 try {...} catch (e: TimeoutCancellationException) {...} 块包装起来。另外,如果你希望在超时时返回 null 而不是抛出异常,可以使用 withTimeoutOrNull 函数,它与 withTimeout 函数类似,但在超时时返回 null

val result = withTimeoutOrNull(1300L) {
    repeat(1000) { i ->
        println("I'm sleeping $i ...")
        delay(500L)
    }
    "Done" // will get cancelled before it produces this result
}
println("Result is $result")

输出结果:

I'm sleeping 0 ...
I'm sleeping 1 ...
I'm sleeping 2 ...
Result is null

2.7 异步超时与资源

withTimeout 函数中的超时事件是相对于其块中正在运行的代码异步发生的,可能会在超时块内部的返回之前发生。如果在块内部打开或获取某些需要在块外部关闭或释放的资源,请记住这一点。

例如,下面的示例中,我们使用 Resource 类模拟一个可关闭的资源,该类通过增加 acquired 计数器来跟踪资源被创建的次数,并在其 close 函数中减少计数器。现在,让我们创建许多协程,每个协程都在 withTimeout 块的末尾创建一个 Resource,并在块外部释放该资源。我们添加一个小延迟,以便超时事件更有可能在 withTimeout 块已经完成时发生,这将导致资源泄漏。

var acquired = 0

class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}

fun main() {
    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block     
                }
                resource.close() // Release the resource
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}

如果你运行上面的代码,会发现它并不总是打印零,虽然这可能取决于你的机器的时间安排。你可能需要调整这个示例中的超时时间,以实际看到非零值。

请注意,在这里从 10000 个协程中对 acquired 计数器进行递增和递减是完全线程安全的,因为它总是发生在同一个线程上,即 runBlocking 使用的线程上。更多关于此的解释将在协程上下文的章节中进行说明。

为了解决这个问题,你可以将资源的引用存储在变量中,而不是从 withTimeout 块中返回它。这样,即使超时事件发生,资源也不会在超时块之外被释放,因为它们的引用仍然存在于变量中。

runBlocking {
    repeat(10_000) { // Launch 10K coroutines
        launch { 
            var resource: Resource? = null // Not acquired yet
            try {
                withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    resource = Resource() // Store a resource to the variable if acquired      
                }
                // We can do something else with the resource here
            } finally {  
                resource?.close() // Release the resource if it was acquired
            }
        }
    }
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired

这个示例总是打印零。资源不会泄漏。

3、组合挂起函数

3.1 默认顺序调用

假设我们在其他地方定义了两个挂起函数,它们可以做一些有用的事情,比如远程服务调用或计算。我们只是假装它们有用,但实际上每个函数只是为了这个例子而延迟一秒钟:

suspend fun doSomethingUsefulOne(): Int {
    delay(1000L) // pretend we are doing something useful here
    return 13
}

suspend fun doSomethingUsefulTwo(): Int {
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

如果我们需要按顺序调用它们 —— 首先执行 doSomethingUsefulOne,然后执行 doSomethingUsefulTwo,并计算它们结果的总和,该怎么办?在实践中,如果我们使用第一个函数的结果来决定是否需要调用第二个函数或决定如何调用第二个函数,我们会使用普通的顺序调用,因为协程中的代码与常规代码一样,默认情况下是顺序的。以下示例通过测量两个挂起函数执行所需的总时间来演示它:

val time = measureTimeMillis {
    val one = doSomethingUsefulOne()
    val two = doSomethingUsefulTwo()
    println("The answer is ${one + two}")
}
println("Completed in $time ms")

输出结果:

The answer is 42
Completed in 2017 ms

3.2 async 并发

如果在调用 doSomethingUsefulOne 和 doSomethingUsefulTwo 之间没有依赖关系,并且我们想通过同时执行两者来更快地获取答案,那么可以使用 async 函数来实现。

概念上,asynclaunch 非常相似。它们都会启动一个单独的协程,该协程是一个轻量级的线程,可以与其他协程并发工作。它们的区别在于,launch 函数返回一个 Job 对象,并不返回任何结果值,而 async 函数返回一个 Deferred 对象,它是一个轻量级的非阻塞 future,表示承诺稍后提供结果。你可以在 Deferred 值上使用 .await() 函数来获取其最终结果,但是 Deferred 对象也是一个 Job 对象,因此如果需要,可以取消它。

val time = measureTimeMillis {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

输出如下:

The answer is 42
Completed in 1017 ms

这样做的速度是原来的两倍,因为这两个协程是并发执行的。请注意,协程的并发始终是显式的。

3.3 延迟启动 async

可选地,可以通过将 async 的 start 参数设置为 CoroutineStart.LAZY,使 async 变为延迟启动。在这种模式下,它仅在 await 需要其结果时或者在调用其 Job 的 start 函数时启动协程。运行以下示例:

val time = measureTimeMillis {
    val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
    val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
    // some computation
    one.start() // start the first one
    two.start() // start the second one
    println("The answer is ${one.await() + two.await()}")
}
println("Completed in $time ms")

输出如下:

The answer is 42
Completed in 1017 ms

因此,在这里定义了两个协程,但不像前面的示例那样执行它们,而是将控制权交给程序员,由程序员决定何时调用 start() 开始执行。我们首先启动一个协程,然后启动另一个协程,最后等待各个协程完成。

需要注意的是,如果我们在 println 中只调用 await 而不先在各个协程上调用 start,那么这将导致顺序行为,因为 await 会启动协程执行并等待其完成,这不是延迟启动的预期用例。async(start = CoroutineStart.LAZY) 的用例是在计算值涉及挂起函数的情况下,替换标准的 lazy 函数。

3.4 Async 风格的函数(Async-style functions)

我们可以使用 async 协程构建器定义异步调用 doSomethingUsefulOne 和 doSomethingUsefulTwo 的 Async 风格函数,使用 GlobalScope 引用来退出结构化并发。我们使用“…Async”后缀命名这样的函数,以突出它们仅启动异步计算,并且需要使用生成的延迟值来获取结果的事实。

GlobalScope 是一个敏感的 API,可能会以非平凡的方式产生反作用,其中一种将在下面解释,因此你必须显式使用 @OptIn(DelicateCoroutinesApi::class) 来选择使用 GlobalScope。

// The result type of somethingUsefulOneAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulOneAsync() = GlobalScope.async {
    doSomethingUsefulOne()
}

// The result type of somethingUsefulTwoAsync is Deferred<Int>
@OptIn(DelicateCoroutinesApi::class)
fun somethingUsefulTwoAsync() = GlobalScope.async {
    doSomethingUsefulTwo()
}

需要注意的是,这些 xxxAsync 函数不是挂起函数。它们可以从任何地方使用。然而,它们的使用始终意味着将它们的操作与调用代码异步(这里是指并发)执行。

以下示例展示了它们在协程外部的使用:

// note that we don't have `runBlocking` to the right of `main` in this example
fun main() {
    val time = measureTimeMillis {
        // we can initiate async actions outside of a coroutine
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync()
        // but waiting for a result must involve either suspending or blocking.
        // here we use `runBlocking { ... }` to block the main thread while waiting for the result
        runBlocking {
            println("The answer is ${one.await() + two.await()}")
        }
    }
    println("Completed in $time ms")
}

这种使用异步函数的编程风格在此仅作为说明提供,因为它是其他编程语言中的一种流行风格。使用这种风格与 Kotlin 协程强烈不建议,原因如下所述。

考虑以下情况:在 val one = somethingUsefulOneAsync() 行和 one.await() 表达式之间的代码中存在某些逻辑错误,程序抛出异常,并且程序正在执行的操作中止。通常,全局错误处理程序可以捕获此异常,记录和报告开发人员的错误,但程序仍然可以继续执行其他操作。但是,在这里,somethingUsefulOneAsync 仍然在后台运行,即使启动它的操作已中止。结构化并发不会出现这个问题,如下面的部分所示。

3.5 使用 async 的结构化并发

让我们拿并发使用 async 的例子,并提取一个函数,该函数同时执行 doSomethingUsefulOne 和 doSomethingUsefulTwo,并返回它们结果的总和。由于 async 协程构建器是在 CoroutineScope 上定义的扩展,因此我们需要在作用域中拥有它,这就是 coroutineScope 函数提供的:

suspend fun concurrentSum(): Int = coroutineScope {
    val one = async { doSomethingUsefulOne() }
    val two = async { doSomethingUsefulTwo() }
    one.await() + two.await()
}

这样,如果 concurrentSum 函数内部发生错误,并抛出异常,那么在其作用域内启动的所有协程都将被取消。

我们仍然可以看到两个操作的并发执行,正如上面的 main 函数输出所示:

The answer is 42
Completed in 1017 ms

取消请求始终会通过协程层次结构进行传播:

fun main() = runBlocking<Unit> {
    try {
        failedConcurrentSum()
    } catch(e: ArithmeticException) {
        println("Computation failed with ArithmeticException")
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> { 
        try {
            delay(Long.MAX_VALUE) // Emulates very long computation
            42
        } finally {
            println("First child was cancelled")
        }
    }
    val two = async<Int> { 
        println("Second child throws an exception")
        throw ArithmeticException()
    }
    one.await() + two.await()
}

请注意,当子协程(即 two)失败时,第一个 async 和等待其完成的父协程都将被取消:

Second child throws an exception
First child was cancelled
Computation failed with ArithmeticException

4、协程上下文与调度器

协程始终在某些上下文中执行,该上下文由 Kotlin 标准库中 CoroutineContext 类型的值表示。协程上下文是各种元素的集合,主要元素是协程的 Job,及其调度器。

4.1 调度器与线程

协程上下文包括一个协程调度器(参见 CoroutineDispatcher),该调度器确定相应协程用于执行的线程或线程集。协程调度器可以将协程执行限制在特定线程上,将其调度到线程池中,或允许其无限制地运行。

所有协程构建器(如 launch 和 async)都接受一个可选的 CoroutineContext 参数,该参数可用于显式指定新协程的调度器和其他上下文元素。

尝试以下示例:

launch { // context of the parent, main runBlocking coroutine
    println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
    println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) { // will get dispatched to DefaultDispatcher 
    println("Default               : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
    println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}

输出如下(顺序可能不同):

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

当不带参数使用 launch 时,它会从启动它的 CoroutineScope 继承上下文(因此也继承调度器)。在这种情况下,它继承了运行在主线程中的 main runBlocking 协程的上下文。

Dispatchers.Unconfined 是一种特殊的调度器,看起来也在主线程中运行,但实际上是一种不同的机制,稍后会进行解释。

当作用域中没有显式指定其他调度器时,默认调度器将被使用。它由 Dispatchers.Default 表示,并使用共享的后台线程池。

newSingleThreadContext 为协程创建一个线程来运行。专用线程是非常昂贵的资源。在实际应用中,当不再需要时,它必须使用 close 函数释放,或者存储在顶级变量中并在整个应用程序中重复使用。

4.2 自由调度器与受限调度器(Unconfined vs confined dispatcher)

Dispatchers.Unconfined 协程调度器在调用者线程中启动一个协程,但仅在第一个挂起点之前。在挂起之后,它会在完全由调用的挂起函数决定的线程中恢复协程。Unconfined 调度器适用于既不消耗 CPU 时间,也不更新特定线程中的任何共享数据(如 UI)的协程。

另一方面,默认情况下从外部 CoroutineScope 继承的调度器,特别是对于 runBlocking 协程的默认调度器,它被限制在调用者线程中,因此继承它的效果是将执行限制在这个线程中,具有可预测的 FIFO 调度:

launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
    println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
    delay(500)
    println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
}
launch { // context of the parent, main runBlocking coroutine
    println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
    delay(1000)
    println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}

输出:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

因此,使用从 runBlocking{...} 继承的上下文启动的协程将继续在主线程中执行,而使用 Dispatchers.Unconfined 启动的协程将最初在主线程中启动,但在遇到挂起点(例如 delay 函数)时将在默认执行器线程中恢复执行。它恢复执行的线程将由下一个挂起函数关联的调度器决定,例如在本例中与默认执行器线程相关联的 delay 函数。

Unconfined 调度器是一种高级机制,可以在某些特殊情况下提供帮助,比如不需要将协程分派到稍后执行,或者分派会产生不良副作用,因为某个协程中的某个操作必须立即执行。不过,一般情况下不应该在代码中使用 Unconfined 调度器。

4.3 调试协程与线程

协程可以在一个线程上挂起,在另一个线程上恢复执行。即使在单线程调度器中,如果没有特殊的工具来帮助,也可能很难确定协程正在做什么、在哪里以及何时执行。

使用 IDEA 调试

Kotlin 插件的 Coroutine Debugger 简化了在 IntelliJ IDEA 中调试协程的过程。

调试功能适用于 kotlinx-coroutines-core 版本 1.3.8 或更高版本。

调试工具窗口包含“协程”选项卡。在此选项卡中,你可以找到有关当前正在运行和挂起的协程的信息。协程按它们正在运行的调度器进行分组。

使用协程调试器,你可以:

  • 检查每个协程的状态。
  • 查看运行中和挂起的协程的本地变量和捕获的变量的值。
  • 查看完整的协程创建堆栈,以及协程内部的调用堆栈。堆栈包括所有带有变量值的帧,即使在标准调试期间也会丢失这些帧。
  • 获取包含每个协程及其堆栈状态的完整报告。要获取报告,请在“协程”选项卡中右键单击,然后单击“获取协程转储”。

要开始进行协程调试,只需要设置断点并以调试模式运行应用程序。

可以在教程中了解更多关于协程调试的内容。

使用日志调试

在没有 Coroutine Debugger 的情况下,使用线程打印日志文件中的线程名称是调试带有线程的应用程序的另一种方法。此功能由日志记录框架广泛支持。使用协程时,仅使用线程名称并不能提供太多上下文,因此 kotlinx.coroutines 包括调试工具以使其更容易。

使用 JVM 选项 -Dkotlinx.coroutines.debug 运行下列代码:

val a = async {
    log("I'm computing a piece of the answer")
    6
}
val b = async {
    log("I'm computing another piece of the answer")
    7
}
log("The answer is ${a.await() * b.await()}")

这段代码中有三个协程。主协程 (#1) 在 runBlocking 中,另外两个协程计算延迟值 a (#2) 和 b (#3)。它们都在 runBlocking 的上下文中执行,并且限制在主线程中。该代码的输出为:

[main @coroutine#2] I'm computing a piece of the answer
[main @coroutine#3] I'm computing another piece of the answer
[main @coroutine#1] The answer is 42

log 函数在方括号中打印线程名称,你可以看到它是主线程,后面附加了当前执行协程的标识符。当调试模式启用时,此标识符会按顺序分配给所有创建的协程。

4.4 在线程之间跳转

使用 JVM 选项 -Dkotlinx.coroutines.debug 运行下列代码:

newSingleThreadContext("Ctx1").use { ctx1 ->
    newSingleThreadContext("Ctx2").use { ctx2 ->
        runBlocking(ctx1) {
            log("Started in ctx1")
            withContext(ctx2) {
                log("Working in ctx2")
            }
            log("Back to ctx1")
        }
    }
}

它展示了几种新技术。其中一种是使用带有显式指定上下文的 runBlocking 函数,另一种是使用 withContext 函数来改变协程的上下文,同时仍然保持在同一个协程中,正如你可以在下面的输出中看到的那样:

[Ctx1 @coroutine#1] Started in ctx1
[Ctx2 @coroutine#1] Working in ctx2
[Ctx1 @coroutine#1] Back to ctx1

注意,此示例还使用 Kotlin 标准库中的 use 函数,该函数会在不再需要 newSingleThreadContext 创建的线程时释放它们。

4.5 上下文中的 Job

协程的 Job 是其上下文的一部分,可以使用 coroutineContext[Job] 表达式从上下文中检索它:

println("My job is ${coroutineContext[Job]}")

在调试模式下会输出如下内容:

My job is "coroutine#1":BlockingCoroutine{Active}@6d311334

注意,CoroutineScope 中的 isActive 只是一个方便的快捷方式,用于检查 coroutineContext[Job]?.isActive == true

4.6 子协程

当在另一个协程的 CoroutineScope 中启动协程时,它通过 CoroutineScope.coroutineContext 继承其上下文,并且新协程的 Job 成为父协程 Job 的子 Job。当取消父协程时,所有子协程也会被递归取消。

然而,父子关系可以通过以下两种方式之一显式地覆盖:

  1. 当在启动协程时显式指定不同的范围(例如 GlobalScope.launch)时,它不会从父范围继承 Job。
  2. 当将不同的 Job 对象作为新协程的上下文传递(如下面的示例所示)时,它会覆盖父范围的 Job。

在这两种情况下,启动的协程不会绑定到它所启动的范围上,并且独立运行。

// launch a coroutine to process some kind of incoming request
val request = launch {
    // it spawns two other jobs
    launch(Job()) { 
        println("job1: I run in my own Job and execute independently!")
        delay(1000)
        println("job1: I am not affected by cancellation of the request")
    }
    // and the other inherits the parent context
    launch {
        delay(100)
        println("job2: I am a child of the request coroutine")
        delay(1000)
        println("job2: I will not execute this line if my parent request is cancelled")
    }
}
delay(500)
request.cancel() // cancel processing of the request
println("main: Who has survived request cancellation?")
delay(1000) // delay the main thread for a second to see what happens

输出如下:

job1: I run in my own Job and execute independently!
job2: I am a child of the request coroutine
main: Who has survived request cancellation?
job1: I am not affected by cancellation of the request

4.7 父协程的职责

父协程总是会等待所有子协程完成。父协程无须显式追踪所有子协程的启动,并且无须在最后使用 Job.join() 等待子协程:

// launch a coroutine to process some kind of incoming request
val request = launch {
    repeat(3) { i -> // launch a few children jobs
        launch  {
            delay((i + 1) * 200L) // variable delay 200ms, 400ms, 600ms
            println("Coroutine $i is done")
        }
    }
    println("request: I'm done and I don't explicitly join my children that are still active")
}
request.join() // wait for completion of the request, including all its children
println("Now processing of the request is complete")

输出结果:

request: I'm done and I don't explicitly join my children that are still active
Coroutine 0 is done
Coroutine 1 is done
Coroutine 2 is done
Now processing of the request is complete

4.8 命名协程以调试

当协程经常打印日志并且你只需要关联来自同一个协程的日志记录时, 则自动分配的 id 是好的。然而,当一个协程与特定请求的处理相关联或做一些特定的后台任务时,最好将其明确命名以用于调试。 CoroutineName 上下文元素与线程名具有相同的目的,当调试模式开启时,它被包含在正在执行此协程的线程名中。

下面的例子演示了这一概念:

log("Started main coroutine")
// run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
    delay(500)
    log("Computing v1")
    252
}
val v2 = async(CoroutineName("v2coroutine")) {
    delay(1000)
    log("Computing v2")
    6
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}")

使用 JVM 选项 -Dkotlinx.coroutines.debug 运行代码,输出类似于:

[main @main#1] Started main coroutine
[main @v1coroutine#2] Computing v1
[main @v2coroutine#3] Computing v2
[main @main#1] The answer for v1 / v2 = 42

4.9 组合上下文元素

有时我们需要在协程上下文中定义多个元素。我们可以使用 + 操作符来实现。 比如说,我们可以显式指定一个调度器来启动协程并且同时显式指定一个命名:

launch(Dispatchers.Default + CoroutineName("test")) {
    println("I'm working in thread ${Thread.currentThread().name}")
}

使用 -Dkotlinx.coroutines.debug JVM 参数,输出如下所示:

I'm working in thread DefaultDispatcher-worker-1 @test#2

4.10 协程作用域

让我们将关于上下文、子协程和 Job 的知识结合起来。假设我们的应用程序有一个具有生命周期的对象,但该对象不是协程。例如,我们正在编写一个 Android 应用程序,并在 Android Activity 的上下文中启动各种协程以执行异步操作来获取和更新数据、进行动画等。所有这些协程在活动被销毁时必须被取消,以避免内存泄漏。当然,我们可以手动操作上下文和 Job,以绑定 Activity 及其协程的生命周期,但 kotlinx.coroutines 提供了一种封装的抽象:CoroutineScope。你应该已经熟悉了 CoroutineScope,因为所有协程构建器都声明为其扩展。

我们通过创建与 Activity 生命周期相关联的 CoroutineScope 实例来管理协程的生命周期。CoroutineScope 实例可以通过 CoroutineScope() 或 MainScope() 工厂函数创建。前者创建一个通用作用域,而后者创建一个用于 UI 应用程序的作用域,并将 Dispatchers.Main 作为默认调度器:

class Activity {
    private val mainScope = MainScope()

    fun destroy() {
        mainScope.cancel()
    }
    // to be continued ...

现在,我们可以使用定义的作用域在此 Activity 的作用域内启动协程:

// class Activity continues
    fun doSomething() {
        // launch ten coroutines for a demo, each working for a different time
        repeat(10) { i ->
            mainScope.launch {
                delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
                println("Coroutine $i is done")
            }
        }
    }
} // class Activity ends

在我们的主函数中,我们创建该 Activity,调用我们的测试函数 doSomething,并在 500ms 后销毁该 Activity。这将取消从 doSomething 启动的所有协程。我们可以看到,在 Activity 销毁后,即使我们等待更长时间,也不会再打印出任何消息了。

val activity = Activity()
activity.doSomething() // run test function
println("Launched coroutines")
delay(500L) // delay for half a second
println("Destroying activity!")
activity.destroy() // cancels all coroutines
delay(1000) // visually confirm that they don't work

输出如下:

Launched coroutines
Coroutine 0 is done
Coroutine 1 is done
Destroying activity!

只有前两个协程打印了消息,而其他协程则被 Activity.destroy() 中的 job.cancel() 取消了。

注意,Android 在所有具有生命周期的实体中都提供了协程作用域的一级支持。请参阅相应的文档。

线程本地数据

有时候,将一些线程本地数据传递给协程,或在协程之间传递这些数据非常方便。但是,由于协程不绑定到任何特定的线程,如果手动完成这个过程,可能会导致样板代码的出现。

对于 ThreadLocal,扩展函数 asContextElement 可以解决这个问题。它创建了一个附加的上下文元素,保留了给定 ThreadLocal 的值,并在每次协程切换其上下文时恢复它的值。

很容易通过演示来展示它的工作原理:

threadLocal.set("main")
println("Pre-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
val job = launch(Dispatchers.Default + threadLocal.asContextElement(value = "launch")) {
    println("Launch start, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
    yield()
    println("After yield, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")
}
job.join()
println("Post-main, current thread: ${Thread.currentThread()}, thread local value: '${threadLocal.get()}'")

使用 Dispatchers.Default 在后台线程池中启动了一个新的协程,因此它在与线程池不同的线程上工作,但它仍然具有我们使用threadLocal.asContextElement(value = “launch”) 指定的线程本地变量的值,无论协程在哪个线程上执行。因此,输出(带有调试信息)如下所示:

Pre-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'
Launch start, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
After yield, current thread: Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main], thread local value: 'launch'
Post-main, current thread: Thread[main @coroutine#1,5,main], thread local value: 'main'

很容易忘记设置相应的上下文元素。如果运行协程的线程不同,从协程访问的线程本地变量可能具有意外的值。为避免这种情况,建议使用 ensurePresent 方法,在使用不正确时立即抛出异常。

ThreadLocal 具有一流的支持,并且可以与 kotlinx.coroutines 提供的任何原语一起使用。但它有一个关键限制:当 ThreadLocal 被修改时,新值不会传播到协程调用方(因为上下文元素无法跟踪所有 ThreadLocal 对象访问),并且在下一次挂起时更新的值会丢失。使用 withContext 在协程中更新线程本地变量的值,有关更多详细信息,请参见 asContextElement。

或者,可以将值存储在类似于 Counter(var i: Int) 的可变框内,这个框又存储在线程本地变量中。但是,在这种情况下,您完全负责同步对该可变框中变量的潜在并发修改。

对于高级用途,例如与日志 MDC、事务上下文或任何其他在内部使用线程本地变量传递数据的库集成,请参阅应该实现的 ThreadContextElement 接口的文档。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2322469.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于 mxgraph 实现流程图

mxgraph 可以实现复杂的流程图绘制。mxGraph里的Graph指的是图论(Graph Theory)里的图而不是柱状图、饼图和甘特图等图(chart)&#xff0c;因此想找这些图的读者可以结束阅读了。 作为图论的图&#xff0c;它包含点和边&#xff0c;如下图所示。 交通图 横道图 架构图 mxGrap…

动态路由机制MoE专家库架构在多医疗AI专家协同会诊中的应用探析

随着医疗人工智能技术的飞速进步,AI在医学领域的应用日益增多,尤其是在复杂疾病的诊断和治疗中,AI技术的应用带来了巨大的潜力。特别是动态路由机制混合专家(Mixture of Experts,MoE)架构,因其灵活、高效的特点,正逐渐成为实现多AI专家协同会诊的关键技术。通过将多个不…

双工通信:WebSocket服务

&#xff08;一&#xff09;WebSocket概述 WebSocket 是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工通信——浏览器和服务器只需要完成一次握手&#xff0c;两者之间就可以创建持久性的连接&#xff0c; 并进行双向数据传输 注意;Websocket也只能由客户端先握…

洪水灌溉算法 + 总结

文章目录 floodfill算法图像渲染题解代码 岛屿数量题解代码 岛屿的最大面积题解代码 被围绕的区域题解代码 太平洋大西洋水流问题题解代码 扫雷游戏题解代码 衣橱整理题解代码 总结 floodfill算法 1. 寻找相同性质的联通块&#xff0c;可以使用dfs或者bfs解决&#xff0c;比如…

LangChain4j(1):初识LangChain4j

1 什么是LangChain和LangChain4j LangChain是一个大模型的开发框架&#xff0c;使用LangChain框架&#xff0c;程序员可以更好的利用大模型的能力&#xff0c;大大提高编程效率。如果你是一个lava程序员&#xff0c;那么对LangChain最简单直观的理解就是&#xff0c;LangChain…

Photoshop 2025安装包下载及Photoshop 2025详细图文安装教程

文章目录 前言一、Photoshop 2025安装包下载二、Photoshop 2025安装教程1.解压安装包2.运行程序3.修改安装路径4.设安装目录5.开始安装6.等安装完成7.关闭安装向导8.启动软件9.安装完成 前言 无论你是专业设计师&#xff0c;还是初涉图像处理的小白&#xff0c;Photoshop 2025…

SQL Server安装程序无法启动:系统兼容性检查失败

问题现象&#xff1a; 运行 SQL Server 2022 安装程序时&#xff0c;提示 “硬件或软件不满足最低要求”&#xff0c;安装向导直接退出或无法继续。 快速诊断 操作系统版本检查&#xff1a; # 查看 Windows 版本&#xff08;需 20H2 或更高&#xff09; winver 支持的系统&…

期权合约作废的话,权利金和保证金会退还么?

在期权交易中&#xff0c;权利金是否可以退回&#xff0c;主要取决于期权的交易情况和合约条款。 期权作废的三种情形 一般来说期权作废一共有三种情况&#xff0c;分别是到期没有行权、主动放弃或者是标的退市了。 第一种是到期未行权&#xff0c;一般来说值得都是虚值期权&…

MIPI计算ECC和CRC工具介绍

一、MIPI简介 MIPI联盟&#xff0c;即移动产业处理器接口&#xff08;Mobile Industry Processor Interface 简称MIPI&#xff09;联盟。MIPI&#xff08;移动产业处理器接口&#xff09;是MIPI联盟发起的为移动应用处理器制定的开放标准和一个规范。MIPI官网https://mipi.org/…

医院管理系统(源码)分享

「医院管理系统&#xff08;源码&#xff09; 源码&#xff1a; https://pan.quark.cn/s/b6e21488fce3 第1章 绪论 1.1 项目背景 随着计算机科学的迅猛发展和互联网技术的不断推进&#xff0c;人们的生活方式发生了巨大的变化&#xff0c;同时也推动了整个软件产业的发展。把…

使用Geotools从DEM数据中读取指定位置的高程实战

目录 前言 一、GridCoverage2D对象介绍 1、GridCoverage2D的属性 2、GridCoverage2D核心方法 3、GridCoverage2D中的高级操作 二、指定位置的高程获取 1、存储原理 2、相关属性的获取 3、获取高程的方法 三、总结 前言 在地理信息科学领域&#xff0c;高程数据是至关重…

STM32F103_LL库+寄存器学习笔记05 - GPIO输入模式,捕获上升沿进入中断回调

导言 GPIO设置输入模式后&#xff0c;一般会用轮询的方式去查看GPIO的电平状态。比如&#xff0c;最常用的案例是用于检测按钮的当前状态&#xff08;是按下还是没按下&#xff09;。中断的使用一般用于计算脉冲的频率与计算脉冲的数量。 项目地址&#xff1a;https://github.…

直播预告 | TDgpt 智能体发布 时序数据库 TDengine 3.3.6 发布会即将开启

从海量监控数据&#xff0c;到工业、能源、交通等场景中实时更新的各类传感器数据&#xff0c;时序数据正在以指数级速度增长。而面对如此庞杂的数据&#xff0c;如何快速分析、自动发现问题、精准预测未来&#xff0c;成为企业数字化转型过程中的关键挑战。 TDengine 的答案是…

vscode 通过Remote-ssh远程连接服务器报错 could not establish connection to ubuntu

vscode 通过Remote-ssh插件远程连接服务器报错 could not establish connection to ubuntu&#xff0c;并且出现下面的错误打印&#xff1a; [21:00:57.307] Log Level: 2 [21:00:57.350] SSH Resolver called for "ssh-remoteubuntu", attempt 1 [21:00:57.359] r…

【JavaScript 简明入门教程】为了Screeps服务的纯JS入门教程

0 前言 0-1 Screeps: World 众所不周知&#xff0c;​Screeps: World是一款面向编程爱好者的开源大型多人在线即时战略&#xff08;MMORTS&#xff09;沙盒游戏&#xff0c;其核心机制是通过编写JavaScript代码来控制游戏中的单位&#xff08;称为“Creep”&#xff09;&#…

Prometheus stack命令行接入springboot服务metrics

使用Prometheus Stack监控SpringBoot应用 本文将详细介绍如何使用Prometheus Stack监控SpringBoot应用的metrics。假设你已经安装了Kubernetes集群&#xff0c;并使用Helm安装了Prometheus Stack全家桶。SpringBoot应用已经配置好&#xff0c;暴露了相应的metrics端点。 Sprin…

Git Bash 设置Notepad++作为默认编辑器

网上搜的时候发现别人搞得有点复杂 &#xff08;绝对正确的方法&#xff09;Git Bash 设置Notepad作为默认编辑器_git 通过notpad 编辑器-CSDN博客 最简单的方式就是重新安装git&#xff0c;然后在选择编辑器的时候&#xff0c;勾选notepad即可

Qt 制作验证码

Qt 制作验证码 #include <QRandomGenerator> #include <QPainterPath> #include <QPainter>// 生成随机数 int r(int a,int b0){return b ? QRandomGenerator::global()->bounded(a, b): QRandomGenerator::global()->bounded(a); }// 生成随机多边形…

【数据结构】二叉树 — 经典OJ面试题剖析!!!

目录 二叉树相关oj题 1. 检查两颗树是否相同 2. 另一棵树的子树 3. 翻转二叉树 4. 判断一颗二叉树是否是平衡二叉树 5. 对称二叉树 6. 二叉树的构建及遍历 7. 二叉树的层序遍历 8. 判断一棵树是不是完全二叉树 9. 二叉树的最近公共祖先 10. 根据前序与中序遍历序列构…

【MySQL】用户账户、角色、口令、PAM

目录 查看用户账户设置 连接 1.本地连接 2.远程连接 账户 角色 操作用户账户和角色 配置口令和账户有效期限 手工使口令过期 配置口令有效期限 PAM身份验证插件 客户端连接&#xff1a;使用 PAM 账户登录 在连接到MySQL服务器并执行查询时&#xff0c;会验证你的身…