Kotlin 协程基础十 —— 协作、互斥锁与共享变量

news2025/1/17 12:30:21

Kotlin 协程基础系列:

Kotlin 协程基础一 —— 总体知识概述

Kotlin 协程基础二 —— 结构化并发(一)

Kotlin 协程基础三 —— 结构化并发(二)

Kotlin 协程基础四 —— CoroutineScope 与 CoroutineContext

Kotlin 协程基础五 —— Channel

Kotlin 协程基础六 —— Flow

Kotlin 协程基础七 —— Flow 操作符(一)

Kotlin 协程基础八 —— Flow 操作符(二)

Kotlin 协程基础九 —— SharedFlow 与 StateFlow

Kotlin 协程基础十 —— 协作、互斥锁与共享变量

本节将介绍在协程间如果有先后执行、互相等待的需求时,应该怎样去处理这种等待和协作的工作。更会与 Java 的线程的协作工作对比,从而说明,在线程中通常不太简单的协作操作,在协程中很容易实现。

1、协程间的协作与等待

从运行角度来看,协程天生就是并行的,不论是对同等级的协程、父子协程还是毫无关系的协程。假如我们需要让协程互相等待,希望在协程的执行过程中可以停住,等待别的协程执行完毕,可以使用 Job 的 join() 或 Deferred 的 await()。

线程对于这种互相等待的需求可以通过 Thread 的 join(),还有 Future 和 CompletableFuture 以及 CountDownLatch。

CountDownLatch 适用于一个线程等待多个线程:

fun main() = runBlocking<Unit> {
    // countdown 译为倒计时,latch 是门闩、插销,组合起来就是用于倒计时的插销
    val countDownLatch = CountDownLatch(2)
    thread {
        // await() 会在 CountDownLatch 内的 count 减到 0 时结束等待
        countDownLatch.await()
        println("Count in CountDownLatch is 0 now,I'm free!")
    }

    thread {
        sleep(1000)
        // countDown() 会调用原子操作让 CountDownLatch 内的 count 减 1
        countDownLatch.countDown()
        println("Invoke countDown,count: ${countDownLatch.count}")
    }

    thread {
        sleep(2000)
        countDownLatch.countDown()
        println("Invoke countDown,count: ${countDownLatch.count}")
    }
}

运行结果:

Invoke countDown,count: 1
Count in CountDownLatch is 0 now,I'm free!
Invoke countDown,count: 0

修改 CountDownLatch 构造方法的 count 参数就可以修改要等待的线程数量,对于这种一个等待多个的业务需求,在协程中也可以用 join() 来做:

fun main() = runBlocking<Unit> {
    // 两个前置任务
    val preJob1 = launch {
        delay(1000)
    }

    val preJob2 = launch {
        delay(2000)
    }

    // 此协程需要等待两个协程执行之后再运行自己的内容
    launch {
        preJob1.join()
        preJob2.join()
        // 等待完前置任务,再做自己的事...
    }
}

实际上线程里也可以这么做,只不过因为线程本身的结构化管理比较麻烦,所以在正式的项目里很少真正的这么写。但因为协程可以结构化取消,因此它的 join() 比线程的 join() 更实用,在正式项目里的应用也较多。

其实,用 Channel 也能实现类似 CountDownLatch 那种,不指定具体等待哪些协程,只等待固定的次数的效果:

private fun channelSample() = runBlocking<Unit> {
    // 指定 Channel 的容量为 2
    val channel = Channel<Unit>(2)

    // 由于要等待两次发送数据才能继续执行后续代码,因此要 repeat(2) 接收
    launch {
        repeat(2) {
            channel.receive()
        }
    }
    
    launch {
        delay(1000)
        channel.send(Unit)
    }

    launch {
        delay(2000)
        channel.send(Unit)
    }
}

通过两个简单的例子可以发现,线程中有些复杂、比较底层、不太容易使用的协作和等待 API,在协程中的对应/等价 API 难度要大大降低。

2、select():先到先得

select() 会在内部开启多线竞赛,谁最快就用谁。

onJoin() 是仅限于在 select 代码块中才能调用的函数,它是一个监听注册,会监听 Job 的结束,不论 Job 是正常结束还是被取消,在其结束时都会回调执行 onJoin() 大括号的内容,并且大括号的返回值会作为 select() 的返回值:

fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)

    val job1 = scope.launch {
        delay(1000)
        println("job1 done")
    }

    val job2 = scope.launch {
        delay(2000)
        println("job2 done")
    }

    val job = scope.launch {
        val result = select {
            // select 只执行最先结束的 onJoin 回调
            job1.onJoin {
                1
            }

            job2.onJoin {
                2
            }
        }
        println("result: $result")
    }
    joinAll(job, job1, job2)
}

