rxjs pipeable operators(下)
这一篇主要就是讲 flattening operators,像其他的 pipeable 一样, flattening operators 内部会 subscribe 每一个传进来的 Observable,并且将其返回一个新的 Observable。不过它可以将 higher order observable 扁平化,就像 lodash 的 flattening 一样(lodash 的 flattening 可以将嵌套的数组转化为扁平数组)。
flattening operators 包含 concatMap,switchMap,mergeMap 等,具体的区别后面再说。
concatMap 的基础使用:
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
)
)
.subscribe((value) => console.log(value));
这样的处理就不需要一些回调函数在 concatMap 内部中处理结果,而是将多个/多重 Observable 传到下游,因此可以直接在最后的 subscribe 中进行处理。
也如同之前提到的一样,如果这时候,pipe 中的 Observable 报错,那么就会终止操作。
错误处理 1
这是第一个处理错误的方法,使用之前讲过的 catchError 进行处理。
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`)
),
catchError((err) => of(err))
)
.subscribe({
next: (value) => console.log(value),
complete: () => console.log('completed'),
});
使用这种方法操作 Observable,一旦遇到错误,那么该 stream 就会进入完成状态,从而终结 subscription。
错误处理 2
稍微调整一下 catchError 的过程就会让 concatMap 传出成功而非报错的信号,从而继续处理下一个流。
import { fromEvent, of } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { catchError, concatMap, map } from 'rxjs/operators';
const endpointInput: HTMLInputElement =
document.querySelector('input#endpoint');
const fetchButton = document.querySelector('button#fetch');
fromEvent(fetchButton, 'click')
.pipe(
map((event) => endpointInput.value),
concatMap((value) =>
ajax(`https://random-data-api.com/api/${value}/random_${value}`).pipe(
catchError((err) => of(err))
)
)
)
.subscribe((value) => console.log(value));
flattening operators 的差异
主要就是几个 operator 对于并发的处理:
-
concatmap 会将所有的 Observable 推到 queue 中,等到上游的 Observable 完成之后,再按顺序的继续执行下一个 Observable。
在不知道用什么的时候,可以优先选择 concatMap
-
switchMap 在遇到下游的 Observable 之后,会取消(unsubscribing)上一个 Observable。
如果是 HTTP 请求的话,就会将上一个 HTTP 请求取消。对于 GET 请求来说这不会有什么问题,不过如果是 POST 的话,有可能在取消该请求之前,该请求就已经到了后端,并且做了相应的变化。
-
mergemap 会在每次接受到一个 observable 的时候就发送一个请求
相较于其他两个 flattening operators,mergeMap 是最容易出现问题的一个,所以用的时候需要更加的谨慎