Kotlin Flow 背压和线程切换竟然如此相似

news2025/1/16 6:49:22

前言

上篇分析了Kotlin Flow原理,大部分操作符实现比较简单,相较而言背压和线程切换比较复杂,遗憾的是,纵观网上大部分文章,关于Flow背压和协程切换这块的原理说得比较少,语焉不详,鉴于此,本篇重点分析两者的原理及使用。
通过本篇文章,你将了解到:

  1. 什么是背压?
  2. 如何处理背压?
  3. Flow buffer的原理
  4. Flow 线程切换的使用
  5. Flow 线程切换的原理

1. 什么是背压?

先看自然界的水流:
image.png
为了充分利用水资源,人类建立了大坝,以大坝为分界点将水流分为上游和下游。

当上游的流速大于下游的流速,日积月累,最终导致大坝溢出,此种现象称为背压的出现

而对于Kotlin里的Flow,也有上游(生产者)、下游(消费者)的概念,如:

    suspend fun testBuffer1() {
        var flow = flow {
            //生产者
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }

        flow.collect {
            //消费者
            println("collect:$it")
        }
    }

通过collect操作符触发了流,从生产者生产数据(flow闭包),到消费者接收并处理数据(collect闭包),这就完成了流从上游到下游的一次流动过程。

2. 如何处理背压?

模拟一个生产者消费者速度不一致的场景:

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

计算流从生产到消费的整个时间:
image.png
生产者的速度比消费者的速度快,而它俩都是在同一个线程里顺序执行的,生产者必须等待消费者消费完毕后才会进行下一次生产。
因此,整个流的耗时=生产者耗时(3 * 1000ms)+消费者耗时(3 * 2000ms)=9s。

显而易见,消费者影响了生产者的速度,这种情况下该怎么优化呢?
最简单的解决方案:

生产者和消费者分别在不同的线程执行

如:

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

添加了flowOn()函数,它的存在使得它前面的代码在指定的线程里执行,如flow闭包了的代码都在IO线程执行,也就是生产者在IO线程执行。
而消费者在当前线程执行,因此两者无需相互等待,节省了总时间:
image.png

确实是减少了时间,提升了效率。但我们知道开启线程代价还是挺大的,既然都在协程里运行了,能否借助协程的特性:协程挂起不阻塞线程 来完成此事呢?
此时,Buffer出场了,先看看它是如何表演的:

    suspend fun testBuffer5() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.buffer(5)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

这次没有使用flowOn,取而代之的是buffer。
运行结果如下:
image.png
可以看出,生产者消费者都是在同一线程执行,但总耗时却和不在同一线程运行时相差无几。
那么它是如何做到的呢?这就得从buffer的源码说起。

3. Flow buffer的原理

无buffer

先看看没有buffer时的耗时:

    suspend fun testBuffer3() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it")
                emit(it)
            }
        }

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

image.png
从collect开始,依次执行flow闭包,通过emit调用到collect闭包,因为flow闭包里包含了几次emit,因此整个流程会有几次发射。
如上图,从步骤1到步骤8,因为是在同一个线程里,因此是串行执行的,整个流的耗时即为生产者到消费者(步骤1~步骤8)的耗时。

有buffer

在没看源码之前,我们先猜测一下它的流程:
image.png
每次emit都发送到buffer里,然后立刻回来继续发送,如此一来生产者没有被消费者的速度拖累。
而消费者会检测Buffer里是否有数据,有则取出来。

根据之前的经验我们知道:collect调用到emit最后到buffer是线性调用的,放入buffer后继续循环emit,那么问题来了:

是谁触发了collect闭包的调用呢?

接下来深入源码,探究答案。

buffer源码流程分析

创建Flow

public fun <T> Flow<T>.buffer(capacity: Int = Channel.BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
    var capacity = capacity//buffer容量
    var onBufferOverflow = onBufferOverflow//buffer满之后的处理策略
    if (capacity == Channel.CONFLATED) {
        capacity = 0
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    }
    // create a flow
    return when (this) {
        is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
        //走else 分支,构造ChannelFlowOperatorImpl
        else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
    }
}

buffer 返回Flow实例,其间涉及几个重要的类和函数:
image.png

调用collect
当调用Flow.collect时:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

构造了匿名内部类FlowCollector,并实现了emit方法,它的实现为collect的闭包。

调用ChannelFlowOperatorImpl.collect最终会调用ChannelFlow.collect:

    override suspend fun collect(collector: FlowCollector<T>): Unit =
        coroutineScope {
            collector.emitAll(produceImpl(this))
        }

    public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

produceImpl 创建了Channel,内部开启了协程,返回ReceiveChannel。

