Kotlin协程Flow浅析

news2025/2/27 3:06:11

Kotlin协程中的Flow主要用于处理复杂的异步数据,以一种”流“的方式,从上到下依次处理,和RxJava的处理方式类型,但是比后者更加强大。

Flow基本概念

Flow中基本上有三个概念,即 发送方,处理中间层,接收方,可以类比水利发电站中的上游,发电站,下游的概念, 数据从上游开始发送”流淌“至中间站被”处理“了一下,又流淌到了下游。

示例代码如下

flow {         // 发送方、上游
    emit(1)    // 挂起函数,发送数据
    emit(2)
    emit(3)
    emit(4)
    emit(5)
}
.filter { it > 2 }  // 中转站,处理数据
.map { it * 2 }
.take(2)
.collect{           // 接收方,下游
    println(it)
}
输出内容:
6
8

通过上面代码我们可以看到,基于一种链式调用api的方式,流式的进行处理数据还是很棒的,接下来具体看一下上面的组成:

  • flow{},是个高阶函数,主要用于创建一个新的Flow。在其Lambda函数内部使用了emit()挂起函数进行发送数据。
  • filter{}、map{}、take{},属于中间处理层,也是中间数据处理的操作符,Flow最大的优势,就是它的操作符跟集合操作符高度一致。只要会用List、Sequence,那么就可以快速上手 Flow 的操作符。
  • collect{},下游接收方,也成为终止操作符,它的作用其实只有一个:终止Flow数据流,并且接收这些数据。

其他创建Flow的方式还是flowOf()函数,示例代码如下

fun main() = runBlocking{aassssssssaaaaaaaas
    flowOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println("flowof: $it")
    }
}

我们在看一下list集合的操作示例

listOf(1,2,3,4,5).filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println("listof: $it")
        }

通过以上对比发现,两者的基本操作几乎一致,Kotlin也提供了两者相互转换的API,Flow.toList()、List.asFlow()这两个扩展函数,让数据在 List、Flow 之间来回转换,示例代码如下:

//flow 转list
    flowOf(1,2,3)
        .toList()
        .filter { it > 1 }
        .map { it * 2 }
        .take(2)
        .forEach{
            println(it)
        }
    // list 转 flow
    listOf(1,2,3).asFlow()
        .filter { it > 2 }
        .map { it * 2 }
        .take(2)
        .collect{
            println(it)
        }

Flow生命周期

虽然从上面操作看和集合类型,但是Flow还是有些特殊操作符的,毕竟它是协程的一部分,和Channel不同,Flow是有生命周期的,只是以操作符的形式回调而已,比如onStart、onCompletion这两个中间操作符。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .collect{
            println("collect: $it")
        }
输出内容:
onStart
filter: 1
filter: 2
filter: 3
filter: 4
map: 4
collect: 8
filter: 5
map: 5
collect: 10

我们可以看到onStart,它的作用是注册一个监听事件:当 flow 启动以后,它就会被回调。

和filter、map、take这些中间操作符不同,他们的顺序会影响数据的处理结果,这也很好理解;onStart和位置没有关系,它本质上是一个回调,不是一个数据处理的中间站。同样的还有数据处理完成的回调onCompletion。

flowOf(1,2,3,4,5,6)
        .filter {
            println("filter: $it")
            it > 3
        }
        .map {
            println("map: $it")
            it * 2
        }
        .take(2)
        .onStart { println("onStart") }
        .onCompletion { println("onCompletion") }
        .collect{
            println("collect: $it")
        }

Flow中onCompletion{} 在面对以下三种情况时都会进行回调:

  • 1,Flow 正常执行完毕
  • 2,Flow 当中出现异常
  • 3,Flow 被取消。

处理异常

在数据流的处理过程中,很难保证不出现问题,那么出现异常之后再该怎么处理呢?

  • 对于发生在上游、中间操作这两个阶段的异常,我们可以直接使用 catch 这个操作符来进行捕获和进一步处理。
  • 对于发生在下游,使用try-catch,把collect{}当中可能出现问题的代码包裹起来进行捕获处理。

