RxJS
13.1.1 什么是 RxJS ?
RxJS 是一个用于处理异步编程的 JavaScript 库,目标是使编写异步和基于回调的代码更容易。
13.1.2 为什么要学习 RxJS ?
就像 Angular 深度集成 TypeScript 一样,Angular 也深度集成了 RxJS。
服务、表单、事件、全局状态管理、异步请求 …
13.1.3 快速入门
-
可观察对象 ( Observable ) :类比 Promise 对象,内部可以用于执行异步代码,通过调用内部提供的方法将异步代码执行的结果传递到可观察对象外部。
-
观察者 ( Observer ):类比 then 方法中的回调函数,用于接收可观察对象中传递出来数据。
-
订阅 ( subscribe ):类比 then 方法,通过订阅将可观察对象和观察者连接起来,当可观察对象发出数据时,订阅者可以接收到数据。
import { Observable } from "rxjs"
const observable = new Observable(function (observer) {
setTimeout(function () {
observer.next({
name: "Tom"
})
}, 2000)
})
const observer = {
next: function (value) {
console.log(value)
}
}
observable.subscribe(observer)
13.2 可观察对象
13.2.1 Observable
-
在 Observable 对象内部可以多次调用 next 方法向外发送数据。
const observable = new Observable(function (observer) { let index = 0 setInterval(function () { observer.next(index++) }, 1000) }) const observer = { next: function (value) { console.log(value) } } observable.subscribe(observer)
-
当所有数据发送完成以后,可以调用 complete 方法终止数据发送。
const observable = new Observable(function (observer) { let index = 0 let timer = setInterval(function () { observer.next(index++) if (index === 3) { observer.complete() clearInterval(timer) } }, 1000) }) const observer = { next: function (value) { console.log(value) }, complete: function () { console.log("数据发送完成") } } observable.subscribe(observer)
-
当 Observable 内部逻辑发生错误时,可以调用 error 方法将失败信息发送给订阅者,Observable 终止。
import { Observable } from "rxjs" const observable = new Observable(function (observer) { let index = 0 let timer = setInterval(function () { observer.next(index++) if (index === 3) { observer.error("发生错误") clearInterval(timer) } }, 1000) }) const observer = { next: function (value) { console.log(value) }, error: function (error) { console.log(error) } } observable.subscribe(observer)
- 可观察对象是惰性的,只有被订阅后才会执行
const observable = new Observable(function () { console.log("Hello RxJS") }) // observable.subscribe()
-
可观察对象可以有 n 多订阅者,每次被订阅时都会得到执行
const observable = new Observable(function () { console.log("Hello RxJS") }) observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe() observable.subscribe()
-
取消订阅
import { interval } from "rxjs" const obs = interval(1000); const subscription = obs.subscribe(console.log); setTimeout(function () { subscription.unsubscribe() }, 2000)
13.2.2 Subject
-
用于创建空的可观察对象,在订阅后不会立即执行,next 方法可以在可观察对象外部调用
import { Subject } from "rxjs" const demoSubject = new Subject() demoSubject.subscribe({next: function (value) {console.log(value)}}) demoSubject.subscribe({next: function (value) {console.log(value)}}) setTimeout(function () { demoSubject.next("hahaha") }, 3000)
13.2.3 BehaviorSubject
拥有 Subject 全部功能,但是在创建 Obervable 对象时可以传入默认值,观察者订阅后可以直接拿到默认值。
import { BehaviorSubject } from "rxjs"
const demoBehavior = new BehaviorSubject("默认值")
demoBehavior.subscribe({next: function (value) {console.log(value)}})
demoBehavior.next("Hello")
13.2.3 ReplaySubject
功能类似 Subject,但有新订阅者时两者处理方式不同,Subject 不会广播历史结果,而 ReplaySubject 会广播所有历史结果。
import { ReplaySubject } from "rxjs"
const rSubject = new ReplaySubject()
rSubject.subscribe(value => {
console.log(value)
})
rSubject.next("Hello 1")
rSubject.next("Hello 2")
setTimeout(function () {
rSubject.subscribe({next: function (value) {console.log(value)}})
}, 3000)
13.3 辅助方法
13.3.1 range
range(start, length),调用方法后返回 observable 对象,被订阅后会发出指定范围的数值。
import { range } from "rxjs"
range(0, 5).subscribe(n => console.log(n))
// 0
// 1
// 2
// 3
// 4
方法内部并不是一次发出 length 个数值,而是发送了 length 次,每次发送一个数值,就是说内部调用了 length 次 next 方法。
13.3.2 of
将参数列表作为数据流返回。
of("a", "b", [], {}, true, 20).subscribe(v => console.log(v))
13.3.3 from
将 Array,Promise, Iterator 转换为 observable 对象。
from(["a", "b", "c"]).subscribe(v => console.log(v))
// a
// b
// c
import { from } from "rxjs"
function p() {
return new Promise(function (resolve) {
resolve([100, 200])
})
}
from(p()).subscribe(v => console.log(v))
// [100, 200]
13.3.4 interval、timer
Interval:每隔一段时间发出一个数值,数值递增
import { interval } from "rxjs"
interval(1000).subscribe(n => console.log(n))
**timer **:间隔时间过去以后发出数值,行为终止,或间隔时间发出数值后,继续按第二个参数的时间间隔继续发出值
import { timer } from "rxjs"
timer(2000).subscribe(n => console.log(n))
timer(0, 1000).subscribe(n => console.log(n))
13.3.5 Concat
合并数据流,先让第一个数据流发出值,结束后再让第二个数据流发出值,进行整体合并。
import { concat, range } from "rxjs"
concat(range(1, 5), range(6, 5)).subscribe(console.log)