再来看emitAll函数:

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
    ensureActive()
    var cause: Throwable? = null
    try {
        while (true) {
            //挂起等待Channel数据
            val result = run { channel.receiveCatching() }
            if (result.isClosed) {
                //Channel关闭后才会退出循环
                result.exceptionOrNull()?.let { throw it }
                break // returns normally when result.closeCause == null
            }
            //发送数据
            emit(result.getOrThrow())
        }
    } catch (e: Throwable) {
        cause = e
        throw e
    } finally {
        if (consume) channel.cancelConsumed(cause)
    }
}

Channel此时并没有数据,因此协程会挂起等待。

Channel发送
Channel什么时候有数据呢?当然是在调用了Channel.send()函数后。
前面提到过collect之后开启了协程:

  public open fun produceImpl(scope: CoroutineScope): ReceiveChannel<T> =
        scope.produce(context, produceCapacity, onBufferOverflow, start = CoroutineStart.ATOMIC, block = collectToFun)

  internal val collectToFun: suspend (ProducerScope<T>) -> Unit
        get() = { collectTo(it) }

  protected override suspend fun collectTo(scope: ProducerScope<T>) =
        flowCollect(SendingCollector(scope))

此时传入的参数为:collectToFun,最后构造了:

public class SendingCollector<T>(
    private val channel: SendChannel<T>
) : FlowCollector<T> {
    override suspend fun emit(value: T): Unit = channel.send(value)
}

当协程得到执行时,会调用collectToFun–>collectTo(it)–>flowCollect(SendingCollector(scope)),最终调用到:

#ChannelFlowOperatorImpl
    override suspend fun flowCollect(collector: FlowCollector<T>) =
        flow.collect(collector)

而该flow为最开始的flow,collector为SendingCollector。
flow.collect后会调用到flow的闭包,进而调用到emit函数:

    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        val currentContext = uCont.context
        currentContext.ensureActive()
        //...
        completion = uCont
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

emitFun本质上会调用collector里的emit函数,而此时的collector即为SendingCollector,最后调用channel.send(value)

如此一来,Channel就将数据发送出去了,此时channel.receiveCatching()被唤醒,接下来执行emit(result.getOrThrow()),这函数最后会流转到最初始的collect的闭包里。
上面的分析即为生产者到消费者的流转过程,单看源码可能比较乱,看图解惑:

image.png
红色部分和绿色部分分别为不同的协程,它俩的关联点即是蓝色部分。

Flow buffer的本质上是利用了Channel进行数据的发送和接收

buffer为啥能提升效率

前面分析过无buffer时生产者消费者的流程图,作为对比,我们也将加入buffer后生产者消费者的流程图。
image.png
还是以相同的demo,阐述其流程:

  1. 生产者挂起1s,当1s结束后调用emit发射数据,此时数据放入buffer里,生产者调用delay继续挂起
  2. 此时消费者被唤醒,然后挂起 2s等待
  3. 第2s到来之时,生产者调用emit发送数据到buffer里,继续挂起
  4. 第2s到来之时,消费者结束挂起,消费数据,然后继续挂起2s
  5. 第3s到来之时,生产者继续生产数据,而后生产者退出生产
  6. 第5s到来之时,消费者挂起结束,消费数据,然后继续挂起2s
  7. 第7s到来之时,消费者挂起结束,消费结束,此时因为channel里已经没有数据了,退出循环,最终消费者退出

由此可见,总共花费了7s。
image.png
ps:协程调度时机不同,打印顺序可能略有差异,但总体耗时不变。

至此,我们找到了buffer能够提高效率的原因:

生产者、消费者运行在不同的协程,挂起操作不阻塞对方

抛出一个比较有意思的问题:以下代码加buffer之后效率会有提升吗?

    suspend fun testBuffer6() {
        var flow = flow {
            (1..3).forEach {
                println("emit $it")
                emit(it)
            }
        }
        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it")
            }
        }
        println("use time:${time} ms")
    }

在未实验之前,如果你已经有答案,恭喜你已经弄懂了buffer的本质。

4. Flow 线程切换的使用

    suspend fun testBuffer4() {
        var flow = flow {
            (1..3).forEach {
                delay(1000)
                println("emit $it in thread:${Thread.currentThread()}")
                emit(it)
            }
        }.flowOn(Dispatchers.IO)

        var time = measureTimeMillis {
            flow.collect {
                delay(2000)
                println("collect:$it in thread:${Thread.currentThread()}")
            }
        }
        println("use time:${time} ms")
    }

flowOn(Dispatchers.IO)表示其之前的操作符(函数)都在IO线程执行,如这里的意思是flow闭包里的代码在IO线程执行。
而其之后的操作符(函数)在当前的线程执行。
通常用在子线程里获取网络数据(flow闭包),然后再collect闭包里(主线程)更新UI。

