概览
自 从 Swift 5.5 推出新的 async/await 并发模型以来,异步队列(AsyncSequence)就成为其中不可或缺的重要一员。
不同于普通的序列,异步序列有着特殊的“惰性”和并发性,若序列中的元素还未准备好,系统在耐心等待的同时,还将宝贵的线程资源供其它任务去使用,极大的提高了系统整体性能。
在本篇博文中,您将学到以下知识
- 概览
- 什么是异步序列?
- 创建自定义异步序列
- 另一种创建异步序列的方式:AsyncStream
- 取消异步序列的处理
- 总结
什么是异步序列?
异步序列(AsyncSequence)严格的说是一个协议,它为遵守者提供异步的、序列的、迭代的序列元素访问。
表面看起来它是序列,实际上它内部元素是异步产生的,这意味着当子元素暂不可用时使用者将会等待直到它们可用为止:
诸多系统框架都对异步序列做了相应扩展,比如 Foundation 的 URL、 Combine 的发布器等等:
// Foundation
let url = URL(string: "https://kinds.blog.csdn.net/article/details/132787577")!
Task {
do {
for try await line in url.lines {
print(line)
}
}catch{
print("ERR: \(error.localizedDescription)")
}
}
// Combine
let p = PassthroughSubject<Int,Never>()
for await val in p.values {
print(val)
}
如上代码所示,URL#lines 和 Publisher#values 属性都是异步序列。
除了系统已为我们考虑的以外,我们自己同样可以非常方便的创建自定义异步序列。
创建自定义异步序列
一般来说,要创建自定义异步序列我们只需遵守 AsyncSequence 和 AsyncIteratorProtocol 协议即可:
下面我们就来创建一个“超级英雄们(Super Heros)”的异步序列吧:
struct SuperHeros: AsyncSequence, AsyncIteratorProtocol {
private let heros = ["超人", "钢铁侠", "孙悟空", "元始天尊", "菩提老祖"]
typealias Element = String
var index = 0
mutating func next() async throws -> Element? {
defer { index += 1}
try? await Task.sleep(for: .seconds(1.0))
if index >= heros.count {
return nil
}else{
return heros[index]
}
}
func makeAsyncIterator() -> SuperHeros {
self
}
}
Task {
let heros = SuperHeros()
for try await hero in heros {
print("出场英雄:\(hero)")
}
}
以上异步序列会每隔 1 秒“产出”一名超级英雄:
如上代码所示,如果下一个超级英雄还未就绪,系统会在等待同时去执行其它合适的任务,不会有任何资源上的浪费。
另一种创建异步序列的方式:AsyncStream
其实,除了直接遵守 AsyncSequence 协议以外,我们还有另外一种选择:AsyncStream!
不像 AsyncSequence 和 AsyncIteratorProtocol 协议 ,AsyncStream 是彻头彻尾的结构(实体):
它提供两种构造器,分别供正常和异步序列产出(Spawning)情境使用:
public init(_ elementType: Element.Type = Element.self, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded, _ build: (AsyncStream<Element>.Continuation) -> Void)
public init(unfolding produce: @escaping () async -> Element?, onCancel: (@Sendable () -> Void)? = nil)
下面为此举两个 官方提供的代码示例:
let stream_0 = AsyncStream<Int>(Int.self,
bufferingPolicy: .bufferingNewest(5)) { continuation in
Task.detached {
for _ in 0..<100 {
await Task.sleep(1 * 1_000_000_000)
continuation.yield(Int.random(in: 1...10))
}
continuation.finish()
}
}
let stream_1 = AsyncStream<Int> {
await Task.sleep(1 * 1_000_000_000)
return Int.random(in: 1...10)
}
更多关于异步序列的知识,请小伙伴们移步如下链接观赏:
- Swift异步序列构造器AsyncStream内部定时器(Timer)无法被触发的解决
- Swift async/await 并发中如何将任务组(TaskGroup)转换为异步序列(AsyncSequence)
取消异步序列的处理
我们知道 新的 async/await 并发模型主打一个“结构化”,之所以称为“结构化”一个重要原因就是并发中所有任务都共同组成一个层级继承体系,当父任务出错或被取消时,所有子任务都会收到取消通知,异步序列同样也不例外。
就拿下面倒计时异步序列来说吧,它能感应父任务取消事件的原因是由于其中调用了 Task.sleep() 方法( sleep() 方法内部会对取消做出响应):
let countdown = AsyncStream<String> { continuation in
Task {
for i in (0...3).reversed() {
try await Task.sleep(until: .now + .seconds(1.0), clock: .suspending)
guard i > 0 else {
continuation.yield(with: .success("🎉 " + "see you!!!"))
return
}
continuation.yield("\(i) ...")
}
}
}
Task {
for await count in countdown {
print("current is \(count)")
}
}
正常情况下,我们应该在异步序列计算昂贵元素之前显式检查 Cancel 状态:
let stream_1 = AsyncStream<Int> {
// 假设 spawn() 是一个“昂贵”方法
func spawn() -> Int {
Int.random(in: 1...10)
}
// 或者使用 Task.checkCancellation() 处理异常
if Task.isCancelled {
return nil
}
return spawn()
}
在某些情况下,我们希望用自己的模型(Model)去关联退出状态,这时我们可以利用 withTaskCancellationHandler() 方法为异步序列保驾护航:
public func next() async -> Order? {
return await withTaskCancellationHandler {
let result = await kitchen.generateOrder()
// 使用自定义模型中的状态来判断是否取消
guard state.isRunning else {
return nil
}
return result
} onCancel: {
// 在父任务取消时设置取消状态!
state.cancel()
}
}
注意,当父任务被取消时上面 onCancel() 闭包中的代码会立即执行,很可能和 withTaskCancellationHandler() 方法主体代码同步进行。
现在,在一些 Task 内置取消状态不适合或不及时的场合下,我们可以在异步序列中使用 withTaskCancellationHandler() 的 onCancel() 子句来更有效率的完成退出操作,棒棒哒!💯。
总结
在本篇博文中,我们首先简单介绍了什么是异步序列,接着学习了几种创建自定义异步序列的方法,最后我们讨论了如何优雅的取消异步序列的迭代。
感谢观赏,再会!😎