【Kotlin】Channel简介

news2025/1/11 12:54:54

1 前言

        Channel 是一个并发安全的阻塞队列,可以通过 send 函数往队列中塞入数据,通过 receive 函数从队列中取出数据。

        当队列被塞满时,send 函数将被挂起,直到队列有空闲缓存;当队列空闲时,receive 函数将被挂起,直到队列中有新数据存入。

        Channel 中队列缓存空间的大小需要在创建时指定,如果不指定,缓存空间默认是 0。

2 Channel 中 send 和 receive 案例

2.1 capacity 为 0

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
receive: 1
send: 2
receive: 2

        说明:send 的 delay 时间比 receive 的 delay 时间短,但是并没有出现连续打印两个 send,而是打印一个 send,再打印一个 recieve,它们交替打印。因为 Channel 中队列的缓存空间默认为 0,在执行了 send 后,如果没有执行 recieve,send 将一直被挂起,直到执行了 receive 才恢复执行 send。

2.2 capacity 大于 0

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            delay(10)
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            delay(100)
            var element = channel.receive()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive: 0
receive: 1
receive: 2

        说明:Channel 中队列的缓存空间为 2,send 的 delay 时间比 receive 的 delay 时间短,因此会出现连续打印多个 send。

3 Channel 中迭代器

