Flux
Flux 是一个发出0-N个元素组成的异步序列的Publisher,可以被onComplete信号或者onError信号所终止。
Flux.just("Hello", "World").subscribe(System.out::println);
// fromArray(),fromIterable(),fromStream()
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
flux1.firstEmittingWith(flux2); // 谁先有数据就用谁,跟mono的or差不多
// 同步生成元素
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
// 可异步可同步
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
// sink的背压
sink.onRequest(n -> channel.poll(n))
.onCancel(() -> channel.cancel())
.onDispose(() -> channel.close())
Mono
Mono 是一个发出0-1个元素的Publisher,可以被onComplete信号或者onError信号所终止。
Mono.just(T) // 饥渴的。Mono. defer或fromSupplier 等是懒加载的
Mono.empty() / Mono.justOrEmpty(Optional<T>/T)
flux.then(); // 返回一个mono。是下一步的意思,但它只表示执行顺序的下一步,不依赖于上一步的结果。then() 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMap() 和 map() 的参数都是一个 Function,入参是上一步的执行结果。
Mono.never()
fromCallable(),fromCompletionStage(),fromFuture(),fromRunnable()和fromSupplier() 分别从Callable,CompletionStage,CompletableFuture,Runnable和Supplier中创建Mono
ignoreElements(Publisher source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
Mono.defer(()->{…});
mono1.or(mono2); // 谁先emit第一个可用的signal就用谁
// 传统的命令式编程
Object result1 = doStep1(params);
Object result2 = doStep2(result1);
Object result3 = doStep3(result2);
// 对应的反应式编程
Mono.just(params)
.flatMap(v -> doStep1(v))
.flatMap(v -> doStep2(v))
.flatMap(v -> doStep3(v));
逻辑判断
filter
对流中包含的元素进行过滤,只留下满足Predicate指定条件的元素
Flux.range(1, 10).filter(i -> i%2 == 0).subscribe(System.out::println);
any/all/ hasElements/ hasElement/ ofType
Flux.range(1, 6).any(f -> f < 0).subscribe(t -> System.out.println("any逻辑操作:"+t));
Flux.range(1, 6).all(f -> f > 0).subscribe(t -> System.out.println("all逻辑操作:"+t));
take/skip
take系列操作符用来从当前流中提取元素。Skip跳过元素
// 第一行语句输出的是数字 1 到 10;
// 第二行语句输出的是数字 991 到 1000;
// 第三行语句输出的是数字 1 到 9;
// 第四行语句输出的是数字 1 到 10,使得 Predicate 返回 true 的元素也是包含在内的
Flux.range(1, 1000).take(10).subscribe(System.out::println);
Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);
Flux.range(1, 1000).takeWhile(i -> i < 10).subscribe(System.out::println);
Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);
Flux.range(0, 10).skip(5).subscribe(f -> System.out.println("剩下的元素:"+f));
操作符
merge(eagerly,会提前准备好数据,更快,但更消耗内存)
merge和mergeSequential操作符用来把多个流合并成一个Flux序列. merge按照所有流中元素的实际产生序列来合并,而mergeSequential按照所有流被订阅的顺序。
//进行合并的流都是每隔 100 毫秒产生一个元素,不过第二个流中的每个元素的产生都比第一个流要延迟 50 毫秒。在使用 merge 的结果流中,来自两个流的元素是按照时间顺序交织在一起;而使用 mergeSequential 的结果流则是首先产生第一个流中的全部元素,再产生第二个流中的全部元素
Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5)).toStream().forEach(System.out::println);
concatMap (lazy;按顺序串行生成元素不能并行,内存消耗更小,默认Sequential排好序的)
flatMap(eagerly;会提前准备好数据,更快,但更消耗内存,flat能并行生成元素)
flatMap和flatMapSequential操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。Sequential结果会跟原始元素顺序一致
startWith/ concatWith
在开头添加:Flux#startWith(T...)
在最后添加:Flux#concatWith(T...)
handle
类似map 与 filter 的组合
Flux<String> alphabet = Flux.just(-1, 30, 13, 9, 20)
.handle((i, sink) -> {
String letter = alphabet(i);
if (letter != null)
sink.next(letter);
});
alphabet.subscribe(System.out::println);
zip(能配对多个)zip(a,b,c..)
zipWith(只配对两个)a.zipWith(b)
zipWith操作符把当前流中的元素与另一个流中的元素按照一对一的方式进行合并。在合并时可以不做任何处理,由此得到的是一个元素类型为Tuple2的流;也可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值
Flux.just("a", "b").zipWith(Flux.just("c", "d")).subscribe(System.out::println);
Flux.just("a", "b").zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2)).subscribe(System.out::println);
combineLatest
把所有流中的最新产生的元素合并成一个新的元素,作为返回结果流中的元素
// 流中最新产生的元素会被收集到一个数组中,通过 Arrays.toString 方法来把数组转换成 String
Flux.combineLatest(
Arrays::toString,
Flux.intervalMillis(100).take(5),
Flux.intervalMillis(50, 100).take(5)
).toStream().forEach(System.out::println);
reduce
对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
Flux.range(1,100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1,100).reduceWith(()->100,(x,y)->x + y).subscribe(System.out::println);
distinct
Flux.just(1, 2, 2, 3, 3, 4, 5, 5).distinct().subscribe(f -> System.out.println("去重后的元素:"+f));
transform/ compose
可以将一段操作链封装为一个函数式function。相当于别名和宏.
Function<Flux<String>, Flux<String>> filterAndMap =
f -> f.filter(color -> !color.equals("orange"))
.map(String::toUpperCase);
Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
.doOnNext(System.out::println)
.transform(filterAndMap)
.subscribe(d -> System.out.println("Subscriber to Transformed MapAndFilter: "+d));
switchIfEmpty在源为空的情况下,替换成另一个流源
defaultIfEmpty当调用者执行完且没有任何数据返回时,返回默认值
then
…并且我希望用 Mono 来表示序列已经结束:then
…并且我想在序列结束后等待另一个任务完成:thenEmpty
…并且我想在序列结束之后返回一个 Mono:Mono#then(mono)
…并且我想在序列结束之后返回一个值:Mono#thenReturn(T)
…并且我想在序列结束之后返回一个 Flux:thenMany
and
前后两个流都等待结束,并返回一个空mono
when
当参数的流都完成时返回空mono
区间/集合
buffer/bufferTimeout
public final Flux<List<T>> buffer(int maxSize)
// 第一行语句输出的是 5 个包含 20 个元素的数组;
// 第二行语句输出的是 2 个包含了 10 个元素的数组;
// 第三行语句输出的是 5 个包含 2 个元素的数组。每当遇到一个偶数就会结束当前的收集;
// 第四行语句输出的是 5 个包含 1 个元素的数组,数组里面包含的只有偶数。
Flux.range(1, 100).buffer(20).subscribe(System.out::println);
Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);
Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);
Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);
window
返回值Flux<Flux>
// 两行语句的输出结果分别是 5 个和 2 个字符
Flux.range(1, 100).window(20).subscribe(System.out::println);
Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);
collectList/ collectMap
用于将含有多个元素的Flux转换为含有一个元素列表的Mono
Flux.range(1, 6).collectList().subscribe(f -> System.out.println("组成list后的元素:"+f));
Context
类似于 ThreadLocal, 一个 Context 是绑定到每一个链中的 Subscriber 上的. 它的内容只能被subscriberContext上游的操作符看到
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap( s -> Mono.subscriberContext().map( ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "World"));
StepVerifier.create(r).expectNext("Hello World").verifyComplete();
消息处理
subscribe订阅
当没有订阅时发布者什么也不做。
发出元素:doOnNext
序列完成:Flux#doOnComplete,Mono#doOnSuccess
因错误终止:doOnError
取消:doOnCancel
订阅时:doOnSubscribe
请求时:doOnRequest
完成或错误终止:doOnTerminate(Mono的方法可能包含有结果)
但是在终止信号向下游传递 之后 :doAfterTerminate
所有类型的信号(Signal):Flux#doOnEach
所有结束的情况(完成complete、错误error、取消cancel):doFinally
subscribe();
subscribe(Consumer<? super T> consumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer);
subscribe(Consumer<? super T> consumer,
Consumer<? super Throwable> errorConsumer,
Runnable completeConsumer,
Consumer<? super Subscription> subscriptionConsumer);
继承BaseSubscriber,重写hookOnSubscribe、hookOnNext、hookOnError、hookOnCancel、 hookOnComplete、hookFinally
error处理
响应式流中的任何错误都是一个终止事件。 即使用了错误处理操作符,也不会让源头流序列继续。而是将 onError 信号转化为一个 新的 序列 的开始。换句话说,它代替了被终结的 上游 流序列。
Flux/Mono.error(Throwable error) 创建一个只包含错误消息的序列
Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).subscribe(System.out::println, System.err::println);
Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).onErrorReturn(0).subscribe(System.out::println);
Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).switchOnError(Mono.just(0)).subscribe(System.out::println);
Flux.just(1,2).concatWith(Mono.error(new IllegalArgumentException())).onErrorResume(e -> {
if(e instanceof IllegalStateException)
return Mono.just(0);
else if(e instanceof IllegalArgumentException)
return Mono.just(-1);
return Mono.epmty();
}).subscribe(System,.out::println);
//不改变它的情况下做出响应(如记录日志),并让错误继续传递下去,
Flux.just("unknown").flatMap(k -> callExternalService(k))
.doOnError(e -> {
failureStat.increment();
log("uh oh, falling back, service failed for key " + k);
})
.onErrorResume(e -> getFromCache(k));
retry
Flux.just(1,2).concatWith(Mono.error(new IllegalStateException())).retry(1).subscrible(System.out::println);
阻塞和非阻塞相互转化
// 非阻塞->阻塞
mono.block();
flux.toIterable();
Flux.toStream
Flux.blockLast
// 阻塞->非阻塞
Flux<Person> asyncPersonLookup(PersonRepository repository){
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.elastic());
}
Mono<Void> asyncSavePersons(Flux<Person>flux,PersonRepository repository){
return flux.publishOn(Schedulers.parallel())
.doOnNext(repository::save)
.then();
}
调度/线程
通过publishOn()和subscribeOn()方法可以切换执行操作调度器。publishOn()方法切换的是操作符的执行方式,而subscribeOn()方法切换的是产生流中元素时的执行方式。
publishOn会 改变后续的操作符的执行所在线程 (直到下一个 publishOn 出现在这个链上)
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
}).publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
Schedulers.newSingle()
Schedulers.newElastic(yourScheduleName)
测试
StepVerifier
StepVerifier的作用是可以对序列中包含的元素进行逐一验证。通过StepVerifier.create()方法对一个流进行包装之后再进行验证。expectNext()方法用来声明测试时所期待的流中的下一个元素的值,而verifyComplete()方法则验证流是否正常结束。verifyError()来验证流由于错误而终止。
StepVerifier.create(Flux.just(a,b))
.expectNext("a").expectNext("b").verifyComplete();
TestPublisher
这个类本质上是一个 Publisher<T>, 你可以通过可编程的方式来用它发出各种信号。
final TestPublisher<String> testPublisher = TestPublisher.creater();
testPublisher.next("a");
testPublisher.next("b");
testPublisher.complete();
StepVerifier.create(testPublisher)
.expectNext("a")
.expectNext("b")
.expectComplete();
日志
Flux.range(1, 2).log("Range").subscribe(System.out::println);
调试
在调试模式启用之后,所有的操作符在执行时都会保存额外的与执行链相关的信息。当出现错误时,这些信息会被作为异常堆栈信息的一部分输出
Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());
Flux.just(1, 0).map(x -> 1/x).checkpoint("test").subscribe(System.out::println);
引用
GitHub & BitBucket HTML Preview
Flux、Mono、Reactor 实战(史上最全)_普通网友的博客-CSDN博客_mono.create
聊聊Spring Reactor反应式编程 - 知乎
使用 Reactor 进行反应式编程_Jaemon的博客-CSDN博客_reactor3 反应式
reactor Mono.defer_罗小爬EX的博客-CSDN博客_reactor defer
https://www.baeldung.com/reactor-combine-streams
Reactor 3 (11): 数据扁平处理flatMap、concatMap_泛泛之素的博客-CSDN博客
reactor merge vs concat、flatMap vs concatMap_罗小爬EX的博客-CSDN博客