5. Flow 线程切换的原理

public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> {
    checkFlowContext(context)
    return when {
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    }
}

看到这你可能已经有答案了:这不就和buffer一样的方式吗?
但仔细看,此处多了个上下文:CoroutineContext。
CoroutineContext的作用就是用来决定协程运行在哪个线程。

前面分析的buffer时,我们的协程的作用域是runBlocking,即使生产者、消费者在不同的协程,但是它们始终在同一个线程里执行。
而使用了flowOn指定线程,此时生产者、消费者在不同的线程运行协程。
因此,只要弄懂了buffer原理,flowOn原理自然而然就懂了。
image.png

以上为Flow背压和线程切换的全部内容,下篇将分析Flow的热流。
本文基于Kotlin 1.5.3,文中完整Demo请点击

您若喜欢,请点赞、关注、收藏,您的鼓励是我前进的动力

持续更新中,和我一起步步为营系统、深入学习Android/Kotlin

1、Android各种Context的前世今生
2、Android DecorView 必知必会
3、Window/WindowManager 不可不知之事
4、View Measure/Layout/Draw 真明白了
5、Android事件分发全套服务
6、Android invalidate/postInvalidate/requestLayout 彻底厘清
7、Android Window 如何确定大小/onMeasure()多次执行原因
8、Android事件驱动Handler-Message-Looper解析
9、Android 键盘一招搞定
10、Android 各种坐标彻底明了
11、Android Activity/Window/View 的background
12、Android Activity创建到View的显示过
13、Android IPC 系列
14、Android 存储系列
15、Java 并发系列不再疑惑
16、Java 线程池系列
17、Android Jetpack 前置基础系列
18、Android Jetpack 易学易懂系列
19、Kotlin 轻松入门系列
20、Kotlin 协程系列全面解读

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59633.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5G无线技术基础自学系列 | 5G上行功率控制

素材来源&#xff1a;《5G无线网络规划与优化》 一边学习一边整理内容&#xff0c;并与大家分享&#xff0c;侵权即删&#xff0c;谢谢支持&#xff01; 附上汇总贴&#xff1a;5G无线技术基础自学系列 | 汇总_COCOgsta的博客-CSDN博客 5G上行功率控制是针对每个UE的不同信道…

CS224W 8 GNN Augmentation andTraining

目录 Graph Augmentation for GNNs 引入 Why Graph Augmentation Graph Augmentation Approaches Feature Augmentation on Graphs Input graph没有node features GNN很难学习的一些特定结构 Graph Structure augmentation Augment sparse graphs——添加虚拟节点或边…

不同平台下运行历程代码

不同平台下运行历程代码 所谓的大端模式,是指数据的低位(就是权值较小的后面那几位)保存在内存的高地址中,而数据的高位,保存在内存的低地址中,这样的存储模式有点儿类似于把数据当作字符串顺序处理:地址由小向大增加,而数据从高位往低位放; 所谓的小端模式,是指数据…

【Mysql】索引

文章目录一.索引的价值1.1. mysql与磁盘交互的基本单位建立共识1.2. 为什么IO交互的基本单位为Page理解单个Page理解多个Page提高在单个Page中的查找效率针对多页情况的页目录为什么选择B树,而不是其他数据结构&#xff1f;没有主键会怎么创建索引为什么推荐使用自增ID作为主键…

Spring(Bean 作用域和生命周期)

目录 1. 案例1: Bean作用域的问题 2. 作用域 3. 设置 Bean 的作用域 4. Spring 的执行流程 5. Bean 的生命周期 1. 案例1: Bean作用域的问题 现在有一个公共的 Bean,通过给 A 用户 和 B 用户使用, 然后在使用的过程中 A 偷偷的修改了公共 Bean 的数据, 导致 B 在使用时发…

html静态网站基于动漫网站网页设计与实现共计4个页面

HTML实例网页代码, 本实例适合于初学HTML的同学。该实例里面有设置了css的样式设置&#xff0c;有div的样式格局&#xff0c;这个实例比较全面&#xff0c;有助于同学的学习,本文将介绍如何通过从头开始设计个人网站并将其转换为代码的过程来实践设计。 ⚽精彩专栏推荐&#x1…