3.1 iterator

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        var iterator = channel.iterator()
        while (iterator.hasNext()) {
            var element = iterator.next()
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

3.2 for in

fun main() {
    var channel = Channel<Int>()
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in channel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
receive: 0
send: 1
send: 2
receive: 1
receive: 2

4 Channel 中 produce 和 actor

        produce 函数用于构造一个生产者协程,并返回一个 ReceiveChannel;actor 函数用于构造一个消费者协程,并返回一个 SendChannel。

4.1 produce

fun main() {
    var receiveChannel = CoroutineScope(Dispatchers.Default).produce<Int> { // 生产者
        repeat(3) {
            println("send: $it")
            send(it)
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        for (element in receiveChannel) {
            println("receive: $element")
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

4.2 actor

fun main() {
    var sendChannel = CoroutineScope(Dispatchers.Default).actor<Int> { // 生产者
        repeat(3) {
            var element = receive()
            println("receive: $element")
        }
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            println("send: $it")
            sendChannel.send(it)
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
receive: 0
receive: 1
send: 2
receive: 2

5 Channel 的关闭

        对于一个 Channel,如果我们调用了它的 close 函数,它会立即停止发送新元素,也就是说这时它的 isClosedForSend 会立即返回 true。而由于 Channel 缓冲区的存在,这时候可能还有一些元素没有被处理完,因此要等所有的元素都被读取之后 isClosedForReceive 才会返回 true。

fun main() {
    var channel = Channel<Int>(3)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
        channel.close()
        println("producter, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    CoroutineScope(Dispatchers.Default).launch { // 消费者
        repeat(3) {
            var element = channel.receive()
            println("receive: $element")
        }
        println("consumer, isClosedForSend=${channel.isClosedForSend}, isClosedForReceive=${channel.isClosedForReceive}")
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
producter, isClosedForSend=true, isClosedForReceive=false
receive: 0
receive: 1
receive: 2
consumer, isClosedForSend=true, isClosedForReceive=true

6 BroadcastChannel

        Channel 的生产者(producter)和消费者(consumer)都可以存在多个,但是同一个元素只会被一个消费者读到。BroadcastChannel 则不然,多个消费者不存在互斥行为。

6.1 Channel 中多个消费者

fun main() {
    var channel = Channel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            channel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            for (element in channel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 2
receive-1: 1

        说明:结果表明,Channel 中同一个元素只会被一个消费者读到。

6.2 BroadcastChannel 中多个消费者

6.2.1 BroadcastChannel

fun main() {
    var broadcastChannel = BroadcastChannel<Int>(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-0: 0
receive-0: 1
receive-0: 2
receive-1: 0
receive-1: 1
receive-1: 2

        说明:结果表明,BroadcastChannel 中同一个元素可以被所有消费者读到。

6.2.2 broadcast

fun main() {
    var channel = Channel<Int>()
    var broadcastChannel = channel.broadcast(2)
    CoroutineScope(Dispatchers.Default).launch { // 生产者
        delay(10)
        repeat(3) {
            println("send: $it")
            broadcastChannel.send(it)
        }
    }
    repeat(2) { index ->
        CoroutineScope(Dispatchers.Default).launch { // 消费者
            var receiveChannel = broadcastChannel.openSubscription()
            for (element in receiveChannel) {
                println("receive-$index: $element")
            }
        }
    }
    Thread.sleep(1000) // 阻塞当前线程, 避免程序过早结束, 协程提前取消
}

        打印如下。

send: 0
send: 1
send: 2
receive-1: 0
receive-1: 1
receive-1: 2
receive-0: 0
receive-0: 1
receive-0: 2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1629492.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue3 vite 路由去中心化(modules文件夹自动导入router)

通过路由去中心化可实现多人写作开发&#xff0c;不怕文件不停修改导致的冲突&#xff0c;modules中的文件可自动导入到index.js中 // 自动导入模块 const files import.meta.globEager(./modules/**.js); const modules {} for (const key in files) {modules[key.replace…

前端工程化Vue使用Node.js设置国内高速npm镜像源(踩坑记录版)

前端工程化Vue使用Node.js设置国内高速npm镜像源&#xff08;踩坑记录版&#xff09; 此篇仅为踩坑记录&#xff0c;并未成功更换高速镜像源&#xff0c;实际解决方法见文末跳转链接。 1.自身源镜像 自身镜像源创建Vue项目下载速度感人 2.更改镜像源 2.1 通过命令行配置 前提…

在Redux Toolkit中使用redux-persist进行状态持久化

在 Redux Toolkit 中使用 redux-persist 持久化插件的步骤如下: 安装依赖 npm install redux-persist配置 persistConfig 在 Redux store 配置文件中(例如 rootReducer.js)&#xff0c;导入必要的模块并配置持久化选项: import { combineReducers } from redux; import { p…

【MySQL关系型数据库】基本命令、配置、连接池

目录 MySQL数据库 第一章 1、什么是数据库 2、数据库分类 3、不同数据库的特点 4、MySQL常见命令&#xff1a; 5、MySQL基本语法 第二章 1、MySQL的常见数据类型 1、数值类型 2、字符类型 3、时间日期类型 2、SQL语句分类 1、DDL&#xff08;数据定义语言&#x…

mysql-sql-练习题-2-窗口函数

窗口函数 访问量max sum建表窗口函数连接 直播间人数 第1、3名建表排名sum 访问量max sum 每个用户截止到每月为止&#xff0c;最大单月访问次数&#xff0c;累计到该月的总访问次数 建表 create table visit(uid1 varchar(5) comment 用户id,month1 varchar(10) comment 月…

28.Gateway-网关过滤器

GatewayFilter是网关中提供的一种过滤器&#xff0c;可以多进入网关的请求和微服务返回的响应做处理。 GatewayFilter(当前路由过滤器&#xff0c;DefaultFilter) spring中提供了31种不同的路由过滤器工厂。 filters针对部分路由的过滤器。 default-filters针对所有路由的默认…

锂电池SOH预测 | 基于BP神经网络的锂电池SOH预测(附matlab完整源码)

锂电池SOH预测 锂电池SOH预测完整代码锂电池SOH预测 锂电池的SOH(状态健康度)预测是一项重要的任务,它可以帮助确定电池的健康状况和剩余寿命,从而优化电池的使用和维护策略。 SOH预测可以通过多种方法实现,其中一些常用的方法包括: 容量衰减法:通过监测电池的容量衰减…

.NET 检测地址/主机/域名是否正常

&#x1f331;PING 地址/主机名/域名 /// <summary>/// PING/// </summary>/// <param name"ip">ip</param>/// <returns></returns>public static bool PingIp(string ip){System.Net.NetworkInformation.Ping p new System.N…

Qt/C++ 波形绘制双缓冲下改善PaintEvent连续绘制卡顿问题(完整代码解析)

音频波形可视化&#xff1a;该控件用于将音频样本数据可视化为波形&#xff0c;常用于音频处理软件中以展示音频信号的时间域特性。 动态数据绘制&#xff1a;控件能够响应外部数据的变化并重新绘制波形&#xff0c;适用于实时或动态的音频数据流。 自定义绘制逻辑&#xff1…

git变更远端仓库名之后如何修改本地仓库配置的另一种方法?(删remote指针、添加、绑定master)

背景 如果某个远端的仓库地址变化后&#xff0c;本地仓库可以修改对应的remote。 之前谈过几种方法&#xff0c;比如重新设置一个新的remote的指针&#xff0c;绑定到新地址。然后删除origin&#xff0c;然后把新指针mv到origin。比如直接seturl修改&#xff08;git remote se…

Apache Seata如何解决TCC 模式的幂等、悬挂和空回滚问题

title: 阿里 Seata 新版本终于解决了 TCC 模式的幂等、悬挂和空回滚问题 author: 朱晋君 keywords: [Seata、TCC、幂等、悬挂、空回滚] description: Seata 在 1.5.1 版本解决了 TCC 模式的幂等、悬挂和空回滚问题&#xff0c;这篇文章主要讲解 Seata 是怎么解决的。 今天来聊一…

容器安全-镜像扫描

前言 容器镜像安全是云原生应用交付安全的重要一环&#xff0c;对上传的容器镜像进行及时安全扫描&#xff0c;并基于扫描结果选择阻断应用部署&#xff0c;可有效降低生产环境漏洞风险。容器安全面临的风险有&#xff1a;镜像风险、镜像仓库风险、编排工具风险&#xff0c;小…

STM32HAL库++ESP8266+cJSON连接阿里云物联网平台

实验使用资源&#xff1a;正点原子F1 USART1&#xff1a;PA9P、A10&#xff08;串口打印调试&#xff09; USART3&#xff1a;PB10、PB11&#xff08;WiFi模块&#xff09; DHT11&#xff1a;PG11&#xff08;采集数据、上报&#xff09; LED0、1&#xff1a;PB5、PE5&#xff…

神经网络的优化器

神经网络的优化器是用于训练神经网络的一类算法&#xff0c;它们的核心目的是通过改变神经网络的权值参数来最小化或最大化一个损失函数。优化器对损失函数的搜索过程对于神经网络性能至关重要。 作用&#xff1a; 参数更新&#xff1a;优化器通过计算损失函数相对于权重参数的…

c++理论篇(一) ——浅谈tcp缓存与tcp的分包与粘包

介绍 在网络通讯中,Linux系统为每一个socket创建了接收缓冲区与发送缓冲区,对于TCP协议来说,这两个缓冲区是必须的.应用程序在调用send/recv函数时,Linux内核会把数据从应用进程拷贝到socket的发送缓冲区中,应用程序在调用recv/read函数时,内核把接收缓冲区中的数据拷贝到应用…

Xcode 15构建问题

构建时出现的异常&#xff1a; 解决方式&#xff1a; 将ENABLE_USER_SCRIPT_SANDBOXING设为“no”即可&#xff01;

【Linux命令行艺术】1. 初见命令行

&#x1f4da;博客主页&#xff1a;爱敲代码的小杨. ✨专栏&#xff1a;《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 |《MySQL探索之旅》 |《Web世界探险家》 ❤️感谢大家点赞&#x1f44d;&#x1f3fb;收藏⭐评论✍&#x1f3fb;&#xff0c;您的三连就是我持续更…

【网络基础】深入理解UDP协议:从报文格式到应用本质

文章目录 前言Udp协议段格式1. 几乎所有协议首要解决的两个问题&#xff1a;a) 如何分离&#xff08;封装&#xff09;b) 如何进行向上交付 2. 理解报文本身3. 对Udp报文字段的解释4. Udp的特点如何理解 面向数据报&#xff1a; 5. IO类接口的本质&#xff1a;sento、recvfromU…

RS0102YH8功能和参数介绍及如何计算热耗散

RS0102YH8功能和参数介绍-公司新闻-配芯易-深圳市亚泰盈科电子有限公司 RS0102YH8 是一款电平转换芯片&#xff0c;由润石&#xff08;RUNIC&#xff09;公司生产。以下是关于RS0102YH8的一些功能和参数的介绍&#xff1a; 电平转换功能&#xff1a; RS0102YH8旨在提供电平转换…

k8s学习(三十七)centos下离线部署kubernetes1.30(高可用)

文章目录 准备工作1、升级操作系统内核1.1、查看操作系统和内核版本1.2、下载内核离线升级包1.3、升级内核1.4、确认内核版本 2、修改主机名/hosts文件2.1、修改主机名2.2、修改hosts文件 3、关闭防火墙4、关闭SELINUX配置5、时间同步5.1、下载NTP5.2、卸载5.3、安装5.4、配置5…