上游或者中间异常使用catch

fun main() = runBlocking{
    val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }

    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .collect{
            println("collect: $it")
        }
}
输出:
collect: 2
collect: 4
catch: java.lang.IllegalStateException

catch 这个操作符的作用是和它的位置强相关的,catch 的作用域,仅限于catch的上游。换句话说,发生在 catch 上游的异常,才会被捕获,发生在 catch 下游的异常,则不会被捕获。

val flow = flow {
        emit(1)
        emit(2)
        throw IllegalStateException()
        emit(3)
    }

    flow.map { it * 2 }
        .catch { println("catch: $it") }
        .filter { it / 0 > 1 } // catch之后发生异常
        .collect{
            println("collect: $it")
    }
输出内容:
Exception in thread "main" java.lang.ArithmeticException: / by zero

下游使用try-catch

flowOf(1,2,3)
        .onCompletion { println("onCompletion $it") }
        .collect{
            try {
                println("collect: $it")
                throw IllegalStateException();
            }catch (e: Exception){
                println("catch $e")
            }
        }
输出:
collect: 1
catch java.lang.IllegalStateException
collect: 2
catch java.lang.IllegalStateException
collect: 3
catch java.lang.IllegalStateException
onCompletion null

切换执行线程

Flow适合处理复杂的异步任务,大多数情况下耗时任务放在子线程或线程池中处理,对于UI任务放在主线程中进行。

在Flow中可以使用flowOn操作符实现上述场景中的线程切换。

flowOf(1,2,3,4,5)
        .filter {
            logX("filter: $it")
            it > 2 }
        .flowOn(Dispatchers.IO) // 切换线程
        .collect{
            logX("collect: $it")
        }
输出内容:
================================
filter: 1
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 2
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 3
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 4
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 5
Thread:DefaultDispatcher-worker-1
================================
================================
collect: 3
Thread:main
================================
================================
collect: 4
Thread:main
================================
================================
collect: 5
Thread:main
================================

flowOn 操作符也是和它的位置强相关的。作用域限于它的上游。在上面的代码中,flowOn 的上游,就是 flowOf{}、filter{} 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有collect{}当中的代码是运行在 main 线程当中的。

终止操作符

Flow 里面,最常见的终止操作符就是collect。除此之外,还有一些从集合中借鉴过来的操作符,也是Flow的终止操作符。比如 first()、single()、fold{}、reduce{},本质上来说说当我们尝试将 Flow 转换成集合的时候,已经不属于Flow的API,也不属于协程的范畴了,它本身也就意味着 Flow 数据流的终止。

"冷的数据流"从何而来

在上面文章《Kotlin协程Channel浅析》中,我们认识到Channel是”热数据流“,随时准备好,随用随取,就像海底捞里的服务员。

现在我们看下Flow和Channel的区别

val flow = flow {
        (1..4).forEach{
            println("Flow发送前:$it")
            emit(it)
            println("Flow发送后: $it")
        }
    }

    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach{
            println("Channel发送前: $it")
            send(it)
            println("Channel发送后: $it")
        }
    }
    
输出内容:
Channel发送前: 1

Flow中的逻辑并未执行,因此我们可以这样类比,Channel之所以被认为是“热”的原因,是因为不管有没有接收方,发送方都会工作。那么对应的,Flow被认为是“冷”的原因,就是因为只有调用终止操作符之后,Flow才会开始工作。

除此之外,Flow一次处理一条数据,是个”懒家伙“。

    val flow = flow {
        (3..6).forEach {
            println("Flow发送前:$it")
            emit(it)
            println("Flow发送后: $it")
        }
    }.filter {
        println("filter: $it")
        it > 3
    }.map {
        println("map: $it")
        it * 2
    }.collect {
        println("结果collect: $it")
    }
输出内容:
Flow发送前:3
filter: 3
Flow发送后: 3
Flow发送前:4
filter: 4
map: 4
结果collect: 8
Flow发送后: 4
Flow发送前:5
filter: 5
map: 5
结果collect: 10
Flow发送后: 5
Flow发送前:6
filter: 6
map: 6
结果collect: 12
Flow发送后: 6