运行结果:

job1 done
result: 1
job2 done

结果能看出,select() 只执行了最先结束的 job1 的 onJoin,没有执行 job2 的。

与 Job 的 onJoin() 功能类似的还有:Deferred 的 onAwait()、Channel 的 onSend()、onReceive() 以及 onReceiveCatching()。此外还有一个特殊的函数 onTimeout(),如果 select() 内所有的监听回调都没有在 onTimeout() 设置的超时时间内完成,那么就由 onTimeout() 作为 select() 的返回值:

@OptIn(ExperimentalCoroutinesApi::class)
fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)

    val job1 = scope.launch {
        delay(1000)
        println("job1 done")
    }

    val deferred = scope.async {
        delay(2000)
        println("deferred done")
    }

    val channel = Channel<String>()

    val job = scope.launch {
        val result = select {
            // select 只执行最先结束的 onJoin 回调
            job1.onJoin {
                1
            }

            deferred.onAwait {
                2
            }

            channel.onSend("haha") {}
            /*channel.onReceive {}
            channel.onReceiveCatching {}*/

            onTimeout(500) {
                "Timeout!"
            }
        }
        println("result: $result")
    }
    joinAll(job, job1)
}

运行结果:

result: Timeout!
job1 done

3、互斥锁和共享变量

在遇到一个不太好理解的知识点时,我们还是先说线程,再引入到协程中。

线程中有一个术语叫竞态条件,或者说竞争条件,英文是 race condition。这个词的含义比较广,在 Java 和 Kotlin 这种高级编程语言中,它指的是多个线程访问共享资源时,由于缺乏并发控制,导致资源的访问顺序不受控,进而导致出现错误的结果的条件。

在 Kotlin 中,仍然可以使用我们在 Java 中熟知的 synchronized 和 Lock 这两种锁机制来保证共享资源的线程安全,也提供了新的选项,下面我们来说一说。

3.1 @Synchronized

Kotlin 中没有 synchronized 关键字,代替它的是 @Synchronized 注解。对于方法而言,使用 @Synchronized 注解的作用与 Java 中使用 synchronized 关键字修饰方法的作用是一样的。被 @Synchronized 注解标记的方法不能同时被多个线程(注意,不是协程)执行。

而 Java 中 synchronized 代码块在 Kotlin 中被 synchronized 函数代替了:

fun main() = runBlocking<Unit> {
    var number = 0
    val lock = Any()

    val thread1 = thread {
        repeat(100_000) {
            synchronized(lock) {
                number++
            }
        }
    }

    val thread2 = thread {
        repeat(100_000) {
            synchronized(lock) {
                number--
            }
        }
    }

    thread1.join()
    thread2.join()
    println("result: $number") // 输出 0
}

同样的代码结构也可以用在协程中:

fun main() = runBlocking<Unit> {
    var number = 0
    val lock = Any()
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job1 = scope.launch {
        repeat(100_000) {
            synchronized(lock) {
                number++
            }
        }
    }

    val job2 = scope.launch {
        repeat(100_000) {
            synchronized(lock) {
                number--
            }
        }
    }

    job1.join()
    job2.join()
    println("result: $number")
}

synchronized() 仍然掐住的是线程,确切的说是掐住了执行 synchronized() 所在的协程的线程。虽然这样做有点浪费,因为不止掐住了协程,连运行该协程代码的线程都被掐住了,但确实实现了共享资源的线程安全,而且 synchronized() 本来也是针对线程的,只不过从协程的角度看,如果可以只掐住协程,不影响运行该协程代码的线程就更好了。

这个区别就好像 delay() 与 sleep() 一样。协程的 delay() 只会挂起当前的协程,但是不会影响其所在的线程;而 sleep() 是让整个线程休眠。因此在协程中,为了不影响整个线程,我们通常都是使用 delay() 仅作用于当前协程,而不会使用 sleep() 为了让协程挂起而影响到整个线程的运行。下一节要讲的 Mutex 就可以解决这个问题。

Lock 的用法也大致相同,这里不多赘述。

3.2 Mutex

Mutex 是计算机领域的专属词汇,全称是 mutual exclusion,即互斥。Kotlin 提供的 Mutex 是基于协程的、挂起式的,不同于前面两个是基于线程的、阻塞式的。Mutex 是协程自己的实现,它不卡线程,性能更好,使用也很方便:

fun main() = runBlocking<Unit> {
    var number = 0
    val mutex = Mutex()
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job1 = scope.launch {
        repeat(100_000) {
            try {
                mutex.lock()
                number++
            } finally {
                mutex.unlock()
            }
        }
    }

    val job2 = scope.launch {
        repeat(100_000) {
            mutex.withLock {
                number--
            }
        }
    }

    job1.join()
    job2.join()
    println("result: $number")
}

job1 内使用的是常规用法,在操作共享变量前用 lock() 加锁,在 finally 代码块中解锁。job2 内使用的是简便写法,withLock() 将代码块内的代码放入 try 中执行,在 finally 中用 unlock() 解锁:

@OptIn(ExperimentalContracts::class)
public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
    contract {
        callsInPlace(action, InvocationKind.EXACTLY_ONCE)
    }
    lock(owner)
    return try {
        action()
    } finally {
        unlock(owner)
    }
}

Mutex 的优势是性能,但由于它是基于协程的,因此只能在协程中使用。所以,如果只在协程中使用共享资源,那么就用 Mutex,如果需要在线程中使用,就要用上一节说的 synchronized 与 Lock。

3.3 Semaphore

Java 还有一个 Semaphore,信号量,一个可以被多个线程持有的锁。你可以在它的构造方法中指定它最多可以被几个线程持有,如果有多余指定数量的线程去获取 Semaphore 就会陷入等待。获取锁用 acquire(),释放锁用 release()。

由于共享变量是只要有两个线程同时访问就会导致出错了,因此允许多个线程持有的 Semaphore 并不能用于解决竞态条件的问题,它是用来做性能控制的。你可以用它来实现类似线程池的功能,只不过你实现出来的是自己定制的对象池:同一时间最多只有多少个对象同时在做事,满了之后如果再来新对象就得等着,直到有新的坑让出来,这些新对象才能开始做事。

Kotlin 提供了一个 Semaphore 的协程版本,就叫 Semaphore,定位与 Java 的 Semaphore 相同,只不过是协程版本。

3.4 其他 API

在传统的线程系统里,还有一组典型的 API:wait()、notify()、notifyAll()。它们三个属于更底层的 API,在线程系统里,它既能实现互斥锁,也能实现线程之间相互等待的功能。但事实上,这些年已经基本没人再用这组函数了。因为 synchronized 关键字与 Lock 的推出,已经基本上完全替代了它们,而且它们用起来也很麻烦,所以现在没人用。正因如此,协程没有推出与它们类似的 API。

AtomicInteger 与 CopyOnWriteArrayList 等等也可以在协程中使用。虽然它们是针对线程的,但是卡住线程的同时一定把协程也卡住了。所以在协程里也可以无风险地使用。

此外,volatile 与 transient 也可以在协程中使用,只不过不再是关键字,而是注解。

4、ThreadLocal

ThreadLocal 是线程的局部变量,即该变量在每个线程都是独立的,从不同的线程中访问该变量,这些线程对变量的值的读写都是相互独立的,对每个线程都有独立的副本。

ThreadLocal 是用来干嘛的?它的定位就像它的名字一样,就是针对线程的局部变量。Java 变量按照作用域由小到大可以划分为局部变量(方法内)、成员变量(类内)、静态变量(全局),ThreadLocal 是一种介于局部变量和静态变量之间的一种变量,范围比方法大,比静态全局小,只在当前线程范围内有效。

ThreadLocal 是对 Java 线程一个很关键的能力补充。前面提过,协程相对线程的一大优势就是线程不具备结构化管理的能力,而协程结构化管理的能力相当强大。线程不具备结构化管理的能力,但我们开发时是有结构化管理的需求的,这时就要用 ThreadLocal。有了 ThreadLocal 之后,在同一个线程里执行的多个方法之间就可以共享变量了,且该共享变量只针对当前线程有效,跨线程时还是独立的。因此 ThreadLocal 通常会作为静态变量存在。

ThreadLocal 在协程中的等价物是什么?有什么东西是跨方法的、针对协程的局部变量吗?CoroutineContext 就是协程里的 ThreadLocal。

本来,由于协程是具备结构化管理能力的,你完全不需要在协程内使用 ThreadLocal。但是开发过程中,免不了与 Java 代码进行协作,如果想在协程代码里访问老代码里的 ThreadLocal 对象,是不能像如下这样直接使用的:

val kotlinLocalString = ThreadLocal<String>()
fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        kotlinLocalString.set("Test")
        delay(1000)
        println(kotlinLocalString.get())
    }
    job.join()
}

