Swift AsyncThrowingStream 和 AsyncStream Demo 演示

news2025/1/11 15:08:40

文章目录

    • 前言
    • 什么是 AsyncThrowingStream?
    • 调整现有代码以使用流
    • 什么是 AsyncStream?
    • AsyncThrowingStream
    • AsyncThrowingStream 迭代
    • 调试 AsyncStream
    • 取消一个 AsyncStream
    • 结论

前言

AsyncThrowingStreamAsyncStream 是 Swift 5.5 中由 SE-314 引入的并发框架的一部分。异步流允许你替换基于闭包或 Combine 发布器的现有代码。

在深入研究围绕抛出流的细节之前,如果你还没有阅读我的文章,我建议你先阅读我的文章,内容包括async-await。本文解释的大部分代码将使用那里解释的API。

什么是 AsyncThrowingStream?

AsyncThrowingStream 符合 AsyncSequence 协议,提供了一种不需要手动实现异步迭代器就能创建异步序列的便利方法。异步流适用于将基于回调或委托的API适配为可以与async-await一起使用的方式。

与 AsyncStream 相比,这种类型可以从 awaited next() 中抛出错误,从而用抛出的错误终止流。

您可以使用闭包初始化 AsyncThrowingStream,该闭包接收 AsyncThrowingStream.Continuation。在此闭包中生成元素,然后通过调用 continuation的yield(_:) 方法将它们提供给流。当没有更多元素可生成时,请调用 continuation的finish() 方法。这会导致序列迭代器生成nil,从而终止序列。如果发生错误,请调用 continuation的finish(throwing:) 方法,这会导致迭代器的 next() 方法将错误抛出到等待调用点。continuation 是 Sendable 的,这允许从 AsyncThrowingStream 的迭代之外的并发上下文中调用它。

任意的元素来源可能会比调用者迭代它们的速度更快地生成元素。因此,AsyncThrowingStream 定义了缓冲行为,允许流缓冲特定数量的最旧或最新元素。默认情况下,缓冲限制为 Int.max,这意味着它是无界的。

调整现有代码以使用流

将现有的回调代码适配为使用 async-await,请使用回调将值提供给流,方法是使用 continuation的yield(_:) 方法。

考虑一个假想的 QuakeMonitor 类型,每当它检测到地震时,它会向调用者提供 Quake 实例。为了接收回调,调用者将自定义闭包设置为监视器的 quakeHandler 属性的值,监视器会根据需要回调该闭包。调用者还可以设置 errorHandler 来接收异步错误通知,例如监视器服务突然不可用。

class QuakeMonitor {
    var quakeHandler: ((Quake) -> Void)?
    var errorHandler: ((Error) -> Void)?

    func startMonitoring() {}
    func stopMonitoring() {}
}

为了适应使用async-await,请扩展QuakeMonitor以添加一个类型为AsyncThrowingStream的quakes属性。在此属性的getter中,返回一个AsyncThrowingStream,其build闭包 - 在运行时调用以创建流 - 使用continuation执行以下步骤:

  1. 创建一个 QuakeMonitor 实例。
  2. 将监视器的 quakeHandler 属性设置为一个闭包,该闭包接收每个 Quake 实例,并通过调用 continuation的yield(_:) 方法将其转发到流中。
  3. 将监视器的 errorHandler 属性设置为一个闭包,该闭包接收来自监视器的任何错误,并通过调用 continuation 的 finish(throwing:) 方法将其转发到流中。这会导致流的迭代器抛出错误并终止流。
  4. 将 continuation 的 onTermination 属性设置为一个闭包,该闭包在监视器上调用 stopMonitoring()
  5. QuakeMonitor 上调用 startMonitoring()

请添加图片描述

extension QuakeMonitor {

    static var throwingQuakes: AsyncThrowingStream<Quake, Error> {
        AsyncThrowingStream { continuation in
            let monitor = QuakeMonitor()
            monitor.quakeHandler = { quake in
                 continuation.yield(quake)
            }
            monitor.errorHandler = { error in
                continuation.finish(throwing: error)
            }
            continuation.onTermination = { @Sendable _ in
                monitor.stopMonitoring()
            }
            monitor.startMonitoring()
        }
    }
}

因为流是AsyncSequence,调用点使用for-await-in语法来处理流产生的每个Quake实例:

do {
    for try await quake in quakeStream {
        print("Quake: \(quake.date)")
    }
    print("Stream done.")
} catch {
    print("Error: \(error)")
}

请添加图片描述

什么是 AsyncStream?

AsyncStream 类似于抛出的变体,但绝不会导致抛出错误。一个非抛出型的异步流会根据明确的完成调用或流的取消而完成。