相比于满面春风,热情服务的Channel,Flow更像个冷漠的家伙,你不找他,他不搭理你。

  • Channel,响应速度快,但数据可能是旧的,占用资源
  • Flow,响应速度慢,但数据是最新的,节省资源

Flow也可以是”热“的,你知道吗?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/39334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML+CSS大作业 (水果之家10个网页)

&#x1f380; 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

数字验证学习笔记——SystemVerilog芯片验证11 ——类的继承

一、类和继承 类的第二核心要素&#xff0c;即类的“继承”&#xff0c;继承也符合我们认识实际的观点&#xff0c;在自然界和科学界我们对世界的认识无外乎归纳法和演绎法。 归纳论证是一种由个别到一般的论证方法。它通过许多个别的事例或分论点&#xff0c;然后归纳出它们所…

17、Mybatis获取参数值的情况3(若mapper接口方法的参数为多个时,可以手动将这些参数放入map中存储)

Mybatis获取参数值的情况3&#xff08;若mapper接口方法的参数为多个时&#xff0c;可以手动将这些参数放入map中存储&#xff09; 第一步&#xff1a; 第二步&#xff1a; 第三步&#xff1a;

Linux系统tcp连接设置

目录net.ipv4.tcp_syn_retriesnet.ipv4.ip_local_port_rangenet.ipv4.tcp_net.core.somaxconnnet.ipv4.tcp_max_syn_backlognet.core.netdev_max_backlognet.ipv4.tcp_synack_retriesnet.ipv4.tcp_abort_on_overflownet.ipv4.tcp_syncookiesnet.ipv4.tcp_fastopennet.ipv4.tcp_…

兆易创新GD32 (四)FreeRTOS 移植 与 CMSIS OS2

可以完全参考STM32系列的方法 FreeRTOS 逛网下载 FreeRTOS源码 https://www.freertos.org/a00104.html GitHub地址 https://github.com/FreeRTOS/FreeRTOS-Kernel 下载后的FreeRTOS-Kernel复制到LIB文件夹下 在KEIL中添加文件&#xff0c;首先是公共部分 添加MDK移植文件CM4F…

含抽水蓄能电站系统的调峰经济调度研究matlab程序(粒子群算法)

含抽水蓄能电站系统的调峰经济调度研究matlab程序&#xff08;粒子群算法&#xff09; 参考文献&#xff1a;抽水蓄能电站的最佳调度方案研究 调峰电源的优化调度是促进电力系统安全稳定运行&#xff0c;实现可靠供电的要措施。因为目前我国的调峰电源严重不足&#xff0c;尤其…

【畅购商城】订单模块之收货人列表

目录 构建订单服务&#xff1a;8100 收货人列表 接口 后端实现&#xff1a;JavaBean 后端实现 前端实现 构建订单服务&#xff1a;8100 步骤一&#xff1a;构建项目&#xff0c;changgou4-service-orders 步骤二&#xff1a;修改pom.xml文件&#xff0c;添加依赖 <de…

Spring核心与设计思想

文章目录Spring 是什么&#xff1f;什么是容器&#xff1f;什么是IoC&#xff1f;传统程序开发控制反转式程序开发对比总结规律理解 Spring IoCDI 概念Spring 是什么&#xff1f; 我们通常所说的 Spring 指的是 Spring Framework&#xff08;Spring 框架&#xff09;&#xff…

ignite集群

ignite集群 基础知识 集群中的结点 Ignite 是一个分布式系统&#xff0c;集群是标配功能。 集群中的 节点&#xff0c;有两类&#xff1a; 服务器结点。具备完全功能的结点。 客户端结点&#xff1a;这个我没有完全理解&#xff0c;但从编程的角度&#xff0c;我初步认为这个…

Jenkins-CentOS安装jenkins

CentOS安装jenkins jenkins适配的jdk版本是17和11 第一步&#xff1a;安装jdk11 &#xff08;1&#xff09;在oracle下载jdk11 jdk-11.0.16.1_linux-x64_bin.tar,放到非root用户的家目录下 &#xff08;2&#xff09;解压&#xff0c;tar -zxvf jdk-11.0.16.1_linux-x64_bin…