基于遗传算法的微电网调度(风、光、蓄电池、微型燃气轮机)(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️❤️&#x1f4a5;&#x1f4a5;&#x1f4a5;&#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清…

Docker介绍

目录 docker定义 docker解决了什么问题 docker技术边界 docker给我们带来了哪些改变 docker和虚拟机的区别 docker基本架构 基本架构图 RootFs Linux Namespace 进程命名空间 查看元祖进程命名空间 查看当前用户进程命名空间 容器进程命名空间 容器进程命名空间的…

[激光原理与应用-33]:典型激光器 -5- 不同激光器的全面、综合比较

目录 第1章 五类激光器的性能及应用对比 第2章 各类激光器的区别特点及应用三张表看懂-超米激光 2.1 固体激光器 2.2 气体激光器 2.3 化学激光器 2.4 染料激光器 2.5 半导体激光器 2.6 光纤激光器 2.7 自由电子激光器 第3章 10多种激光器的全面梳理 3.1 激光器的分类…

二叉树-31-37对称二叉树

31. 对称的二叉树 递归&#xff1a; 把原问题化成更小规模的问题&#xff0c;并且具有相同的问题性质&#xff0c;重复调用本身函数 二叉树的递归&#xff0c;是将某个节点的左子树、右子树看成一颗完整的树&#xff0c;那么对于子树的访问或者操作就是对于原树的访问或者操作…

HTML如何制作音乐网站(如何搭建个人音乐网页)

&#x1f389;精彩专栏推荐 &#x1f4ad;文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业&#xff1a; 【&#x1f4da;毕设项目精品实战案例 (10…

Java毕业设计 JSP+MySQL幼儿园信息管理系统

幼儿园网站就是幼儿园的“商标”,在网站上可以看出一所幼儿园的特色和个性,在这个信息时代,建立属于自己的幼儿园网站是最直接的宣传手段。而就目前的幼儿园信息管理系统来说,有很多功能都是华而不实的,不能很好的与幼儿园日常工作相结合,容易导致日常工作出现异常。本系统的题…

【HDU No. 4417】 超级马里奥 Super Mario

【HDU No. 4417】 超级马里奥 Super Mario 杭电OJ 题目地址 【题意】 可怜的公主陷入困境&#xff0c;马里奥需要拯救他的情人。把通往城堡的道路视为一条线&#xff08;长度为n &#xff09;&#xff0c;在每个整数点i 上都有一块高度为hi 的砖&#xff0c;马里奥可以跳的最…

【博客552】git auto-merge原理以及auto-merge的不同模式

git auto-merge原理 1、merge 常见误区 1、git merge 是用时间先后决定merge结果的&#xff0c;后面会覆盖前面的? 答 &#xff1a;git 是分布式的文件版本控制系统&#xff0c;在分布式环境中时间是不可靠的&#xff0c;git是靠三路合并算法进行合并的。 2、git merge 只…

电脑装了w10没有w7流畅怎么办?

如果我们对自己的电脑进行了系统的重装&#xff0c;在电脑装了win10系统之后发现没有win7流畅的话&#xff0c;很多小伙伴不知道是什么情况应该怎么解决。 那么据微点阅读小编所知可能是我们电脑硬件设施的不兼容所导致的。我们可以在官网上查看win10系统的配置要求是否符合自…

一文带你深入理解【Java基础】· 泛型

写在前面 Hello大家好&#xff0c; 我是【麟-小白】&#xff0c;一位软件工程专业的学生&#xff0c;喜好计算机知识。希望大家能够一起学习进步呀&#xff01;本人是一名在读大学生&#xff0c;专业水平有限&#xff0c;如发现错误或不足之处&#xff0c;请多多指正&#xff0…

Arduino开发实例-DIY风速测量及显示

DIY风速测量及显示 1、应用介绍 本次实例将使用一款具有 NPN 脉冲输出的数字风速计传感器。 NPN脉冲输出风速计效果好,性价比高。另外它仅在 5V 电源下工作。 在本次实例中,将此风速计传感器与 Arduino 板和 0.96 英寸 OLED 显示屏连接。 OLED显示屏将以米/秒为单位显示风速…

[附源码]计算机毕业设计基于SpringBoot的酒店预订系统设计与实现

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

​AAAI 2023 | 基于历史对比学习的时序知识图谱推理

©PaperWeekly 原创 作者 | 徐奕单位 | 上海交通大学Acemap研究方向 | 数据挖掘论文标题&#xff1a;Temporal Knowledge Graph Reasoning with Historical Contrastive Learning论文链接&#xff1a;https://arxiv.org/abs/2211.10904代码链接&#xff1a;https://github…

Elasticsearch好用查询插件分享

以前我常用的ES查询工具是Head&#xff0c;作为插件形式在浏览器中运行&#xff0c;挺方便的&#xff0c;后来发现head不太好用&#xff0c;比如在数据浏览的时候&#xff0c;不小心就点击了两个索引&#xff0c;背景色设置的还不够明显&#xff0c;比较容易看错数据的。于是想…