背景
最近项目业务,所有模块已经支持Swift混编开发,正在逐步使用Swift 方式进行开发新业务,以及逐步替换老业务方式进行发展,所以使用一些较为成熟的Swift 的三方库,成为必要性,经过调研发现RxSwift 在使用的情况上,较为成熟,且方便实用;以下介绍一些RxSwift 使用的一些方式和大家一起学习讨论
Swift为值类型,在传值与方法回调上有影响,RxSwift一定程度上弥补Swift的灵活性
- RxSwift使得代码复用性较强,减少代码量
- RxSwift因为声明都是不可变更,增加代码可读性
- RxSwift使得更易于理解业务代码,抽象异步编程,统一代码风格
- RxSwift使得代码更易于编写集成单元测试,增加代码稳定性
一:简单的使用方式
1.1 信号创建
1.1.1创建一个普通信号
let disposeB = DisposeBag()
//通过指定的方法实现来自定义一个被观察的序列。
//订阅创建
let myOb = Observable<Any>.create { (observ) -> Disposable in
observ.onNext("alan")
observ.onCompleted()
return Disposables.create()
}
//订阅事件
myOb.subscribe { (even) in
print("subscribe" + "\(even)")
}.disposed(by: disposeB)//销毁
1.1.2 创建信号的其他方式
class ViewController: UIViewController {
let disposeB = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
// Do any additional setup after loading the view, typically from a nib.
//通过指定的方法实现来自定义一个被观察的序列。
//订阅创建
let myOb = Observable<Any>.create { (observ) -> Disposable in
observ.onNext("alan")
zipSigal observ.onCompleted()
return Disposables.create()
}
//订阅事件
myOb.subscribe { (even) in
print("subscribe" + "\(even)")
}.disposed(by: disposeB)//销毁
//各种观察者序列产生方式
//该方法通过传入一个默认值来初始化。
Observable.just("just")
.subscribe { (event) in
print(event)
}
.disposed(by: disposeB)
//该方法可以接受可变数量的参数(必需要是同类型的)
Observable.of("o","f","of").subscribe { (event) in
print(event)
}.disposed(by: disposeB)
//该方法需要一个数组参数。
Observable.from(["f","r","o","m"]).subscribe { (event) in
print("from" + "\(event)")
}.disposed(by: disposeB)
//该方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
Observable<Int>.never().subscribe { (event) in
print(event)
}.disposed(by: disposeB)
// // 该方法创建一个空内容的 Observable 序列。 //会打印complete
Observable<Int>.empty().subscribe { (event) in
print("empty" ,event)
}.disposed(by: disposeB)
//该方法创建一个不做任何操作,而是直接发送一个错误的 Observable 序列。
let myError = MyError.A
print(myError.errorType)
Observable<Int>.error(myError).subscribe { (event) in
print(event.error)
}.disposed(by: disposeB)
//该方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的Observable序列。
Observable.range(start: 1, count: 6).subscribe { (event) in
print(event)
}.disposed(by: disposeB)
//该方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)。慎重使用
// Observable.repeatElement("SPAlan").subscribe { (event) in
// print(event)
// }.disposed(by: disposeB)
//该方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
//第一个参数:初始化的数值 第二个 条件 第三也是一个条件 0 + 2 <= 10 依次循环下去,iterate:重复执行 ,执行结果为 0,2,4,6,8,10
Observable.generate(initialState: 0, condition: {$0<=10}, iterate: {$0+2}).subscribe { (event) in
print(event)
}.disposed(by: disposeB)
//上面和下面的效果一样
Observable.of(0,2,4,6,8,10).subscribe { (event) in
print(event)
}.disposed(by: disposeB)
//该个方法相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方。
var isOdd = true
let factory: Observable<Int> = Observable.deferred { () -> Observable<Int> in
isOdd = !isOdd
if isOdd{
return Observable.of(0,2,4,6,8)
}else{
return Observable.of(1,3,5,7,9)
}
}
//这里会调用上面的工厂
factory.subscribe { (event) in
print("\(isOdd)",event)
}.disposed(by: disposeB)
//这里会再次调用工厂
factory.subscribe { (event) in
print("\(isOdd)",event)
}.disposed(by: disposeB)
//这个方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。
// Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { (event) in
// print(event)
// }.disposed(by: disposeB)
//这个方法有两种用法,一种是创建的 Observable序列在经过设定的一段时间后,产生唯一的一个元素。,这个只调用一次
Observable<Int>.timer(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe{(event) in
print("123",event)
}.disposed(by: disposeB)
//另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素。:经过5秒后每个1秒创建一个元素
// Observable<Int>.timer(DispatchTimeInterval.seconds(5), period: DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance).subscribe { (event) in
// print(event)
// }.disposed(by: disposeB)
}
enum MyError:Error {
case A
case B
var errorType:String {
switch self {
case .A:
return "i am error A"
case .B:
return "BBBB"
}
}
}
override func didReceiveMemoryWarning() {
super.didReceiveMemoryWarning()
// Dispose of any resources that can be recreated.
}
}
1.2 iOS中的一些用法
1.2.1:button点击事件
//MARK: - RxSwift应用-button响应
func setupButton() {
// 业务逻辑 和 功能逻辑
// 设计
self.button.rx.tap
.subscribe(onNext: { () in
print("按钮点击了")
})
.disposed(by: disposeBag)
}
1.2.2:textfiled文本响应
//MARK: - RxSwift应用-textfiled
func setupTextFiled() {
self.textFiled.rx.text.orEmpty
.subscribe(onNext: { (text) in
print(text)
})
.disposed(by: disposeBag)
// 简单简单 更简单-RxSwift 面向开发者
self.textFiled.rx.text
.bind(to: self.button.rx.title())
.disposed(by: disposeBag)
}
1.2.3:scrollView效果
//MARK: - RxSwift应用-scrollView
func setupScrollerView() {
scrollView.contentSize = CGSize(width: 300, height: 1000)
scrollView.rx.contentOffset
.subscribe(onNext: { [weak self](content) in
print(content);
print(self?.scrollView)
})
.disposed(by: disposeBag)
}
1.2.4:KVO
//MARK: - RxSwift应用-KVO
func setupKVO() {
// 系统KVO 还是比较麻烦的
// person.addObserver(self, forKeyPath: "name", options: .new, context: nil)
person.rx.observeWeakly(String.self, "name").subscribe(onNext: { (change) in
print(change ?? "helloword")
}).disposed(by: disposeBag)
}
1.2.5:通知
//MARK: - 通知
func setupNotification(){
NotificationCenter.default.rx
.notification(UIResponder.keyboardWillShowNotification)
.subscribe { (event) in
print(event)
}.disposed(by: disposeBag)
}
1.2.6:手势
//MARK: - 手势
func setupGestureRecognizer(){
let tap = UITapGestureRecognizer()
self.label.addGestureRecognizer(tap)
self.label.isUserInteractionEnabled = true
tap.rx.event.subscribe(onNext: { (tap) in
print(tap.view)
})
.disposed(by: disposeBag)
}
1.2.7:网络请求
//MARK: - RxSwift应用-网络请求
func setupNextwork() {
let url = URL(string: "https://www.baidu.com")
// 传统方式
URLSession.shared.dataTask(with: url!) { (data, response, error) in
print("dataTask.response->" + "\(response)")
print("dataTask.response->" + "\(data)")
}.resume()
//RxSwift 方式
URLSession.shared.rx.response(request: URLRequest(url: url!))
.subscribe(onNext: { (response, data) in
print("rx.response.response ==== \(response)")
print("rx.response.data ===== \(data)")
}, onError: { (error) in
print("error ===== \(error)")
}).disposed(by: disposeBag)
}
1.2.8:timer定时器
func setupTimer() {
timer = Observable<Int>.interval(DispatchTimeInterval.seconds(1), scheduler: MainScheduler.instance)
timer.subscribe(onNext: { (num) in
print("hello word \(num)")
}).disposed(by: disposeBag)
}
二:高阶用法
2.1:组合操作符
2.1.1:startWith
print("*****startWith*****")
Observable.of("1", "2", "3", "4")
.startWith("A")
.startWith("B")
.startWith("C", "a", "b")
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//效果: CabBA1234
2.1.2:merge
print("*****merge*****")
let subject1 = PublishSubject<String>()
let subject2 = PublishSubject<String>()
// merge subject1和subject2
Observable.of(subject1, subject2)
.merge()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
subject1.onNext("C")
subject1.onNext("o")
subject2.onNext("o")
subject2.onNext("o")
subject1.onNext("c")
subject2.onNext("i")
2.1.3:zip
- 将多达8个原可观测序列组合成一个新的可观测序列,并将从组合的可观测序列中发射出对应索引处每个原可观测序列的元素
print("*****zip*****")
let stringSubject = PublishSubject<String>()
let intSubject = PublishSubject<Int>()
Observable.zip(stringSubject, intSubject) { stringElement, intElement in
"\(stringElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSubject.onNext("C")
stringSubject.onNext("o") // 到这里存储了 C o 但是不会响应除非;另一个响应
intSubject.onNext(1) // 勾出一个
intSubject.onNext(2) // 勾出另一个
stringSubject.onNext("i") // 存一个
intSubject.onNext(3) // 勾出一个
// 说白了: 只有两个序列同时有值的时候才会响应,否则存值
2.1.4:combineLatest
- 将原可观测序列组合成一个新的观测序列,并将开始发出联合观测序列的每个元素最新元素可观测序列一旦所有排放原序列至少有一个元素,并且当原可观测序列发出的任何一个新元素
print("*****combineLatest*****")
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()
Observable.combineLatest(stringSub, intSub) { strElement, intElement in
"\(strElement) \(intElement)"
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
stringSub.onNext("S") // 存一个 S
stringSub.onNext("P") // 存了一个覆盖 - 和zip不一样
intSub.onNext(1) // 发现strOB也有G 响应 G 1
intSub.onNext(2) // 覆盖1 -> 2 发现strOB 有值G 响应 G 2
stringSub.onNext("alan") // 覆盖G -> alan 发现intOB 有值2 响应 alan 2
// combineLatest 比较zip 会覆盖
// 应用非常频繁: 比如账户和密码同时满足->才能登陆. 不关系账户密码怎么变化的只要查看最后有值就可以 loginEnable
输出结果:
*****combineLatest*****
P 1
P 2
alan 2
2.1.5:switchLatest
- 将可观察序列发出的元素转换为可观察序列,并从最近的内部可观察序列发出元素
print("*****switchLatest*****")
let switchLatestSub1 = BehaviorSubject(value: "L")
let switchLatestSub2 = BehaviorSubject(value: "1")
let switchLatestSub = BehaviorSubject(value: switchLatestSub1)// 选择了 switchLatestSub1 就不会监听 switchLatestSub2
switchLatestSub.asObservable()
.switchLatest()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
switchLatestSub1.onNext("G")
switchLatestSub1.onNext("_")
switchLatestSub2.onNext("2")
switchLatestSub2.onNext("3") // 2-3都会不会监听,但是默认保存由 2覆盖1 3覆盖2
switchLatestSub.onNext(switchLatestSub2) // 切换到 switchLatestSub2
switchLatestSub1.onNext("*")
switchLatestSub1.onNext("alan") // 原理同上面 下面如果再次切换到 switchLatestSub1会打印出 alan
switchLatestSub2.onNext("4")
输出结果为:
*****switchLatest*****
L
G
_
3
4
2.2:映射操作符
2.2.1:map
- 转换闭包应用于可观察序列发出的元素,并返回转换后的元素的新可观察序列。
print("*****map*****")
let ob = Observable.of(1,2,3,4)
ob.map { (number) -> Int in
return number+2
}
.subscribe{
print("\($0)")
}
.disposed(by: disposeBag)
2.2.2:flatMap and flatMapLatest
- 将可观测序列发射的元素转换为可观测序列,并将两个可观测序列的发射合并为一个可观测序列。
- 这也很有用,例如,当你有一个可观察的序列,它本身发出可观察的序列,你想能够对任何一个可观察序列的新发射做出反应(序列中序列:比如网络序列中还有模型序列)
- flatMap和flatMapLatest的区别是,flatMapLatest只会从最近的内部可观测序列发射元素
struct SPPlayer {
var score: BehaviorRelay<Int>
}
func flatMapLatest(){
print("*****flatMapLatest*****")
var boy = SPPlayer(score: BehaviorRelay(value: 100))
var girl = SPPlayer(score: BehaviorRelay(value: 90))
var player = BehaviorSubject(value: boy)
player.asObservable()
.flatMapLatest{$0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
boy.score.accept(75)
player.onNext(girl)
boy.score.accept(50)
boy.score.accept(40)// 如果切换到 flatMapLatest 就不会打印
girl.score.accept(10)
girl.score.accept(0)
// 输出结果为:100,75,90,10,0
}
func flatMap(){
print("*****flatMap*****")
var boy = SPPlayer(score: BehaviorRelay(value: 100))
var girl = SPPlayer(score: BehaviorRelay(value: 90))
var player = BehaviorSubject(value: boy)
player.asObservable()
.flatMap {$0.score.asObservable() } // 本身score就是序列 模型就是序列中的序列
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
boy.score.accept(75)
player.onNext(girl)
boy.score.accept(50)
boy.score.accept(40)// 如果切换到 flatMapLatest 就不会打印
girl.score.accept(10)
girl.score.accept(0)
// 输出结果为:100,75,90,50,40,10,0
}
2.2.3:scan
- 从初始就带有一个默认值开始,然后对可观察序列发出的每个元素应用累加器闭包,并以单个元素可观察序列的形式返回每个中间结果
print("*****scan*****")
Observable.of(10, 100, 1000)
.scan(2) { aggregateValue, newValue in
aggregateValue + newValue // 10 + 2 , 100 + 10 + 2 , 1000 + 100 + 2
}
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 这里主要强调序列值之间的关系
2.3:过滤条件操作符
2.3.1:filter
- 仅从满足指定条件的可观察序列中发出那些元素
print("*****filter*****")
Observable.of(1,2,3,4,5,6,7,8,9,0)
.filter { $0 % 2 == 0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.3.2:distinctUntilChanged
- 抑制可观察序列发出的顺序重复元素
print("*****distinctUntilChanged*****")
Observable.of("1", "2", "2", "2", "3", "3", "4")
.distinctUntilChanged()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.3.3:elementAt
- 仅在可观察序列发出的所有元素的指定索引处发出元素
print("*****elementAt*****")
Observable.of("A", "l", "a", "n", "z")
.element(at: 3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.3.4:single
- 只发出可观察序列发出的第一个元素(或满足条件的第一个元素)。如果可观察序列发出多个元素,将抛出一个错误。
print("*****single*****")
Observable.of("alan", "zhi")
.single()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
Observable.of("zhai", "zhuang")
.single { $0 == "zhuang" }
.subscribe (onNext:{ print($0,"---single---") })
.disposed(by: disposeBag)
/**
输出结果:
alan
Unhandled error happened: Sequence contains more than one element.
zhuang ---single---
**/
2.3.5:take
- 只从一个可观察序列的开始发出指定数量的元素。 上面signal只有一个序列 在实际开发会受到局限 这里引出 take 想几个就几个
print("*****take*****")
Observable.of("alan", "zhuang","xing", "zhai")
.take(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.3.6:takeLast
- 仅从可观察序列的末尾发出指定数量的元素
print("*****takeLast*****")
Observable.of("alan", "zhuang","xing", "zhai")
.takeLast(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//输出结果:zhuang xing zhai
2.3.7:takeWhile
只要指定条件的值为true,就从可观察序列的开始发出元素
print("*****takeWhile*****")
Observable.of(1, 2, 3, 4, 5, 6)
.take(while: { int in
int < 3
})
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出结果 1,2
2.3.8:takeUntil
- 从原序列可观察序列发出元素,直到参考可观察序列发出元素
- 这个要重点,应用非常频繁 比如当前页面销毁了,就不能获取值了(cell重用运用)
print("*****takeUntil*****")
let sourceSequence = PublishSubject<String>()
let referenceSequence = PublishSubject<String>()
sourceSequence
.take(until: referenceSequence)
.subscribe (onNext:{ print($0) })
.disposed(by: disposeBag)
sourceSequence.onNext("alan")
sourceSequence.onNext("zhai")
sourceSequence.onNext("xing")
referenceSequence.onNext("zhuang") // 条件一出来,下面就走不了
sourceSequence.onNext("qiang")
sourceSequence.onNext("yanzi")
sourceSequence.onNext("ting")
输出结果为:
*****takeUntil*****
alan
zhai
xing
2.3.9:skip
- 从原序列可观察序列发出元素,直到参考可观察序列发出元素
- 这个要重点,应用非常频繁 不用解释 textfiled 都会有默认序列产生
print("*****skip*****")
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
print("*****skipWhile*****")
Observable.of(1, 2, 3, 4, 5, 6)
.skip(while :{int in
int < 3
})
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.3.10:skipUntil
- 抑制从原序列可观察序列发出元素,直到参考可观察序列发出元素
print("*****skipUntil*****")
let sourceSeq = PublishSubject<String>()
let referenceSeq = PublishSubject<String>()
sourceSeq
.skip(until: referenceSeq)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 没有条件命令 下面走不了
sourceSeq.onNext("alan")
sourceSeq.onNext("zhai")
sourceSeq.onNext("qiang")
referenceSeq.onNext("zhuang") // 条件一出来,下面就可以走了
sourceSeq.onNext("lilei")
sourceSeq.onNext("yanzi")
sourceSeq.onNext("meimei")
输出结果为:
*****skipUntil*****
lilei
yanzi
meimei
2.4:集合控制操作符
2.4.1:toArray
- 将一个可观察序列转换为一个数组,将该数组作为一个新的单元素可观察序列发出,然后终止
print("*****toArray*****")
Observable.range(start: 1, count: 10)
.toArray()
.subscribe(onSuccess: { print($0) })
.disposed(by: disposeBag)
// 输出结果:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
2.4.2:reduce
- 从一个设置的初始化值开始,然后对一个可观察序列发出的所有元素应用累加器闭包,并以单个元素可观察序列的形式返回聚合结果 - 类似scan
print("*****reduce*****")
Observable.of(10, 100, 1000)
.reduce(1, accumulator: +) // 1 + 10 + 100 + 1000 = 1111
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
// 输出结果:1111
2.4.3:concat
- 以顺序方式连接来自一个可观察序列的内部可观察序列的元素,在从下一个序列发出元素之前,等待每个序列成功终止
print("*****concat*****")
let subject1 = BehaviorSubject(value: "zhai")
let subject2 = BehaviorSubject(value: "1")
let subjectsSubject = BehaviorSubject(value: subject1)
subjectsSubject.asObservable()
.concat()
.subscribe (onNext:{ print($0) })
.disposed(by: disposeBag)
subject1.onNext("alan")
subject1.onNext("xing")
subjectsSubject.onNext(subject2)
subject2.onNext("打印不出来")
subject2.onNext("2")
subject1.onCompleted() // 必须要等subject1 完成了才能订阅到! 用来控制顺序 网络数据的异步
subject2.onNext("3")
2.5:从可观察对象的错误通知中恢复的操作符
2.5.1:catchAndReturn
- 从错误事件中恢复,方法是返回一个可观察到的序列,该序列发出单个元素,然后终止
enum SPError:Error {
case A
case B
var errorType:String {
switch self {
case .A:
return "i am error A"
case .B:
return "BBBB"
}
}
}
let spError = SPError.A
print("*****catchAndReturn*****")
let sequenceThatFails = PublishSubject<String>()
sequenceThatFails
.catchAndReturn("alan")
.subscribe (onNext: { print($0) })
.disposed(by: disposeBag)
sequenceThatFails.onNext("qiang")
sequenceThatFails.onNext("lilei") // 正常序列发送成功的
//发送失败的序列,一旦订阅到位 返回我们之前设定的错误的预案
sequenceThatFails.onError(self.spError)
5.2:catch
- 通过切换到提供的恢复可观察序列,从错误事件中恢复
print("*****catch*****")
let sequenceThatFails = PublishSubject<String>()
let recoverySequence = PublishSubject<String>()
sequenceThatFails
.catch {
print("Error:", $0)
return recoverySequence
}
.subscribe { print($0) }
.disposed(by: disposeBag)
sequenceThatFails.onNext("😬")
sequenceThatFails.onNext("😨")
sequenceThatFails.onNext("😡")
sequenceThatFails.onNext("🔴")
sequenceThatFails.onError(self.spError)
recoverySequence.onNext("😊")
5.3:retry
通过无限地重新订阅可观察序列来恢复重复的错误事件
print("*****retry*****")
var count = 1 // 外界变量控制流程
let sequenceRetryErrors = Observable<String>.create { observer in
observer.onNext("alan")
observer.onNext("xing")
observer.onNext("zhai")
if count == 1 {
// 流程进来之后就会过度-这里的条件可以作为出口,失败的次数
observer.onError(self.spError) // 接收到了错误序列,重试序列发生
print("错误序列来了")
count += 1
}
observer.onNext("liLie")
observer.onNext("yanzi")
observer.onNext("ting")
observer.onCompleted()
return Disposables.create()
}
sequenceRetryErrors
.retry()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
/***
输出结果:
*****retry*****
alan
xing
zhai
错误序列来了
alan
xing
zhai
liLie
yanzi
ting
*/
5.4:retry(_:):
- 通过重新订阅可观察到的序列,重复地从错误事件中恢复,直到重试次数达到max未遂计数
print("*****retry(_:)*****")
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("alan")
observer.onNext("xing")
observer.onNext("qiangqiang")
if count < 5 { // 这里设置的错误出口是没有太多意义的额,因为我们设置重试次数
observer.onError(self.spError)
print("错误序列来了")
count += 1
}
observer.onNext("liLei")
observer.onNext("yanzi")
observer.onNext("ting")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.6:Rx流程操作符。
2.6.1:debug
- 打印所有订阅、事件和处理。
print("*****debug*****")
var count = 1
let sequenceThatErrors = Observable<String>.create { observer in
observer.onNext("alan")
observer.onNext("zhai")
observer.onNext("qiang")
if count < 5 {
observer.onError(self.spError)
print("错误序列来了")
count += 1
}
observer.onNext("liLei")
observer.onNext("yanzi")
observer.onNext("zhuang")
observer.onCompleted()
return Disposables.create()
}
sequenceThatErrors
.retry(3)
.debug()
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
2.7:链接操作符
2.7.1:multicast
- 将原可观察序列转换为可连接序列,并通过指定的主题广播发射。
func testMulticastConnectOperators(){
print("*****multicast*****")
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
/**
输出结果为:
*****multicast*****
我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了
**/
_ = netOB.connect()
}
2.7.2:replay
- 将原可观察序列转换为可连接的序列,并将向每个新订阅服务器重放以前排放的缓冲大小
- 首先拥有和publish一样的能力,共享 Observable sequence, 其次使用replay还需要我们传入一个参数(buffer size)来缓存已发送的事件,当有新的订阅者订阅了,会把缓存的事件发送给新的订阅者
print("*****replay*****")
let intSequence = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.replay(5)
_ = intSequence
.subscribe(onNext: { print(Date(),"订阅 1:, Event: \($0)") })
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
_ = intSequence.connect()
}
DispatchQueue.main.asyncAfter(deadline: .now() + 4) {
_ = intSequence
.subscribe(onNext: { print(Date(),"订阅 2:, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 8) {
_ = intSequence
.subscribe(onNext: { print(Date(),"订阅 3:, Event: \($0)") })
}
DispatchQueue.main.asyncAfter(deadline: .now() + 10) {
self.disposeBag = DisposeBag()
}
三:示例
3.1:简单实用RxSwift 做一个计时器功能,控件绑定数据基础用法
计时器源码
文章Demo源码