kotlinLocalString 的 get() 拿到的值一定是 set() 设置的值吗?不一定!因为虽然协程没变,但是执行协程代码的线程有可能改变了,delay() 的时候线程被让出,可能会去执行其他协程的代码。等 delay() 结束继续执行下面代码的时候,有可能就不是在刚才的线程中执行了。因为协程只能保证在执行挂起函数之后依然运行在刚才的 ContinuationInterceptor 所管理的某一个线程池上,不能保证同一个线程。

因此 ThreadLocal 不能在协程中直接使用,因为它的效果在协程中变得不可靠了。怎么办?用 asContextElement() 把 ThreadLocal 转换成 CoroutineContext:

val kotlinLocalString = ThreadLocal<String>()
fun main() = runBlocking<Unit> {
    val scope = CoroutineScope(EmptyCoroutineContext)
    val job = scope.launch {
        val stringContext = kotlinLocalString.asContextElement("Test")
        withContext(stringContext) {
            delay(1000)
            println(kotlinLocalString.get())
        }
    }
    job.join()
}

asContextElement() 是 ThreadLocal 的扩展函数,它会把参数里的值封装到返回值的 ThreadLocalElement 中。再将结果填到 withContext() 的参数中,包住获取 ThreadLocal 值的代码,这时候里面的 ThreadLocal 就是对协程兼容的了。不管里面怎么切协程,只要没出协程,它的值都会被保持住。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278000.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

论文笔记-arXiv2025-A survey about Cold Start Recommendation

论文笔记-arXiv2025-Cold-Start Recommendation towards the Era of Large Language Models: A Comprehensive Survey and Roadmap 面向大语言模型&#xff08;LLMs&#xff09;时代的冷启动推荐&#xff1a;全面调研与路线图1.引言2.前言3.内容特征3.1数据不完整学习3.1.1鲁棒…

设计模式03:行为型设计模式之策略模式的使用情景及其基础Demo

1.策略模式 好处&#xff1a;动态切换算法或行为场景&#xff1a;实现同一功能用到不同的算法时和简单工厂对比&#xff1a;简单工厂是通过参数创建对象&#xff0c;调用同一个方法&#xff08;实现细节不同&#xff09;&#xff1b;策略模式是上下文切换对象&#xff0c;调用…

飞机电气系统技术分析:数字样机技术引领创新

现代飞机正向着更安全、环保和经济的方向发展&#xff0c;飞机系统的设计日益复杂&#xff0c;对各子系统的性能和可靠性也提出了更高要求。作为飞机的重要组成部分&#xff0c;电气系统&#xff08;Electrical System&#xff0c;ES&#xff09;不仅负责为各类机载设备提供稳定…

(01)FreeRTOS移植到STM32

一、以STM32的裸机工程模板 任意模板即可 二、去官网上下载FreeRTOS V9.0.0 源码 在移植之前&#xff0c;我们首先要获取到 FreeRTOS 的官方的源码包。这里我们提供两个下载 链 接 &#xff0c; 一 个 是 官 网 &#xff1a; http://www.freertos.org/ &#xff0c; 另…

【Unity-Game4Automation PRO 插件】

Game4Automation PRO 插件 是一个用于 Unity 引擎 的工业自动化仿真工具&#xff0c;它提供了对工业自动化领域的仿真和虚拟调试支持&#xff0c;特别是在与工业机器人、生产线、PLC 系统的集成方面。该插件旨在将工业自动化的实时仿真与游戏开发的高质量 3D 可视化能力结合起来…

element select 绑定一个对象{}

背景&#xff1a; select组件的使用&#xff0c;适用广泛的基础单选 v-model 的值为当前被选中的 el-option 的 value 属性值。但是我们这里想绑定一个对象&#xff0c;一个el-option对应的对象。 <el-select v-model"state.form.modelA" …

mybatis延迟加载、缓存

目录 一、所需表 二、延迟加载 1.延迟加载概念 2.立即加载和延迟加载的应用场景 3.多对一延迟加载查询演示 (1)实体类 User Account (2)AccountMapper接口 (3)AccountMapper.xml (4)UserMapper接口 (5)UserMapper.xml (6)在总配置文件(mybatis-config.xml)中开启延…

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤

VIVADO FIFO (同步和异步) IP 核详细使用配置步骤 目录 前言 一、同步FIFO的使用 1、配置 2、仿真 二、异步FIFO的使用 1、配置 2、仿真 前言 在系统设计中&#xff0c;利用FIFO&#xff08;first in first out&#xff09;进行数据处理是再普遍不过的应用了&#xff0c…

一、1-2 5G-A通感融合基站产品及开通