校园论坛设计(Java)——介绍篇

校园论坛设计&#xff08;Java&#xff09; 文章目录校园论坛设计&#xff08;Java&#xff09;0、写在前面1、项目介绍2、项目背景3、项目功能介绍3.1 总体设计图3.2 帖子模块3.3 学习模块3.4 个人信息模块3.5 数据报表模块3.6 校园周边模块3.7 用户管理模块3.8 登录注册模块4…

基于桶的排序之基数排序以及排序方法总结

基于桶的排序之基数排序以及排序方法总结 作者&#xff1a;Grey 原文地址&#xff1a; 博客园&#xff1a;基于桶的排序之基数排序以及排序方法总结 CSDN&#xff1a;基于桶的排序之基数排序以及排序方法总结 说明 基于桶的排序有两种&#xff0c;分别是计数排序和基数排…

智源社区AI周刊No.107:英伟达推出Magic3D;Stable Diffusion2.0发布

汇聚每周AI热点&#xff0c;不错过重要资讯&#xff01;欢迎扫码&#xff0c;关注并订阅智源社区AI周刊。英伟达推出Magic3D&#xff0c;性能超过谷歌DreamFusion近一段时间&#xff0c;让AI生成3D点云成为业界研究的重点。谷歌曾在9月提出DreamFusion&#xff0c;引起广泛关注…

【LEACH协议】最佳簇半径的无线传感器网络分簇路由算法【含Matlab源码 2087期】

⛄一、 数据融合的LEACH协议简介 1 基于自适应数据融合的LEACH协议 1.1 基本定义和概念 无线传感器网络中的一个簇可以用一个无向加权全连通图G(V,E)来表示,V是簇中所有传感器节点的集合,E使簇中两个节点之间可以直接通信。假设顶点v∈V代表簇中的一个传感器节点,边euv(u,v)∈…

合作共赢,就在2022亚马逊云科技re:Invent全球大会

一年一度的科技狂欢盛会&#xff0c;2022年亚马逊云科技re:Invent全球大会即将于北京时间2022年11月29日盛大开启&#xff01;届时全球各领域关注“云计算圈”发展的人都将齐聚一堂&#xff0c;共同聆听5大重磅嘉宾的主题演讲&#xff0c;700前沿技术话题…… 携全球合作伙伴…

APP逆向案例之(三)sign 参数破解

说明&#xff1a;某新闻APP sign 参数 抓包发现包含内容&#xff1a; url: https://124.*.*.*/api/categorynews/lists 参数&#xff1a; 其中 sign 参数是需要变化的否则访问失败&#xff0c;其余都是固定的 page: 3, size: 10, category: -2, from: -1, lng: 116.363…

快收藏!最全GO语言实现设计模式【下】

点个关注&#x1f446;跟腾讯工程师学技术导语| 继上篇【快收藏&#xff01;最全GO语言实现设计模式】&#xff0c;本文继续列出GO语言实现的经典设计模式示例&#xff0c;每个示例都精心设计&#xff0c;力求符合模式结构&#xff0c;可作为日常编码参考&#xff0c;同时一些常…

基于遗传算法的自主式水下潜器路径规划问题附Matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

Pycharm配置python3环境

一、打开pycharm 二、选择preferences 三、选择项目所属Python解释器&#xff0c;点击设置icon&#xff0c;选择“全部显示” 四、选择“添加”&#xff0c;选择最新的python路径进行添加 五、验证是否配置成功 打印输出内容&#xff0c;则配置成功

JMETER也会遇到加密难题,中文乱码也能一并处理

文章目录加密的接口测试导出jar包&#xff0c;放入jmeter的lib/ext扩展目录JMeter输出中文乱码总结加密的接口测试 不管是接口测试还是性能测试&#xff0c;在遇到接口有加密入参的时候&#xff0c;该怎么办&#xff1f; 毫无疑问&#xff0c;放弃自己实现的想法&#xff0c;除…