在 Quarkus 中使用 Mutiny - 事件驱动的 Java 响应式编程库
本教程介绍在 Quarkus 中使用事件驱动的 Mutiny 响应式编程库 以应对异步系统开发中的挑战。
概述
Mutiny 是一个(Reactive Programming)响应式编程库, 事件是 Mutiny 的设计核心,可以观察事件,对事件作出反应,并创建优雅易读的处理管道。 Mutiny 提供了一个可导航的显式 API,引导一步步找到所需的操作符。 善于处理含非阻塞 I/O 应用的异步特性,以声明式的方式组合操作、转换数据、实施过程、从失败中恢复等等。
Mutiny 基于 Reactive Streams 标准 及 实现该标准的 java.util.concurrent.Flow
,可以用于任何异步应用程序,比如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理及响应式应用程序!
RESTEasy Reactive 是一种新的 Jakarta REST (以前称为JAX-RS) 实现,基于 Vert.x 从头编写是完全响应式的,与 Quarkus 非常紧密地集成在一起,简化编程,将大量的工作转移到构建上。
支持在阻塞和非阻塞端点,并且具有出色的性能。可以使用 Mutiny 实现 Quarkus 的响应式 API 的业务逻辑。同时 Quarkus 也提供了大量的响应式 api 和特性。
/* Mutiny 响应式编程 */
Uni<String> request = makeSomeNetworkRequest(params);
request.ifNoItem().after(ofMillis(100))
.failWith(() -> new TooSlowException("💥"))
.onFailure(IOException.class).recoverWithItem(fail -> "📦")
.subscribe().with(
item -> log("👍 " + item),
err -> log(err.getMessage())
);
为什么异步很重要 ?
我们生活在一个分布式的世界里。 大多数应用程序都是分布式系统。云、物联网、微服务、移动应用,甚至简单的 CRUD 应用都是分布式应用。 然而,开发分布式系统是困难的!
分布式系统中的通信本质上是异步的和不可靠的。任何事情都可能在任何时候出错,而且往往没有事先通知。正确地构建分布式应用程序是一个相当大的挑战。
通常,传统应用程序为每个请求分配一个线程,用多个线程处理多个并发请求。当请求处理需要通过网络进行交互时,它使用一个工作线程,该工作线程阻塞线程,直到接收到响应。
这种响应可能永远不会出现,因此您需要添加处理超时和其他弹性模式的监督程序。而且,为了同时处理更多请求,您需要创建更多线程。
线程是有代价的。每个线程都需要内存,线程越多,用于处理上下文切换的 CPU 周期就越多。幸运的是,还有另一种方法,使用非阻塞 I/O,这是一种处理 I/O 交互的有效方法,不需要额外的线程。
虽然使用非阻塞I/O的应用程序更高效,更适合云的分布式特性,但它们有一个相当大的限制: 必须永远不阻塞 I/O 线程。因此,您需要使用异步开发模型来实现业务逻辑。
I/O并不是异步在当今系统中必不可少的唯一原因。现实世界中的大多数交互都是异步的和事件驱动的。使用同步进程表示这些交互不仅是错误的, 它还会在应用程序中引入脆弱性。
什么是响应式编程 (Reactive Programming) ?
响应式编程结合了函数式编程、观察者模式和可迭代模式。Mutiny 给出更直接的定义:响应式编程是关于数据流的编程。
响应式编程是关于流的,尤其是观察流。它将这个想法推向了极限:在响应式编程中,一切都是数据流。使用响应式编程,您可以观察流,并在流中流动时实现副作用。
它本质上是异步的,因为您不知道何时会看到数据。然而,响应式编程远不止于此。它提供了一个工具箱来组合流和处理事件。在 Java 中,我们可以找到 Project Reactor 和 Rx Java.
响应式是构建响应式分布式系统和应用程序的一组原则和指导方针。Reactive Manifesto 将响应式系统描述为具有四个特征的分布式系统:
- 快速响应,他们必须及时作出反应
- 有弹性,它们可以调整自己适应波动的负荷
- 容错,他们优雅地处理失败
- 异步消息,响应系统的组件使用消息进行交互
Mutiny 的优势 在于其 API 设计
异步对于大多数开发人员来说很难掌握。因此,API 必须不需要高级知识或增加认知负担。它可以帮助你设计你的逻辑,并且在 6 个月后返回查看代码时仍然是可理解的。
Mutiny 的三大设计核心:
- 事件驱动:你侦听事件并处理它们。
- API可导航性:基于事件驱动的特性,API 是围绕事件类型构建的,并基于要处理的事件类型进行导航。
- 简单性:只提供了 Multi 和 Uni 两种类型,可以处理任何类型的异步交互。
Mutiny 事件驱动流程
使用 Mutiny 时,你设计了一个事件流的管道,你的代码观察这些事件并作出反应。每个处理阶段都是附加到管路(pipeline)上的新管道(pipe)。该管道可以更改事件,创建、丢弃、缓存,及您需要的任何内容。
一般来说,事件从上游(upstream)流向下游(downstream),从源(source)流向尽头,有些事件可以从源头逆流而上。
从上游到下游的事件由发布者(Publishers)发布,并由下游订阅者(Subscribers)消费,订阅者也可能为自己的下游产生事件,如下图所示:
从上游流向下游的四种事件类型:
- 订阅(Subscription):发布者发送的事件,在订阅事件之后发生,以表示其已确认下游的订阅。
- 项目(Items):包含某些业务数据的事件。
- 完成(Completion):指示源不再发出任何项目。仅 Multi 类型源能产生该事件。
- 故障(Failure):表示上游发生错误,不继续发出项目。
- 超载(Overflow):表示上游发布的数据量超过了下游的处理能力。
从下游流向上游的两种事件类型:
- 订阅(subscribe):订阅者发送的事件,表示对下游订阅者对上游发布者的数据感兴趣。
- 请求(Requests):订阅者发送的事件,表示它可以处理多少项事件,这与背压(back-pressure)有关。
- 取消(Cancellation):订阅者发送的事件,以停止接收事件。
一个典型场景:
- 订阅者订阅上游,上游接收订阅请求,初始化后将 Subscription 事件发送给订阅者。
- 订阅者收到
subscription
事件,使用 subscription 发出请求(Requests)和取消(Cancellation)事件。 - 订阅者发送一个请求(Requests)事件,指示它此时可以处理多少项;它可以请求1、n或无限。
- 接收请求事件的发布者开始向订阅者发送最多n个项目(Items)事件。
- 订阅者可以随时决定请求更多事件或取消订阅。
请求事件(Requests)是背压协议(back-pressure)的基石。订阅者所请求的内容不应超过其可处理的内容,而发布者所发出的内容不应超过所接收的请求量。
不要忘记订阅!
如果没有订阅者订阅,则不会发出任何项。更重要的是,什么也不会发生。 如果您的程序什么都不做,请检查它是否订阅,这是一个非常常见的错误。
Uni and Multi
Mutiny 定义了两种响应式类型,它们随时接收和触发事件:
- Multi:表示含 0 到多个项目的数据流(可能是无限多个),如用于从消息代理队列接收消息。
- Uni:表示接收一个项目或失败的数据流,如用于将消息发送到消息代理队列。
区别:
- Uni 可以处理具有空值的项,Multi 不可以是空值
- Uni 无 Completion 事件
- Uni 不能接收 Request 事件
Quarkus REST
RESTEasy Reactive 是为 Quarkus 架构量身定制的 REST 实现。它遵循响应优先,返回响应式类型进行异步处理,
但同时允许使用 @Blocking 注释编写命令式代码。Quarkus 内部实现了一个 proactor 模式,在需要时切换到工作线程。
传统应用程序使用阻塞 I/O 和命令式(顺序)执行模型。因此,在公开 HTTP 端点的应用程序中,每个 HTTP 请求都与一个工作线程相关联。
通常,该线程将处理整个请求,并且该线程在该请求期间仅为该请求提供服务。当处理需要与远程服务交互时,它使用阻塞 I/O。线程会被阻塞,
等待 I/O 的结果。虽然该模型很容易开发(因为一切都是连续的),但它有一些缺点。要处理并发请求,需要多个线程,因此需要引入工作线程池。
此池的大小限制了应用程序的并发性。此外,每个线程在内存和CPU方面都有成本。大型线程池导致贪心应用程序。
Quarkus 是由响应式引擎驱动的,在开发响应式应用程序时,您的代码将在为数不多的 I/O 线程之一上执行。
请记住,绝不能阻塞这些线程,否则模型会崩溃。因此,您不能使用阻塞I/O。相反,您需要调度I/O操作并传递延续。
Mutiny 事件驱动范式就是为此量身定制的。当 I/O 操作成功完成时,表示该操作的 Uni 发出一个项目事件。当它失败时,它会发出一个失败事件。
RESTEasy Reactive 通过两种类型的线程实现:
- 事件循环线程:它们负责从HTTP请求中读取字节并将字节写回HTTP响应。
- 工作线程:它们被池化,可以用来执行长时间运行的操作。
事件循环线程(也称为IO线程)负责以异步方式实际执行所有IO操作,并将结果通知给对这些IO操作感兴趣的侦听器。
默认情况下,RESTEasy Reactive 端点方法运行的线程依赖于方法的签名。如果一个方法返回异步类型,则认为它是非阻塞的,默认情况下将在IO线程上运行。但
如果您正在端点方法中编写阻塞代码如Thread.sleep(1000);
,方法将在工作线程上运行。
初始化项目:
mvn io.quarkus.platform:quarkus-maven-plugin:3.1.1.Final:create \
-DprojectGroupId=org.acme \
-DprojectArtifactId=rest-json-quickstart \
-Dextensions='resteasy-reactive-jackson' \
-DnoCode
cd rest-json-quickstart
RESTEasy Reactive 与 Mutiny自然地集成在一起,当你只有一个结果时使用 Uni。当您有多个异步发出的项时使用 Multi:
@Path("/reactive") // @Path 定义了 URI 前缀
public class Endpoint {
@POST
@Path("{type}")
public String allParams(@RestPath String type, // @RestPath, @... 获取不同类型的请求参数
@RestMatrix String variant,
@RestQuery String age,
@RestCookie String level,
@RestHeader("X-Cheese-Secret-Handshake") String secretHandshake,
@RestForm String smell) {
return type + "/" + variant + "/" + age + "/" + level + "/"
+ secretHandshake + "/" + smell;
}
@GET
@Path("/{name}")
public Uni<String> hello(@RestPath String name) { // 术语 Endpoint: 用于服务REST调用的Java方法
return Uni.createFrom().item(String.format("hello %s", name));
}
@GET
@Path("/multi") // 可不指定
@Produces(MediaType.APPLICATION_JSON) // 指定响应的 HTTP Content-Type 头
public Multi<String> getAll() {
return Multi.createFrom().items("a", "b", "c");;
}
}
如需要在 HTTP 响应上设置更多的属性,可以从资源方法返回org.jboss.resteasy.reactive.RestResponse
,或使用注解。如下:
@Path("")
public class Endpoint {
@GET
@ResponseStatus(201)
@ResponseHeader(name = "X-Cheese", value = "Camembert")
public RestResponse<String> hello() {
// HTTP OK status with text/plain content type
return ResponseBuilder.ok("Hello, World!", MediaType.TEXT_PLAIN_TYPE)
// set a response header
.header("X-Cheese", "Camembert")
// set the Expires response header to two days from now
.expires(Date.from(Instant.now().plus(Duration.ofDays(2))))
// send a new cookie
.cookie(new NewCookie("Flavour", "chocolate"))
// end of builder API
.build();
}
}
以异步/响应的方式实现相同的阻塞操作,例如使用 Mutiny:
@GET
public Uni<String> blockingHello() throws InterruptedException {
return Uni.createFrom().item("Yaaaawwwwnnnnnn…")
// do a non-blocking sleep
.onItem().delayIt().by(Duration.ofSeconds(2));
}
如果用 @Transactional
注释了方法或类,那么它也将被视为阻塞方法。
请求或响应过滤器
您可以声明在以下请求处理阶段调用的函数:
- 在端点方法被识别之前: 预路由请求过滤器
- 在路由之后,但在端点方法被调用之前: 正常请求过滤器
- 调用端点方法后: 响应过滤器
请求过滤器通常与处理请求的方法在的同一线程上执行。
HTTP 请求和响应可以通过分别提供 ContainerRequestFilter 或 ContainerResponseFilter 实现来拦截。或使用注解的方式拦截。
映射实体和 HTTP 主体
当你的端点方法返回一个对象时或返回带实体的 RestResponse、Response,将寻找一种将其映射到 HTTP 响应体的方法。类似地,
每当端点方法接受一个对象作为参数时,将寻找一种将 HTTP 请求体映射到对象的方法。
当安装了 JSON 扩展 quarkus-resteasy-reactive-jackson
时,将默认使用 application/JSON
作为大多数返回值的媒体类型,
除非媒体类型是通过 @Produces
或 @consume
注释显式设置的 (一些已知类型的除外,如 String 和 File,默认分别为 text/plain
和 application/octet-stream
)。
Mutiny API
对于每种类型的事件,都有一个相关的方法来处理该特定的事件。例如:
@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> reactive() {
Multi<String> source = Multi.createFrom().items("a", "b", "c");
source
.onItem() // Called for every item
.invoke(item -> LOG.info("3.Received item " + item))
.onFailure() // Called on failure
.invoke(failure -> LOG.info("Failed with " + failure))
.onCompletion() // Called when the stream completes
.invoke(() -> LOG.info("end.Completed"))
.onSubscription() // Called when the upstream is ready
.invoke(subscription -> LOG.info("1.We are subscribed!"))
.onCancellation() // Called when the downstream cancels
.invoke(() -> LOG.info("Cancelled :-("))
.onRequest() // Called on downstream requests
.invoke(n -> LOG.info("2.Downstream requested " + n + " items"))
.subscribe() // 订阅, 无订阅不会执行任何操作
.with(item -> LOG.info("4.Subscriber received " + item),
failure-> LOG.info("Subscriber received " + failure.getMessage()));
// Mutiny使用了一个构建器 API,每添加一个阶段(stage)返回一个新的 Uni 对象。
return Uni.createFrom().item("hello") // 创建一个字符串作为项目 (Item)的数据源,
.onItem().invoke(item -> LOG.info("Received item " + item)) // 收到项目事件,同步观察数据
.onItem().transform(item -> item + " mutiny") // 收到项目事件,并进行处理
.onItem().transform(String::toUpperCase); // 请求端点时订阅
}
处理过程
我们通过 Mutiny 描述了一个数据流处理管道 pipeline,它接收数据项目(item),处理它,最后消费它。
Mutiny 提供了许多操作符来创建、转换和编排 Uni 序列。提供的操作符可用于定义处理管道。事件、项目或失败在此管道中流动,
每个操作符都可以处理或转换事件 invoke() 只是可用的操作符/方法之一。每个组提供针对事件类型的方法/操作符。
例如,onFailure().recover, onCompletion().continueWith 等等。
当使用 Mutiny 时,必须编写 onItem() 可能会很麻烦。幸运的是,Mutiny 提供了一组快捷方式,使代码更简洁。
- Consumer:表示接受单个输入参数且不返回结果的函数。
- Supplier:表示不需要参数但返回结果的提供者函数。
- Publisher:数据项和消息的生产者。每个订阅者(Subscriber )通过方法 onNext 以相同的顺序接收项目,除非丢失或遇到错误。
创建数据流管道
- 可以从一个已知的值创建:
Uni<Integer> uni = Uni.createFrom().item(1);
// Multi
Multi<Integer> multiFromItems = Multi.createFrom().items(1, 2, 3, 4);
Multi<Integer> multiFromIterable = Multi.createFrom().iterable(Arrays.asList(1, 2, 3, 4, 5));
- 从一个 Supplier 创建:
AtomicInteger counter = new AtomicInteger();
Uni<Integer> uni = Uni.createFrom().item(() -> counter.getAndIncrement());
// Multi
Multi<Integer> multi = Multi.createFrom().items(() ->
IntStream.range(counter.getAndIncrement(), counter.get() * 2).boxed());
- 还可以发出一个失败事件,表明操作失败:
Uni<Integer> failed1 = Uni.createFrom().failure(new Exception("boom"));
Multi<Integer> failed1 = Multi.createFrom().failure(new Exception("boom"));
- 当操作不能产生结果时,您仍然需要一种方法来指示操作的完成。为此,Uni 可以发出一个空项, Multi 可以发出完成事件:
Uni<Void> uni = Uni.createFrom().nullItem();
Multi<String> multi = Multi.createFrom().empty();
- 可以使用发射器创建一个 Uni。这种方法在集成基于回调的 api时很有用:
Uni<String> uni = Uni.createFrom().emitter(em -> {
// When the result is available, emit it
em.complete(result);
});
Multi<Integer> multi = Multi.createFrom().emitter(em -> {
em.emit(1);
em.emit(2);
em.emit(3);
em.complete();
});
- 你也可以从 CompletionStage/CompletableFuture 中统一对象。这在与基于这些类型的 api 集成时非常有用
Uni<String> uni = Uni.createFrom().completionStage(stage);
- 创建一个周期性发出数据流的 Multi 管道:
Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100));
- 从生成器创建 Multi 管道:
Multi<Object> sequence = Multi.createFrom().generator(() -> 1, (n, emitter) -> {
int next = n + (n / 2) + 1;
if (n < 50) {
emitter.emit(next);
} else {
emitter.complete();
}
return next;
});
观察事件
Uni 和 Multi 发出事件, 你通常需要观察并处理这些事件。大多数时候代码只对项目和失败事件感兴趣。但是还有其他类型的事件,如取消、请求、完成等,
例如,你可能需要在完成事件后关闭资源,或者记录有关失败或取消的消息。Mutiny 提供了两种方法(invoke + call),可以在不影响其分发的情况下查看或处理各种事件。
你可以使用以下命令观察不同类型的事件:on{event}().invoke(ev -> System.out.println(ev));
- invoke 方法是同步的无返回值,主要用于记录日志、处理同步副作用等, 如:
onItem().invoke(item -> ...);
oronFailure().invoke(failure -> ...);
Mutiny调用回调函数,并在回调返回时继续向下游传播事件。 - call 方法可用于执行异步操作,返回 Uni 对象,用于I/O操作,关闭资源,刷新数据等。如:
onItem().call(item->someAsyncAction(item))
。调用通常在需要实现异步副作用(如关闭资源)时使用。在回调返回的 Uni 发出一个项之前,Mutiny 不会将原始事件分派到下游
注意,这两个调用不会更改项目,它只是调用一个操作,当这个操作完成时,它将向下游发送原始项目。
当观察到失败事件时,如果回调抛出异常,Mutiny 将传播一个 CompositeException,聚合原始失败和回调失败。
转换项目
Uni 及 Multi 都会弹出数据项目,最常见操作之一是使用同步一对一函数转换这些项,新生成的项会被传递给下游。
主要通过 onItem().transform(item -> Function<T, U>)
对数据项目进行转换处理操作。它为每个项目调用传递的函数,并将结果作为生成的项目向下传播。
如果转换抛出异常,则捕获该异常并将其作为失败事件传递给下游订阅者。这也意味着在失败后订阅者将无法获得进一步的项目。
Uni.createFrom().item("hello")
.onItem().transform(i -> i.toUpperCase()) // 大写转换
.onItem().transform(i -> i + "!"); // 可以链接多个转换
对接其他管道转换项目:
通过 onItem().transformToUni(Function<T, Uni<O>>)
和 onItem().transformToMulti(Function<T, Multi<O>>)
来实现传递数据流的,顺序组合允许链接依赖的异步操作。与 transform 不同,传递给 transformToUni 的函数返回一个Uni。例如,调用一个由 Uni 表示的远程服务的异步操作。返回的 Uni 会从远程服务发出结果,如果发生任何错误则会发出失败事件。
Uni<String> invokeRemoteGreetingService(String name);
Uni<String> result = uni
.onItem().transformToUni(name -> invokeRemoteGreetingService(name)); // 传递数据并订阅其他 Uni
将接收到的单个数据项目转换为 Multi 中的流式数据:
Multi<String> result = uni
.onItem().transformToMulti(item -> Multi.createFrom().items(item, item)); // 传递数据并订阅,转为 Multi 类型
转换 Multi 数据流:合并 or 连接
将项目从 Multi 转换到后面管道时,需要决定下游订阅者按哪个顺序接收项目。Mutiny 提供了两种方式:
- 合并:不保留顺序合并接收到的项目。
- 连接:维护和连接产生项目的流。
- 将来自 Multi 中的项目转换为 Uni 中的单个项目:
Multi<String> merged = multi
.onItem().transformToUniAndMerge(name -> invokeRemoteGreetingService(name));
Multi<String> concat = multi
.onItem().transformToUniAndConcatenate(name -> invokeRemoteGreetingService(name));
- 将接收的 Multi 项目转换到另一个 Multi 管道数据流中:
Multi<String> merged = multi
.onItem().transformToMultiAndMerge(item -> someMulti(item));
Multi<String> concat = multi
.onItem().transformToMultiAndConcatenate(item -> someMulti(item));
失败处理
失败是可观测数据流的终结事件,表明发生了一些异常,失败后不再接收任何项目。
当接收到这样的事件时,您可以:
- 传播,向下游传播故障(默认)。
- 转换,将失败转化为另一个失败。
- 恢复,切换到另一个流,发送回退项或完成事件。
- 重试,发生异常进行再次尝试,如果尽管进行了多次尝试,但仍然失败,则该失败将向下游传播。
Mutiny 提供了多个操作符来处理失败事件:
- 观察:使用
onFailure().invoke()
观测失败进行一些操作,如记录日志。 - 转换:使用
onFailure().transform(failure -> new BizException(failure))
,将失败转化为更有意义的异常类型。 - 恢复:使用
onFailure().recoverWithItem(fallback)
使用回退项进行恢复。 - 恢复:使用
onFailure().recoverWithUni(f -> getFallbackUni(f))
切换到另一个异步管道数据流。 - 完成:Multi 数据流使用
onFailure().recoverWithCompletion()
来发送完成事件替代异常事件。 - 重试:使用
onFailure().retry().atMost(3)
进行多次重试。 - Mutiny 提供了一种指数回退的方法: 每次重试增长间隔:
onFailure().retry()
.withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)) // 配置初始延迟和最大延迟。还可以配置抖动来添加随机性。
.atMost(3)
- 条件重试:
onFailure().retry().until(f -> shouldWeRetry(f));
转换阻塞I/O操作,在指定线程上执行操作
方法 runSubscriptionOn 请求上游在给定的执行线程上运行订阅,emitOn 用于指定向下游传播项目、失败和完成事件的执行线程,直到使用另一个emitOn:
Uni.createFrom()
.item(this::invokeRemoteServiceUsingBlockingIO) // 在指定的线程之上运行
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) // Mutiny 允许底层平台提供默认的工作线程池
.subscribe().with(...) // 需要订阅
Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
.emitOn(Infrastructure.getDefaultWorkerPool()) // Mutiny 默认使用从上游发出事件的线程调用下一阶段,可以通过 emitOn 更换线程。
.onItem().transform(this::invokeRemoteServiceUsingBlockingIO);
转换为阻塞的命令式执行
- 使用
await().indefinitely()
方法来无限期阻塞和等待 Uni 项目数据。 - 可以使用
await().atMost(Duration.ofSeconds(1))
指定等待期限 - 使用
asIterable()
迭代阻塞获取 Multi 数据,或使用asStream()
获取为java.util.stream.Stream
类型数据:
Iterable<T> iterable = multi.subscribe().asIterable();
for (T item : iterable) {
doSomethingWithItem(item);
}
使用 Unchecked.function 避免手动 try/catch 操作
当集成抛出检查异常(如IOException)的库时,添加 try/catch 块并将抛出的异常包装到运行时异常中并不是很方便。
Uni<Integer> uni = item.onItem().transform(Unchecked.function(i -> {
return methodThrowingIoException(i); // 可以抛出运行时异常
}));
可以通过 import static io.smallrye.mutiny.unchecked.Unchecked.*;
方便使用各类 Unchecked.function
包装操作
使用 multi.select() 过滤 Multi 数据
multi.select().where(i -> i > 6) // 条件为 true 是可以继续传播
select().when(i -> Uni.createFrom().item(i > 6)) // when 异步版本,返回 Uni<Boolean>
multi.select().distinct() // 过滤相同项,不能在大型或无限流上使用
multi.select().repetitions() // 过滤连续的重复项,可以在大型或无限流上使用
将 Multi 数据聚合为 Uni
- 收集到一个列表:通过
collect().asList()
将项目存储在一个列表中Uni<list<T>>
。当 Multi 完成时,它会发出最终列表。
Uni<List<String>> uni = multi.collect().asList();
- 收集到 Map 中:通过给 asMap 提供一个函数来计算每个项目的键来实现,key 相同的项会覆盖。 .asMultiMap 可以将相同 key 的项保存到一个列表
Uni<Map<String, String>> uni = multi.collect().asMap(item -> getUniqueKey(item));
- 自定义聚合器收集:
Uni<MyCollection> uni = multi.collect().in(MyCollection::new, (col, item) -> col.add(item)); // 提供容器和累加方法
Uni<Long> count = multi.collect().with(Collectors.counting()); // 使用 Java Collector
通过 ifNull() 处理空项
uni.onItem().ifNull().continueWith("hello");
uni.onItem().ifNull().switchTo(() -> Uni.createFrom().item("hello"));
uni.onItem().ifNull().failWith(() -> new Exception("Boom!"));
uni.onItem().ifNotNull().transform(String::toUpperCase) // 非空项
超时处理
为 HTTP 调用等操作添加超时或截止时间,如果在截止时间之前没有得到响应,则认为操作失败。
Uni<String> uniWithTimeout = uni
.ifNoItem().after(Duration.ofMillis(100)) // 设置超时
.recoverWithItem("some fallback item"); // 设置超时处理方式:恢复
.fail().onFailure(TimeoutException.class).recoverWithItem("we got a timeout"); // 报错
.failWith(() -> new ServiceUnavailableException()) // 自定义异常
事件延迟
- 通过
onItem().delayIt()
延迟 Uni 的项目事件
Uni<String> delayed = Uni.createFrom().item("hello")
.onItem().delayIt().by(Duration.ofMillis(10)); // 固定时长
.onItem().delayIt().until(this::write); // 其他事件结束
- Multi 无 delayIt 方法,可以通过 call() 方法实现
// 将所有项目延迟10毫秒
Multi<Integer> delayed = multi
.onItem().call(i -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10)));
使用 map flatMap concatMap 方法
对应关系如下:
- map ->
onItem().transform()
- flatMap ->
onItem().transformToUniAndMerge
- concatMap ->
onItem().transformToUniAndConcatenate
Mutiny API 是围绕组的思想进行分解的,每个组处理一个特定的事件,并提供操作符。然而,为了避免冗长,Mutiny还公开了常用方法的
快捷调用方法对应关系表。
控制需求
- 设置需求,pacedemand() 操作符可用于在特定时间点自动发布请求。
// 每100毫秒发出25项请求
FixedDemandPacer pacer = new FixedDemandPacer(25L, Duration.ofMillis(100L));
Multi<Integer> multi = Multi.createFrom().range(0, 100)
.paceDemand().on(Infrastructure.getDefaultWorkerPool()).using(pacer);
- 限制需求
capDemandsTo 和 capDemandUsing 操作符可用于限制下游用户的请求。 capDemandTo 操作符定义了可以流动的最大数据量。
capDemandUsing 可以通过函数根据自定义公式或先前的需求观察值提供上限值。
Multi.createFrom().range(0, 100)
.capDemandsTo(50L).subscribe()
总结
Mutiny 是一个基于 Reactive Streams 标准实现的异步编程库,旨在简化异步编程代码的编写和维护,提高程序的性能和可伸缩性。Mutiny 库根据不同的事件类型分组提供了丰富的操作符,支持开发者进行数据流的转换、过滤、聚合等操作,从而实现更加灵活和高效的异步编程。
Mutiny 可以理解为一个基于事件驱动的数据流处理管道,对数据流从上游到下游的管道进行编排,并传递数据,在发生各种事件时通过各类操作符进行处理。还需要在以后的实际使用中不断加深理解。
更多参考
- Mutiny 官网:https://smallrye.io/smallrye-mutiny
- Quarkus Resteasy Reactive: https://quarkus.io/guides/resteasy-reactive
- Reactive Streams: https://www.reactive-streams.org/