rxjs pipeable operators(上)
A Pipeable Operator is a function that takes an Observable as its input and returns another Observable. It is a pure operation: the previous Observable stays unmodified.
一个 Pipeable Operator 是一个接受一个 Observable 作为输入,并且返回另一个 Observable。这是一个纯函数,作为 输入 的 Observable 不会有任何的变化。
下部分会主要讲 flattening operators。
filter
filter 也和 JS 中的 filter 用法很相似,同样是接受一组 Observables,然后将不符合的数据过滤掉,将剩余的 observables pipe 到下游的 Observables 中去。
⚠️:返回的不一定是一个 Observable,如下面的案例,实际上会返回 2 个 Observables,filter 或是任何的 pipeable 会返回的是所有满足 predicates(断言)的 observables。
import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';
const product1 = {
id: 1,
title: 'iPhone 9',
description: 'An apple mobile which is nothing like apple',
price: 549,
discountPercentage: 12.96,
rating: 4.69,
stock: 94,
brand: 'Apple',
category: 'smartphones',
thumbnail: 'https://i.dummyjson.com/data/products/1/thumbnail.jpg',
images: [
'https://i.dummyjson.com/data/products/1/1.jpg',
'https://i.dummyjson.com/data/products/1/2.jpg',
'https://i.dummyjson.com/data/products/1/3.jpg',
'https://i.dummyjson.com/data/products/1/4.jpg',
'https://i.dummyjson.com/data/products/1/thumbnail.jpg',
],
};
const product2 = {
id: 3,
title: 'Samsung Universe 9',
description: "Samsung's new variant which goes beyond Galaxy to the Universe",
price: 1249,
discountPercentage: 15.46,
rating: 4.09,
stock: 36,
brand: 'Samsung',
category: 'smartphones',
thumbnail: 'https://i.dummyjson.com/data/products/3/thumbnail.jpg',
images: ['https://i.dummyjson.com/data/products/3/1.jpg'],
};
const product3 = {
id: 9,
title: 'Infinix INBOOK',
description:
'Infinix Inbook X1 Ci3 10th 8GB 256GB 14 Win10 Grey – 1 Year Warranty',
price: 1099,
discountPercentage: 11.83,
rating: 4.54,
stock: 96,
brand: 'Infinix',
category: 'laptops',
thumbnail: 'https://i.dummyjson.com/data/products/9/thumbnail.jpg',
images: [
'https://i.dummyjson.com/data/products/9/1.jpg',
'https://i.dummyjson.com/data/products/9/2.png',
'https://i.dummyjson.com/data/products/9/3.png',
'https://i.dummyjson.com/data/products/9/4.jpg',
'https://i.dummyjson.com/data/products/9/thumbnail.jpg',
],
};
const products$ = new Observable((subscriber) => {
setTimeout(() => subscriber.next(product1), 1000);
setTimeout(() => subscriber.next(product2), 2000);
setTimeout(() => subscriber.next(product3), 3000);
});
const smartphone$ = products$.pipe(
filter((item: any) => item.category === 'smartphones')
);
smartphone$.subscribe((item) => console.log(item));
// also works the same way
// products$
// .pipe(filter((item: any) => item.category === 'smartphones'))
// .subscribe((item) => console.log(item));
从这个案例已经可以看到使用 rxjs 能够减少 callback 的使用,每一个 observable 都可以单独拉出来做一个变量,但是这个变量和 async/await 不太一样的一点就在于,使用这个 Observable 的时候不需要等 Observable 实现,也就是说,可以省时。
map
这个也很像 array map,下面是用对之前 forkjoin 的改写,可以看到使用 map 能够更好的分离逻辑:
import { forkJoin, interval } from 'rxjs';
// Mike is from New Delhi and likes to eat pasta.
import { ajax } from 'rxjs/ajax';
import { map } from 'rxjs/operators';
// 这里就可以单独的处理逻辑
const randomName$ = ajax<any>(
'https://random-data-api.com/api/name/random_name'
).pipe(map((ajaxRes: any) => ajaxRes.response.first_name));
const randomNation$ = ajax<any>(
'https://random-data-api.com/api/nation/random_nation'
).pipe(map((ajaxRes: any) => ajaxRes.response.capital));
const randomFood$ = ajax<any>(
'https://random-data-api.com/api/food/random_food'
).pipe(map((ajaxRes: any) => ajaxRes.response.dish));
// forkJoin([randomName$, randomNation$, randomFood$]).subscribe(
// ([nameAjax, nationAjax, foodAjax]) => console.log(`${nameAjax.response.first_name} is from ${nationAjax.response.capital} and likes to eat ${foodAjax.response.dish}.`)
// );
forkJoin([randomName$, randomNation$, randomFood$]).subscribe(
([firstname, capital, dish]) => {
console.log(`${firstname} is from ${capital} and likes to eat ${dish}`);
}
);
tap
tap 不会修改 Observable 的结果,也不会修改任何的 Notification(Notification 指的就是 next、error 或是 complete),但是能够获取 Notification 的值,因此非常适合用来 debug。
debounceTime
就是防抖的实现,JS 原生防抖部分可以参考: 防抖和节流的实现。
import { fromEvent } from 'rxjs';
import { map, debounceTime } from 'rxjs/operators';
const sliderInput = document.querySelector('input#slider');
fromEvent(sliderInput, 'input')
.pipe(
debounceTime(2000),
map((event: any) => event.target.value)
)
.subscribe((val) => console.log(val));
这样在 debounce 的期间就不会触发任何一个结果(这里是 slider,没有使用 debounce 之前每滑动 slider 都会输出一个结果),只有出了 debounce 的时间才会输出结果。
catchError
catchError 是对异常提供 fallback 操作,这样可以让整个 Observable 继续执行下去,从而完成 complete。使用 catchError 后,pipe 或是 subscribe 这个 Observable 的下游不会进入 error 阶段。
EMPTY 是 rxjs 内置的一个函数,使用了 rxjs 之后就不会进行任何的操作。