1、通感融合定义和场景&#xff08;阅读&#xff09; 1.1通感融合定义 1.2通感融合应用场景 2、通感融合架构和原理&#xff08;较难&#xff0c;理解即可&#xff09; 2.1 感知方式 2.2 通感融合架构 SF&#xff08;Sensing Function&#xff09;&#xff1a;核心网感知控制…

某政务行业基于 SeaTunnel 探索数据集成平台的架构实践

分享嘉宾&#xff1a;某政务公司大数据技术经理 孟小鹏 编辑整理&#xff1a;白鲸开源 曾辉 导读&#xff1a;本篇文章将从数据集成的基础概念入手&#xff0c;解析数据割裂给企业带来的挑战&#xff0c;阐述数据集成的重要性&#xff0c;并对常见的集成场景与工具进行阐述&…

【MySQL】使用C语言链接

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;MySQL 目录 一&#xff1a;&#x1f525; MySQL connect &#x1f98b; Connector / C 使用&#x1f98b; mysql 接口介绍&#x1f98b; 完整代码样例 二&#xff1a;&#x1f525; 共勉 一&#…

《Java核心技术II》并行流

并行流 从集合中获取并行流&#xff1a;Stream paralleWords words.parallelStream(); parallel方法将任意顺序流转换为并行流&#xff1a;Stream paralleWords Stream.of(wordArray).parallel(); 以下是不好的示范&#xff0c;假设对字符串的所有短单词计数&#xff1a; …

【Rust自学】13.2. 闭包 Pt.2:闭包的类型推断和标注

13.2.0. 写在正文之前 Rust语言在设计过程中收到了很多语言的启发&#xff0c;而函数式编程对Rust产生了非常显著的影响。函数式编程通常包括通过将函数作为值传递给参数、从其他函数返回它们、将它们分配给变量以供以后执行等等。 在本章中&#xff0c;我们会讨论 Rust 的一…

ETW HOOK[InfinityHook]技术解析

文章目录 概述分析过程参考资料 概述 ETW是操作系统为了对系统调用、异常等信息做了一个日志操作&#xff0c;本质就是在进行调用这些中断、异常、系统调用时会走向这个代码函数区域日志保存的功能。而ETW HOOK就是在驱动层微软的PatchGuard并未对其做到很好的检测&#xff0c…

码编译安装httpd 2.4,测试

下载链接&#xff1a;https://dlcdn.apache.org/httpd/httpd-2.4.62.tar.gz [rootopenEuler-1 ~]# yum install gcc gcc-c make -y [rootopenEuler-1 ~]# ll /root total 9648 -rw-------. 1 root root 920 Jan 10 17:15 anaconda-ks.cfg -rw-r--r-- 1 root root 9872432…

步入响应式编程篇(一)

响应式编程 为什么要有响应式编程&#xff1f;响应式编程的用法Flow api的用法处理器 为什么要有响应式编程&#xff1f; 传统编码&#xff0c;操作流程常见的是命令式编程范式&#xff0c;如对于一个请求或操作来说&#xff0c;都是串行执行&#xff0c;直到异常或执行结束&a…

C++—18、C++ 中如何写类

一、类的功能阐述 今天我们将用目前学到的类的基础知识从头开始编写一个类。只编写一个基本的Log类&#xff0c;来演示到目前为止我们学过的一些基本特性。随着接下来的学习你会看到从一个类的基本版本到一个更高级版本的过程和区别。高级版本可以做同样的事情&#xff0c;但可…

SW - 查看装配图中的零件的全路径名称

文章目录 SW - 查看装配图中的零件的全路径名称概述笔记END SW - 查看装配图中的零件的全路径名称 概述 装配图中&#xff0c;如果本机有多个不同版本的同名零件(e.g. v1/p1零件, v2/p1零件)&#xff0c;在装配图中想确认是哪个版本的零件。 如果编辑错了文件&#xff0c;或者…

【开源分享】nlohmann C++ JSON解析库

文章目录 1. Nlohmann JSON 库介绍2. 编译和使用2.1 获取库2.2 包含头文件2.3 使用示例2.4 编译 3. 优势4. 缺点5. 总结参考 1. Nlohmann JSON 库介绍 Nlohmann JSON 是一个用于 C 的现代 JSON 库&#xff0c;由 Niels Lohmann 开发。它以易用性和高性能著称&#xff0c;支持 …

神经网络基础-正则化方法

文章目录 1. 什么是正则化2. 正则化方法2.1 Dropout正则化2.2 批量归一化(BN层) 学习目标&#xff1a; 知道正则化的作用掌握随机失活 DropOut 策略知道 BN 层的作用 1. 什么是正则化 在设计机器学习算法时希望在新样本上的泛化能力强。许多机器学习算法都采用相关的策略来减小…