注意: 在这篇文章中,我们将解释如何使用AsyncThrowingStream。除了发生错误处理的部分,代码示例与AsyncStream类似。

AsyncThrowingStream

如何使用 AsyncThrowingStream

AsyncThrowingStream 可以很好地替代现有的基于闭包的代码,如进度和完成处理程序。为了更好地理解我的意思,我将向你介绍我们在 WeTransfer 应用程序中遇到的一个场景。

在我们的应用程序中,我们有一个基于闭包的现有类,叫做 FileDownloader

struct FileDownloader {
    enum Status {
        case downloading(Float)
        case finished(Data)
    }

    func download(_ url: URL, progressHandler: (Float) -> Void, completion: (Result<Data, Error>) -> Void) throws {
        // .. Download implementation
    }
}

文件下载器接受一个URL,报告进度情况,并完成一个包含下载数据的结果或在失败时显示一个错误。

文件下载器在文件下载过程中报告一个数值流。在这种情况下,它报告的是一个状态值流,以报告正在运行的下载的当前状态。FileDownloader 是一个完美的例子,你可以重写一段代码来使用 AsyncThrowingStream。然而,重写需要你在实现层面上也重写你的代码,所以让我们定义一个重载方法来代替:

extension FileDownloader {
    func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
        return AsyncThrowingStream { continuation in
            do {
                try self.download(url, progressHandler: { progress in
                    continuation.yield(.downloading(progress))
                }, completion: { result in
                    switch result {
                    case .success(let data):
                        continuation.yield(.finished(data))
                        continuation.finish()
                    case .failure(let error):
                        continuation.finish(throwing: error)
                    }
                })
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}

正如你所看到的,我们把下载方法包裹在一个 AsyncThrowingStream 里面。我们将流的值 Status 的类型描述为一个通用的类型,允许我们用状态更新来延续流。

只要有错误发生,我们就会通过抛出一个错误来完成流。在完成处理程序的情况下,我们要么通过抛出一个错误来完成,要么用一个不抛出的完成回调来跟进数据的产生。

switch result {
case .success(let data):
    continuation.yield(.finished(data))
    continuation.finish()
case .failure(let error):
    continuation.finish(throwing: error)
}

在收到最后的状态更新后,不要忘记 finish() 回调,这一点至关重要。否则,我们将保持流的存活,而实现层面的代码将永远不会继续。

我们可以通过使用另一个 yield 方法来重写上述代码,接受一个 Result 枚举作为参数:

continuation.yield(with: result.map { .finished($0) })
continuation.finish()

重写后的代码简化了我们的代码,并去掉了 switch-case 代码。我们必须映射我们的 Reslut 枚举以匹配预期的 Status 值。如果我们产生一个失败的结果,我们的流将在抛出包含的错误后结束。

AsyncThrowingStream 迭代

一旦你配置好你的异步抛出流,你就可以开始在数值流上进行迭代。在我们的 FileDownloader 例子中,它将看起来如下所示:

do {
    for try await status in download(url) {
        switch status {
        case .downloading(let progress):
            print("Downloading progress: \(progress)")
        case .finished(let data):
            print("Downloading completed with data: \(data)")
        }
    }
    print("Download finished and stream closed")
} catch {
    print("Download failed with \(error)")
}

我们处理任何状态的更新,并且我们可以使用 catch 闭包来处理任何发生的错误。你可以使用基于 AsyncSequence 接口的 for ... in 循环进行迭代,这对 AsyncStream 来说是一样的。

如果你遇到了类似的编译错误:

‘async’ in a function that does not support concurrency

你可能想读一读我的文章,其中Swift 中的 async/await ——代码实例详解。

上述代码示例中的打印语句有助于你理解 AsyncThrowingStream 的生命周期。你可以替换打印语句来处理进度更新和处理数据,为你的用户实现可视化。

调试 AsyncStream

如果一个流不能报告数值,我们可以通过放置断点来调试流产生的回调。虽然也可能是上面的 “Download finished and stream closed” 的打印语句不会调用,这意味着你在实现层的代码永远不会继续。后者可能是一个未完成的流的结果。

为了验证,我们可以利用 onTermination 回调:

func download(_ url: URL) -> AsyncThrowingStream<Status, Error> {
    return AsyncThrowingStream { continuation in

        ///  配置一个终止回调,以了解你的流的生命周期。
        continuation.onTermination = { @Sendable status in
            print("Stream terminated with status \(status)")
        }

        // ..
    }
}

回调在流终止时被调用,它将告诉你你的流是否还活着。我推荐你阅读 Sendable 和 @Sendable 闭包代码实例详解 来理解 @Sendable 属性。

如果出现了错误,输出结果可能如下:

Stream terminated with status finished(Optional(FileDownloader.FileDownloadingError.example))

上述输出只有在使用 AsyncThrowingStream 时才能实现。如果是一个普通的 AsyncStream,完成的输出看起来如下:

Stream terminated with status finished

而取消的结果对这两种类型的流来说都是这样的:

Stream terminated with status cancelled

你也可以在流结束后使用这个终止回调进行任何清理。例如,删除任何观察者或在文件下载后清理磁盘空间。

取消一个 AsyncStream

一个 AsyncStreamAsyncThrowingStream 可以由于一个封闭的任务被取消而取消。一个例子可以如下:

let task = Task.detached {
    do {
        for try await status in download(url) {
            switch status {
            case .downloading(let progress):
                print("Downloading progress: \(progress)")
            case .finished(let data):
                print("Downloading completed with data: \(data)")
            }
        }
    } catch {
        print("Download failed with \(error)")
    }
}
task.cancel()

一个流在超出范围或包围的任务取消时就会取消。如前所述,取消将相应地触发 onTermination 回调。

结论

AsyncThrowingStreamAsyncStream 是重写基于闭包的现有代码到支持 async-awai t的替代品的好方法。你可以提供一个连续的值流,并在成功或失败时完成一个流。你可以使用基于 AsyncSequence APIs 的 for 循环在实现层面上迭代值。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/517392.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

JUC并发编程15 | Java内存模型JMM与volatile

尚硅谷&#xff08;56-70&#xff09; JMM 引入一些大厂的面试题 Java内存模型JMM是什么JMM与volatile之间的关系是什么JMM有哪些特性or它的三大特性是什么为什么要有JMM&#xff0c;它为什么出现&#xff1f;功能和作用是什么&#xff1f;happens-before 先行发生原则是什么…

LaTeX极简入门

​LaTeX是什么&#xff1f; LaTeX是一种基于ΤΕΧ的排版系统&#xff0c;由美国计算机学家莱斯利兰伯特&#xff08;Leslie Lamport&#xff09;在20世纪80年代初期开发。 LaTeX是一款开源免费&#xff0c;并且应用相当广泛的排版工具。不但能够对文字、公式、图片进行精确而复…

电容笔和触控笔有什么区别?电容笔牌子排行榜

而现在&#xff0c;在无纸化教育的大热之下&#xff0c;电容笔这个配件&#xff0c;也被很多人所关注。许多人对电容笔与触控笔的不同之处感到困惑&#xff0c;事实上&#xff0c;这二者是非常容易分辨的&#xff0c;电容笔是适用在我们最常见的电容屏上才能进行操作&#xff0…

算法工程师面试题

1.关于边缘提取的算法有那些&#xff1f;各有什么优缺点&#xff1f; Canny算法&#xff1a;Canny算法是一种经典的边缘检测算法&#xff0c;具有较高的准确性和良好的鲁棒性。该算法利用高斯滤波器对图像进行平滑处理&#xff0c;然后计算图像中每个像素的梯度和方向&#xff…

TinyHttpd 运行过程出现的问题

最近拉了个 TinyHttpd 的工程下来&#xff0c;不过好像各个都有些改动&#xff0c;最后挑了篇阅读量最多的。工程也是从这里面给的链接下载的。 参考自&#xff1a;https://blog.csdn.net/jcjc918/article/details/42129311 拿下来在编译运行前&#xff0c;按这里说的&#x…

词云图制作(R)

词云图制作 文章目录 词云图制作[toc]1 工作准备2 实际操作 1 工作准备 材料准备&#xff1a; 文本数据txt文件&#xff0c;或者其他文本文件。R语言软件 2 实际操作 第一步&#xff1a;从网上相关新闻网站复制粘贴或下载相关文本数据&#xff0c;转化为txt格式文件或其他&…

【设计模式】桥接模式

【设计模式】桥接模式 参考资料&#xff1a; Java 设计模式&#xff1a;实战桥接模式 一起来学设计模式之桥接模式 《设计模式之美》设计模式与范式&#xff08;结构型-桥接模式&#xff09; 桥接模式在项目中的应用 文章目录 【设计模式】桥接模式一、桥接模式概述二、案例场…

GPT-4 开始内测32k输入长度的版本了!你收到邀请了吗?

要说现在 GPT-4 最大的问题是什么&#xff1f;可能除了一时拿他没有办法的机器幻觉&#xff0c;就是卡死的输入长度了吧。尽管在一般的对话、搜索的场景里目前普通版本 GPT-4 的 8000 左右的上下文长度或许绰绰有余&#xff0c;但是在诸如内容生成、智能阅读等方面当下基础版的…

京东短网址高可用提升最佳实践 | 京东云技术团队

作者&#xff1a;京东零售 郝彦军 什么是短网址&#xff1f; 短网址&#xff0c;是在长度上比较短的网址。简单来说就是帮您把冗长的URL地址缩短成8个字符以内的短网址。 当我们在腾讯、新浪发微博时&#xff0c;有时发很长的网址连接&#xff0c;但由于微博只限制140个字&a…

Android Studio中android: baselineAligned属性认识及用途

文章目录 使用Button控件来演示使用TextView控件来演示 android:baselineAligned 设置子元素都按照基线对齐&#xff0c;默认是true 使用Button控件来演示 在项目中经常使用layout_weight属性利用比重来设置控件的大小&#xff0c;代码如下&#xff1a; <?xml version&qu…

Baumer工业相机堡盟工业相机如何使用BGAPI SDK解决两个万兆网相机的同步采集不同步的问题

Baumer工业相机堡盟工业相机如何使用BGAPI SDK解决两个万兆网相机的同步采集不同步的问题 Baumer工业相机Baumer工业相机图像数据转为Bitmap的技术背景Baumer同步异常 &#xff1a;客户使用两个Baumer万兆网相机进行同步采集发现FrameID相同&#xff0c;但是图像不同步细节原因…

2023 年第八届数维杯数学建模挑战赛 A题详细思路

下面给大家带来每个问题简要的分析&#xff0c;以方便大家提前选好题目。 A 题 河流-地下水系统水体污染研究 该问题&#xff0c;初步来看属于物理方程类题目&#xff0c;难度较大。需要我们通过查阅相关文献和资料&#xff0c;分析并建立河流-地下水系统中有机污染物的对流、…

机器学习之聚类算法一

文章目录 一、简述1. 有监督和无监督的区别&#xff0c;以及应用实例2. 为什么是聚类3. 聚类都有哪些 二、k-means1.k-means&#xff0c;核心思想是什么1. 同一个簇内的样本点相似度较高&#xff0c;这里的相似度高&#xff0c;具体指什么2.问题来了&#xff1a;同一簇之间相似…

IP-Guard能否限制PC端微信登录?

能否限制PC端微信登录&#xff1f; 不能限制微信登录&#xff0c;但可以通过应用程序控制策略&#xff0c;禁止微信程序启动。 在控制台-【策略】-【应用程序】&#xff0c;添加以下策略&#xff1a; 动作&#xff1a;禁止 应用程序&#xff1a;wechat.exe 可以实现禁止微信启…

【python 多进程】零基础也能轻松掌握的学习路线与参考资料

学习python多进程可以帮助程序员充分利用CPU的性能&#xff0c;同时提高程序的并发性和响应能力。在学习python多进程前&#xff0c;需要具备一定的Python编程基础和对操作系统进程的基本了解。 一、Python多进程学习路线 基本概念 在学习python多进程之前&#xff0c;首先需…

C++基础之默认成员函数(构造函数,析构函数)

目录 空类中都有什么 默认成员函数 构造函数 简介 特性 注意 总结 析构函数 简介 特性 注意 总结 空类中都有什么 先看下面一段代码&#xff1a; class Date {};int main() {Date d1;std::cout << sizeof(Date) << std::endl;std::cout << sizeof(d1) <…

Linux之系统基本设置(四)

1、Linux 系统基本设置 1、系统时间管理 查看系统当前时间和时区 [root192 ~]# date 2023年 05月 04日 星期四 22:43:16 EDT [root192 ~]# date -R Thu, 04 May 2023 22:43:24 -0400 [root192 ~]# date %Y %m %d %H:%M:%S 2023 05 04 22:43:38设置完整时间 [root192 ~]# da…

基于html+css的图展示67

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

Shell脚本文本三剑客之sed编辑器(拥明月入怀,揽星河入梦)

文章目录 一、sed编辑器简介二、sed工作流程三、sed命令四、sed命令的使用1.sed打印文件内容&#xff08;p&#xff09;&#xff08;1&#xff09;打印文件所有行&#xff08;2&#xff09;打印文件指定行 2.sed增加、插入、替换行&#xff08;a、i、c&#xff09;(1&#xff0…

【C++】类和对象()

&#x1f601;作者&#xff1a;日出等日落 &#x1f514;专栏&#xff1a;C 当你的希望一个个落空&#xff0c;你也要坚定&#xff0c;要沉着! —— 朗费罗 前言 面向过程和面向对象初步认识 C语言是面向过程的&#xff0c;关注…