在 Quarkus 中的使用 Mutiny 进行响应式编程

news2024/11/15 11:15:14

在 Quarkus 中使用 Mutiny - 事件驱动的 Java 响应式编程库

本教程介绍在 Quarkus 中使用事件驱动的 Mutiny 响应式编程库 以应对异步系统开发中的挑战。

概述

Mutiny 是一个(Reactive Programming)响应式编程库, 事件是 Mutiny 的设计核心,可以观察事件,对事件作出反应,并创建优雅易读的处理管道。 Mutiny 提供了一个可导航的显式 API,引导一步步找到所需的操作符。 善于处理含非阻塞 I/O 应用的异步特性,以声明式的方式组合操作、转换数据、实施过程、从失败中恢复等等。

Mutiny 基于 Reactive Streams 标准 及 实现该标准的 java.util.concurrent.Flow,可以用于任何异步应用程序,比如事件驱动的微服务、基于消息的应用程序、网络实用程序、数据流处理及响应式应用程序!

RESTEasy Reactive 是一种新的 Jakarta REST (以前称为JAX-RS) 实现,基于 Vert.x 从头编写是完全响应式的,与 Quarkus 非常紧密地集成在一起,简化编程,将大量的工作转移到构建上。
支持在阻塞和非阻塞端点,并且具有出色的性能。可以使用 Mutiny 实现 Quarkus 的响应式 API 的业务逻辑。同时 Quarkus 也提供了大量的响应式 api 和特性。

    /* Mutiny 响应式编程 */
    Uni<String> request = makeSomeNetworkRequest(params);
    
    request.ifNoItem().after(ofMillis(100))
        .failWith(() -> new TooSlowException("💥"))
        .onFailure(IOException.class).recoverWithItem(fail -> "📦")
        .subscribe().with(
            item -> log("👍 " + item),
             err -> log(err.getMessage())
        );

为什么异步很重要 ?

我们生活在一个分布式的世界里。 大多数应用程序都是分布式系统。云、物联网、微服务、移动应用,甚至简单的 CRUD 应用都是分布式应用。 然而,开发分布式系统是困难的!

分布式系统中的通信本质上是异步的和不可靠的。任何事情都可能在任何时候出错,而且往往没有事先通知。正确地构建分布式应用程序是一个相当大的挑战。

通常,传统应用程序为每个请求分配一个线程,用多个线程处理多个并发请求。当请求处理需要通过网络进行交互时,它使用一个工作线程,该工作线程阻塞线程,直到接收到响应。
这种响应可能永远不会出现,因此您需要添加处理超时和其他弹性模式的监督程序。而且,为了同时处理更多请求,您需要创建更多线程。

线程是有代价的。每个线程都需要内存,线程越多,用于处理上下文切换的 CPU 周期就越多。幸运的是,还有另一种方法,使用非阻塞 I/O,这是一种处理 I/O 交互的有效方法,不需要额外的线程。
虽然使用非阻塞I/O的应用程序更高效,更适合云的分布式特性,但它们有一个相当大的限制: 必须永远不阻塞 I/O 线程。因此,您需要使用异步开发模型来实现业务逻辑。

I/O并不是异步在当今系统中必不可少的唯一原因。现实世界中的大多数交互都是异步的和事件驱动的。使用同步进程表示这些交互不仅是错误的, 它还会在应用程序中引入脆弱性。

什么是响应式编程 (Reactive Programming) ?

响应式编程结合了函数式编程、观察者模式和可迭代模式。Mutiny 给出更直接的定义:响应式编程是关于数据流的编程。
响应式编程是关于流的,尤其是观察流。它将这个想法推向了极限:在响应式编程中,一切都是数据流。使用响应式编程,您可以观察流,并在流中流动时实现副作用。
它本质上是异步的,因为您不知道何时会看到数据。然而,响应式编程远不止于此。它提供了一个工具箱来组合流和处理事件。在 Java 中,我们可以找到 Project Reactor 和 Rx Java.

响应式是构建响应式分布式系统和应用程序的一组原则和指导方针。Reactive Manifesto 将响应式系统描述为具有四个特征的分布式系统:

  • 快速响应,他们必须及时作出反应
  • 有弹性,它们可以调整自己适应波动的负荷
  • 容错,他们优雅地处理失败
  • 异步消息,响应系统的组件使用消息进行交互

Mutiny 的优势 在于其 API 设计

异步对于大多数开发人员来说很难掌握。因此,API 必须不需要高级知识或增加认知负担。它可以帮助你设计你的逻辑,并且在 6 个月后返回查看代码时仍然是可理解的。

Mutiny 的三大设计核心:

  • 事件驱动:你侦听事件并处理它们。
  • API可导航性:基于事件驱动的特性,API 是围绕事件类型构建的,并基于要处理的事件类型进行导航。
  • 简单性:只提供了 Multi 和 Uni 两种类型,可以处理任何类型的异步交互。

Mutiny 事件驱动流程

使用 Mutiny 时,你设计了一个事件流的管道,你的代码观察这些事件并作出反应。每个处理阶段都是附加到管路(pipeline)上的新管道(pipe)。该管道可以更改事件,创建、丢弃、缓存,及您需要的任何内容。

一般来说,事件从上游(upstream)流向下游(downstream),从源(source)流向尽头,有些事件可以从源头逆流而上。

从上游到下游的事件由发布者(Publishers)发布,并由下游订阅者(Subscribers)消费,订阅者也可能为自己的下游产生事件,如下图所示:

在这里插入图片描述
从上游流向下游的四种事件类型:

  • 订阅(Subscription):发布者发送的事件,在订阅事件之后发生,以表示其已确认下游的订阅。
  • 项目(Items):包含某些业务数据的事件。
  • 完成(Completion):指示源不再发出任何项目。仅 Multi 类型源能产生该事件。
  • 故障(Failure):表示上游发生错误,不继续发出项目。
  • 超载(Overflow):表示上游发布的数据量超过了下游的处理能力。

从下游流向上游的两种事件类型:

  • 订阅(subscribe):订阅者发送的事件,表示对下游订阅者对上游发布者的数据感兴趣。
  • 请求(Requests):订阅者发送的事件,表示它可以处理多少项事件,这与背压(back-pressure)有关。
  • 取消(Cancellation):订阅者发送的事件,以停止接收事件。

一个典型场景:

  1. 订阅者订阅上游,上游接收订阅请求,初始化后将 Subscription 事件发送给订阅者。
  2. 订阅者收到 subscription 事件,使用 subscription 发出请求(Requests)和取消(Cancellation)事件。
  3. 订阅者发送一个请求(Requests)事件,指示它此时可以处理多少项;它可以请求1、n或无限。
  4. 接收请求事件的发布者开始向订阅者发送最多n个项目(Items)事件。
  5. 订阅者可以随时决定请求更多事件或取消订阅。

在这里插入图片描述

请求事件(Requests)是背压协议(back-pressure)的基石。订阅者所请求的内容不应超过其可处理的内容,而发布者所发出的内容不应超过所接收的请求量。

不要忘记订阅!
如果没有订阅者订阅,则不会发出任何项。更重要的是,什么也不会发生。 如果您的程序什么都不做,请检查它是否订阅,这是一个非常常见的错误。

Uni and Multi

Mutiny 定义了两种响应式类型,它们随时接收和触发事件:

  • Multi:表示含 0 到多个项目的数据流(可能是无限多个),如用于从消息代理队列接收消息。
  • Uni:表示接收一个项目或失败的数据流,如用于将消息发送到消息代理队列。

区别:

  • Uni 可以处理具有空值的项,Multi 不可以是空值
  • Uni 无 Completion 事件
  • Uni 不能接收 Request 事件

Quarkus REST

RESTEasy Reactive 是为 Quarkus 架构量身定制的 REST 实现。它遵循响应优先,返回响应式类型进行异步处理,
但同时允许使用 @Blocking 注释编写命令式代码。Quarkus 内部实现了一个 proactor 模式,在需要时切换到工作线程。

传统应用程序使用阻塞 I/O 和命令式(顺序)执行模型。因此,在公开 HTTP 端点的应用程序中,每个 HTTP 请求都与一个工作线程相关联。
通常,该线程将处理整个请求,并且该线程在该请求期间仅为该请求提供服务。当处理需要与远程服务交互时,它使用阻塞 I/O。线程会被阻塞,
等待 I/O 的结果。虽然该模型很容易开发(因为一切都是连续的),但它有一些缺点。要处理并发请求,需要多个线程,因此需要引入工作线程池。
此池的大小限制了应用程序的并发性。此外,每个线程在内存和CPU方面都有成本。大型线程池导致贪心应用程序。

在这里插入图片描述

Quarkus 是由响应式引擎驱动的,在开发响应式应用程序时,您的代码将在为数不多的 I/O 线程之一上执行。
请记住,绝不能阻塞这些线程,否则模型会崩溃。因此,您不能使用阻塞I/O。相反,您需要调度I/O操作并传递延续。
Mutiny 事件驱动范式就是为此量身定制的。当 I/O 操作成功完成时,表示该操作的 Uni 发出一个项目事件。当它失败时,它会发出一个失败事件。
在这里插入图片描述
RESTEasy Reactive 通过两种类型的线程实现:

  1. 事件循环线程:它们负责从HTTP请求中读取字节并将字节写回HTTP响应。
  2. 工作线程:它们被池化,可以用来执行长时间运行的操作。

事件循环线程(也称为IO线程)负责以异步方式实际执行所有IO操作,并将结果通知给对这些IO操作感兴趣的侦听器。
默认情况下,RESTEasy Reactive 端点方法运行的线程依赖于方法的签名。如果一个方法返回异步类型,则认为它是非阻塞的,默认情况下将在IO线程上运行。但
如果您正在端点方法中编写阻塞代码如Thread.sleep(1000);,方法将在工作线程上运行。

初始化项目:

    mvn io.quarkus.platform:quarkus-maven-plugin:3.1.1.Final:create \
        -DprojectGroupId=org.acme \
        -DprojectArtifactId=rest-json-quickstart \
        -Dextensions='resteasy-reactive-jackson' \
        -DnoCode
    cd rest-json-quickstart

RESTEasy Reactive 与 Mutiny自然地集成在一起,当你只有一个结果时使用 Uni。当您有多个异步发出的项时使用 Multi:

@Path("/reactive") // @Path 定义了 URI 前缀
public class Endpoint {
    @POST
    @Path("{type}")
    public String allParams(@RestPath String type, // @RestPath, @... 获取不同类型的请求参数
                            @RestMatrix String variant,
                            @RestQuery String age,
                            @RestCookie String level,
                            @RestHeader("X-Cheese-Secret-Handshake") String secretHandshake,
                            @RestForm String smell) {
        return type + "/" + variant + "/" + age + "/" + level + "/"
                + secretHandshake + "/" + smell;
    }
    
    @GET
    @Path("/{name}")
    public Uni<String> hello(@RestPath String name) { // 术语 Endpoint: 用于服务REST调用的Java方法
        return Uni.createFrom().item(String.format("hello %s", name));
    }
    
    @GET
    @Path("/multi") // 可不指定
    @Produces(MediaType.APPLICATION_JSON) // 指定响应的 HTTP Content-Type 头
    public Multi<String> getAll() {
        return Multi.createFrom().items("a", "b", "c");;
    }
}

如需要在 HTTP 响应上设置更多的属性,可以从资源方法返回org.jboss.resteasy.reactive.RestResponse,或使用注解。如下:

@Path("")
public class Endpoint {
    @GET
    @ResponseStatus(201)
    @ResponseHeader(name = "X-Cheese", value = "Camembert")
    public RestResponse<String> hello() {
        // HTTP OK status with text/plain content type
        return ResponseBuilder.ok("Hello, World!", MediaType.TEXT_PLAIN_TYPE)
         // set a response header
         .header("X-Cheese", "Camembert")
         // set the Expires response header to two days from now
         .expires(Date.from(Instant.now().plus(Duration.ofDays(2))))
         // send a new cookie
         .cookie(new NewCookie("Flavour", "chocolate"))
         // end of builder API
         .build();
    }
}

以异步/响应的方式实现相同的阻塞操作,例如使用 Mutiny:

    @GET
    public Uni<String> blockingHello() throws InterruptedException {
        return Uni.createFrom().item("Yaaaawwwwnnnnnn…")
                // do a non-blocking sleep
                .onItem().delayIt().by(Duration.ofSeconds(2));
    }

如果用 @Transactional 注释了方法或类,那么它也将被视为阻塞方法。

请求或响应过滤器

您可以声明在以下请求处理阶段调用的函数:

  • 在端点方法被识别之前: 预路由请求过滤器
  • 在路由之后,但在端点方法被调用之前: 正常请求过滤器
  • 调用端点方法后: 响应过滤器

请求过滤器通常与处理请求的方法在的同一线程上执行。

HTTP 请求和响应可以通过分别提供 ContainerRequestFilter 或 ContainerResponseFilter 实现来拦截。或使用注解的方式拦截。

映射实体和 HTTP 主体

当你的端点方法返回一个对象时或返回带实体的 RestResponse、Response,将寻找一种将其映射到 HTTP 响应体的方法。类似地,
每当端点方法接受一个对象作为参数时,将寻找一种将 HTTP 请求体映射到对象的方法。

当安装了 JSON 扩展 quarkus-resteasy-reactive-jackson 时,将默认使用 application/JSON 作为大多数返回值的媒体类型,
除非媒体类型是通过 @Produces@consume 注释显式设置的 (一些已知类型的除外,如 String 和 File,默认分别为 text/plainapplication/octet-stream )。

Mutiny API

对于每种类型的事件,都有一个相关的方法来处理该特定的事件。例如:

@GET
@Produces(MediaType.TEXT_PLAIN)
public Uni<String> reactive() {
    Multi<String> source = Multi.createFrom().items("a", "b", "c");
    source
        .onItem() // Called for every item
        .invoke(item -> LOG.info("3.Received item " + item))
        .onFailure() // Called on failure
        .invoke(failure -> LOG.info("Failed with " + failure))
        .onCompletion() // Called when the stream completes
        .invoke(() -> LOG.info("end.Completed"))
        .onSubscription() // Called when the upstream is ready
        .invoke(subscription -> LOG.info("1.We are subscribed!"))
        .onCancellation() // Called when the downstream cancels
        .invoke(() -> LOG.info("Cancelled :-("))
        .onRequest() // Called on downstream requests
        .invoke(n -> LOG.info("2.Downstream requested " + n + " items"))
        .subscribe() // 订阅, 无订阅不会执行任何操作
        .with(item -> LOG.info("4.Subscriber received " + item),
                failure-> LOG.info("Subscriber received " + failure.getMessage()));
        
        // Mutiny使用了一个构建器 API,每添加一个阶段(stage)返回一个新的 Uni 对象。
        return Uni.createFrom().item("hello") // 创建一个字符串作为项目 (Item)的数据源,
                .onItem().invoke(item -> LOG.info("Received item " + item)) // 收到项目事件,同步观察数据
                .onItem().transform(item -> item + " mutiny") // 收到项目事件,并进行处理
                .onItem().transform(String::toUpperCase); // 请求端点时订阅
}

在这里插入图片描述

处理过程

我们通过 Mutiny 描述了一个数据流处理管道 pipeline,它接收数据项目(item),处理它,最后消费它。
Mutiny 提供了许多操作符来创建、转换和编排 Uni 序列。提供的操作符可用于定义处理管道。事件、项目或失败在此管道中流动,
每个操作符都可以处理或转换事件 invoke() 只是可用的操作符/方法之一。每个组提供针对事件类型的方法/操作符。
例如,onFailure().recover, onCompletion().continueWith 等等。
当使用 Mutiny 时,必须编写 onItem() 可能会很麻烦。幸运的是,Mutiny 提供了一组快捷方式,使代码更简洁。

  • Consumer:表示接受单个输入参数且不返回结果的函数。
  • Supplier:表示不需要参数但返回结果的提供者函数。
  • Publisher:数据项和消息的生产者。每个订阅者(Subscriber )通过方法 onNext 以相同的顺序接收项目,除非丢失或遇到错误。

创建数据流管道

  • 可以从一个已知的值创建:
    Uni<Integer> uni = Uni.createFrom().item(1);
    // Multi
    Multi<Integer> multiFromItems = Multi.createFrom().items(1, 2, 3, 4);
    Multi<Integer> multiFromIterable = Multi.createFrom().iterable(Arrays.asList(1, 2, 3, 4, 5));
  • 从一个 Supplier 创建:
    AtomicInteger counter = new AtomicInteger();
    Uni<Integer> uni = Uni.createFrom().item(() -> counter.getAndIncrement());
    // Multi
    Multi<Integer> multi = Multi.createFrom().items(() ->
        IntStream.range(counter.getAndIncrement(), counter.get() * 2).boxed());
  • 还可以发出一个失败事件,表明操作失败:
    Uni<Integer> failed1 = Uni.createFrom().failure(new Exception("boom"));
    Multi<Integer> failed1 = Multi.createFrom().failure(new Exception("boom"));
  • 当操作不能产生结果时,您仍然需要一种方法来指示操作的完成。为此,Uni 可以发出一个空项, Multi 可以发出完成事件:
    Uni<Void> uni = Uni.createFrom().nullItem();
    Multi<String> multi = Multi.createFrom().empty();
  • 可以使用发射器创建一个 Uni。这种方法在集成基于回调的 api时很有用:
    Uni<String> uni = Uni.createFrom().emitter(em -> {
        // When the result is available, emit it
        em.complete(result);
    });
    
    Multi<Integer> multi = Multi.createFrom().emitter(em -> {
        em.emit(1);
        em.emit(2);
        em.emit(3);
        em.complete();
    });
  • 你也可以从 CompletionStage/CompletableFuture 中统一对象。这在与基于这些类型的 api 集成时非常有用
    Uni<String> uni = Uni.createFrom().completionStage(stage);
  • 创建一个周期性发出数据流的 Multi 管道:
    Multi<Long> ticks = Multi.createFrom().ticks().every(Duration.ofMillis(100));
  • 从生成器创建 Multi 管道:
    Multi<Object> sequence = Multi.createFrom().generator(() -> 1, (n, emitter) -> {
        int next = n + (n / 2) + 1;
        if (n < 50) {
            emitter.emit(next);
        } else {
            emitter.complete();
        }
        return next;
    });

观察事件

Uni 和 Multi 发出事件, 你通常需要观察并处理这些事件。大多数时候代码只对项目和失败事件感兴趣。但是还有其他类型的事件,如取消、请求、完成等,
例如,你可能需要在完成事件后关闭资源,或者记录有关失败或取消的消息。Mutiny 提供了两种方法(invoke + call),可以在不影响其分发的情况下查看或处理各种事件。

你可以使用以下命令观察不同类型的事件:on{event}().invoke(ev -> System.out.println(ev));

  • invoke 方法是同步的无返回值,主要用于记录日志、处理同步副作用等, 如:onItem().invoke(item -> ...); or onFailure().invoke(failure -> ...); Mutiny调用回调函数,并在回调返回时继续向下游传播事件。
  • call 方法可用于执行异步操作,返回 Uni 对象,用于I/O操作,关闭资源,刷新数据等。如:onItem().call(item->someAsyncAction(item))。调用通常在需要实现异步副作用(如关闭资源)时使用。在回调返回的 Uni 发出一个项之前,Mutiny 不会将原始事件分派到下游

注意,这两个调用不会更改项目,它只是调用一个操作,当这个操作完成时,它将向下游发送原始项目。

当观察到失败事件时,如果回调抛出异常,Mutiny 将传播一个 CompositeException,聚合原始失败和回调失败。

转换项目

Uni 及 Multi 都会弹出数据项目,最常见操作之一是使用同步一对一函数转换这些项,新生成的项会被传递给下游。
主要通过 onItem().transform(item -> Function<T, U>) 对数据项目进行转换处理操作。它为每个项目调用传递的函数,并将结果作为生成的项目向下传播。
如果转换抛出异常,则捕获该异常并将其作为失败事件传递给下游订阅者。这也意味着在失败后订阅者将无法获得进一步的项目。

	Uni.createFrom().item("hello")
	    .onItem().transform(i -> i.toUpperCase()) // 大写转换
	    .onItem().transform(i -> i + "!"); // 可以链接多个转换

对接其他管道转换项目:

通过 onItem().transformToUni(Function<T, Uni<O>>)onItem().transformToMulti(Function<T, Multi<O>>)来实现传递数据流的,顺序组合允许链接依赖的异步操作。与 transform 不同,传递给 transformToUni 的函数返回一个Uni。例如,调用一个由 Uni 表示的远程服务的异步操作。返回的 Uni 会从远程服务发出结果,如果发生任何错误则会发出失败事件。

	Uni<String> invokeRemoteGreetingService(String name);
	Uni<String> result = uni
	    .onItem().transformToUni(name -> invokeRemoteGreetingService(name)); // 传递数据并订阅其他 Uni

将接收到的单个数据项目转换为 Multi 中的流式数据:

	Multi<String> result = uni
	    .onItem().transformToMulti(item -> Multi.createFrom().items(item, item)); // 传递数据并订阅,转为 Multi 类型

转换 Multi 数据流:合并 or 连接

将项目从 Multi 转换到后面管道时,需要决定下游订阅者按哪个顺序接收项目。Mutiny 提供了两种方式:

  • 合并:不保留顺序合并接收到的项目。
  • 连接:维护和连接产生项目的流。
  1. 将来自 Multi 中的项目转换为 Uni 中的单个项目:
	Multi<String> merged = multi
	    .onItem().transformToUniAndMerge(name -> invokeRemoteGreetingService(name));
	Multi<String> concat = multi
	    .onItem().transformToUniAndConcatenate(name -> invokeRemoteGreetingService(name));
  1. 将接收的 Multi 项目转换到另一个 Multi 管道数据流中:
	Multi<String> merged = multi
	    .onItem().transformToMultiAndMerge(item -> someMulti(item));
	Multi<String> concat = multi
	    .onItem().transformToMultiAndConcatenate(item -> someMulti(item));

失败处理

失败是可观测数据流的终结事件,表明发生了一些异常,失败后不再接收任何项目。

当接收到这样的事件时,您可以:

  • 传播,向下游传播故障(默认)。
  • 转换,将失败转化为另一个失败。
  • 恢复,切换到另一个流,发送回退项或完成事件。
  • 重试,发生异常进行再次尝试,如果尽管进行了多次尝试,但仍然失败,则该失败将向下游传播。

Mutiny 提供了多个操作符来处理失败事件:

  • 观察:使用 onFailure().invoke() 观测失败进行一些操作,如记录日志。
  • 转换:使用 onFailure().transform(failure -> new BizException(failure)),将失败转化为更有意义的异常类型。
  • 恢复:使用 onFailure().recoverWithItem(fallback) 使用回退项进行恢复。
  • 恢复:使用 onFailure().recoverWithUni(f -> getFallbackUni(f)) 切换到另一个异步管道数据流。
  • 完成:Multi 数据流使用 onFailure().recoverWithCompletion() 来发送完成事件替代异常事件。
  • 重试:使用 onFailure().retry().atMost(3) 进行多次重试。
  • Mutiny 提供了一种指数回退的方法: 每次重试增长间隔:
onFailure().retry()
        .withBackOff(Duration.ofMillis(100), Duration.ofSeconds(1)) // 配置初始延迟和最大延迟。还可以配置抖动来添加随机性。
        .atMost(3)
  • 条件重试:
    onFailure().retry().until(f -> shouldWeRetry(f));

转换阻塞I/O操作,在指定线程上执行操作

方法 runSubscriptionOn 请求上游在给定的执行线程上运行订阅,emitOn 用于指定向下游传播项目、失败和完成事件的执行线程,直到使用另一个emitOn:

    Uni.createFrom()
        .item(this::invokeRemoteServiceUsingBlockingIO) // 在指定的线程之上运行
        .runSubscriptionOn(Infrastructure.getDefaultWorkerPool())  // Mutiny 允许底层平台提供默认的工作线程池
        .subscribe().with(...) // 需要订阅

    Multi<String> multi = Multi.createFrom().items("john", "jack", "sue")
        .emitOn(Infrastructure.getDefaultWorkerPool()) // Mutiny 默认使用从上游发出事件的线程调用下一阶段,可以通过 emitOn 更换线程。
        .onItem().transform(this::invokeRemoteServiceUsingBlockingIO);

转换为阻塞的命令式执行

  • 使用 await().indefinitely() 方法来无限期阻塞和等待 Uni 项目数据。
  • 可以使用 await().atMost(Duration.ofSeconds(1)) 指定等待期限
  • 使用 asIterable() 迭代阻塞获取 Multi 数据,或使用asStream() 获取为 java.util.stream.Stream类型数据:
    Iterable<T> iterable = multi.subscribe().asIterable();
    for (T item : iterable) {
        doSomethingWithItem(item);
    }

使用 Unchecked.function 避免手动 try/catch 操作

当集成抛出检查异常(如IOException)的库时,添加 try/catch 块并将抛出的异常包装到运行时异常中并不是很方便。

    Uni<Integer> uni = item.onItem().transform(Unchecked.function(i -> {
        return methodThrowingIoException(i); // 可以抛出运行时异常
    }));

可以通过 import static io.smallrye.mutiny.unchecked.Unchecked.*; 方便使用各类 Unchecked.function 包装操作

使用 multi.select() 过滤 Multi 数据

    multi.select().where(i -> i > 6) // 条件为 true 是可以继续传播
    select().when(i -> Uni.createFrom().item(i > 6)) // when 异步版本,返回 Uni<Boolean>
    multi.select().distinct() // 过滤相同项,不能在大型或无限流上使用
    multi.select().repetitions() // 过滤连续的重复项,可以在大型或无限流上使用

将 Multi 数据聚合为 Uni

  1. 收集到一个列表:通过 collect().asList() 将项目存储在一个列表中 Uni<list<T>>。当 Multi 完成时,它会发出最终列表。
    Uni<List<String>> uni = multi.collect().asList();
  1. 收集到 Map 中:通过给 asMap 提供一个函数来计算每个项目的键来实现,key 相同的项会覆盖。 .asMultiMap 可以将相同 key 的项保存到一个列表
    Uni<Map<String, String>> uni = multi.collect().asMap(item -> getUniqueKey(item));
  1. 自定义聚合器收集:
    Uni<MyCollection> uni = multi.collect().in(MyCollection::new, (col, item) -> col.add(item)); // 提供容器和累加方法
    Uni<Long> count = multi.collect().with(Collectors.counting()); // 使用 Java Collector

通过 ifNull() 处理空项

    uni.onItem().ifNull().continueWith("hello");
    uni.onItem().ifNull().switchTo(() -> Uni.createFrom().item("hello"));
    uni.onItem().ifNull().failWith(() -> new Exception("Boom!"));
    uni.onItem().ifNotNull().transform(String::toUpperCase) // 非空项

超时处理

为 HTTP 调用等操作添加超时或截止时间,如果在截止时间之前没有得到响应,则认为操作失败。

    Uni<String> uniWithTimeout = uni
            .ifNoItem().after(Duration.ofMillis(100)) // 设置超时
            .recoverWithItem("some fallback item"); // 设置超时处理方式:恢复
            .fail().onFailure(TimeoutException.class).recoverWithItem("we got a timeout"); // 报错
            .failWith(() -> new ServiceUnavailableException()) // 自定义异常

事件延迟

  1. 通过 onItem().delayIt() 延迟 Uni 的项目事件
    Uni<String> delayed = Uni.createFrom().item("hello")
            .onItem().delayIt().by(Duration.ofMillis(10)); // 固定时长
           .onItem().delayIt().until(this::write); // 其他事件结束
  1. Multi 无 delayIt 方法,可以通过 call() 方法实现
    // 将所有项目延迟10毫秒
    Multi<Integer> delayed = multi 
            .onItem().call(i -> Uni.createFrom().nullItem().onItem().delayIt().by(Duration.ofMillis(10)));

使用 map flatMap concatMap 方法

对应关系如下:

  • map -> onItem().transform()
  • flatMap -> onItem().transformToUniAndMerge
  • concatMap -> onItem().transformToUniAndConcatenate

Mutiny API 是围绕组的思想进行分解的,每个组处理一个特定的事件,并提供操作符。然而,为了避免冗长,Mutiny还公开了常用方法的
快捷调用方法对应关系表。

控制需求

  • 设置需求,pacedemand() 操作符可用于在特定时间点自动发布请求。
    // 每100毫秒发出25项请求
    FixedDemandPacer pacer = new FixedDemandPacer(25L, Duration.ofMillis(100L));
    Multi<Integer> multi = Multi.createFrom().range(0, 100)
            .paceDemand().on(Infrastructure.getDefaultWorkerPool()).using(pacer);
  • 限制需求

capDemandsTo 和 capDemandUsing 操作符可用于限制下游用户的请求。 capDemandTo 操作符定义了可以流动的最大数据量。
capDemandUsing 可以通过函数根据自定义公式或先前的需求观察值提供上限值。

    Multi.createFrom().range(0, 100)
            .capDemandsTo(50L).subscribe()

总结

Mutiny 是一个基于 Reactive Streams 标准实现的异步编程库,旨在简化异步编程代码的编写和维护,提高程序的性能和可伸缩性。Mutiny 库根据不同的事件类型分组提供了丰富的操作符,支持开发者进行数据流的转换、过滤、聚合等操作,从而实现更加灵活和高效的异步编程。

Mutiny 可以理解为一个基于事件驱动的数据流处理管道,对数据流从上游到下游的管道进行编排,并传递数据,在发生各种事件时通过各类操作符进行处理。还需要在以后的实际使用中不断加深理解。

更多参考

  • Mutiny 官网:https://smallrye.io/smallrye-mutiny
  • Quarkus Resteasy Reactive: https://quarkus.io/guides/resteasy-reactive
  • Reactive Streams: https://www.reactive-streams.org/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/656650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

1.3 springboot项目中,数据层HikariCP与MyBatis整合

步骤1&#xff1a;在顶级父工程中pom引入mysql和mybatis依赖 <!-- mysql驱动 --> <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.41</version> </dependency> <…

微信公众号每天定时发送消息给女朋友

前言 这是我在这个网站整理的笔记&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;RodmaChen 每天定时发送消息给女朋友 一. 环境准备二. 代码拉取和配置三. 项目部署3.1 直接运行3.2 后台运行 四. 效果图 参考地址&#xff1a;https://github.com/limoes…

云服务器部署极简版openGauss,本地Data Studio远程连接

openGauss是一款开源关系型数据库管理系统&#xff0c;华为研发&#xff0c;2020年7月1日开源。Data Studio提供了一个图形化界面来管理openGauss数据库。 1.下载安装包 在华为云上租一台服务器&#xff0c;操作系统选&#xff1a;openEuler 20.03 64bit (64-bit) 获取openGa…

C程序设计(第五版)

文章目录 前言第3章 顺序程序设计第4章 选择结构程序设计c第5章 循环程序设计第6章 利用数组处理批量数据第7章 用函数实现模块化程序设计第8章 善于利用指针第9章 用户建立数据类型结构体字节对齐 第10章 对文件的输入输出 前言 鉴于写CSDN博客一篇一篇查找比较麻烦&#xff0…

day55_springmvc

今日内容 零、 复习昨日 零、 复习昨日 1 maven项目编译后代码在target 2 发现代码都没有错,该写的都有,但是已启动服务器404,查看target,如果编译会后资源不全面,那就删除重新编译 3 重新看一下,如何使用mavne创建javaweb项目 一、参数绑定 【重点】 所谓参数绑定,就是前端发请…

【GPT LLM】跟着论文学习gpt

GPT1开山之作&#xff1a;Improving language understanding by generative pre-training 本文提出了gpt1&#xff0c;即使用无标签的数据对模型先进行训练&#xff0c;让模型学习能够适应各个任务的通用表示&#xff1b;后使用小部分 task-aware的数据对模型进行微调&#xff…

csdn编辑

这里写自定义目录标题 欢迎使用Markdown编辑器新的改变功能快捷键合理的创建标题&#xff0c;有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants 创建一个自定义列表如何创建一个…

【tensorflow】连续输入+离散输入的神经网络模型训练代码

【tensorflow】连续输入离散输入的神经网络模型训练代码 离散输入的转化问题 构造词典 创建离散数据、转化字典索引、创建连续数据 创建离散输入连续输入模型 训练输出 全部代码 - 复制即用 查看本系列三种模型写法&#xff1a;   【tensorflow】连续输入的线性回归模型训…

基于知识图谱的电影推荐系统——Neo4jPython

文章目录 1. 数据解下载与配置2. 将处理好的数据导入数据库中3. 执行项目 1. 数据解下载与配置 选择TMDB电影数据集&#xff0c;Netflix Prize 数据集下载。 也可直接从这里下载&#xff1a;链接: https://pan.baidu.com/s/1l6wjwcUzy5G_dIlVDbCkpw 提取码: pkq6 。 执行prep…

【AI】金融FinGPT模型

金融FinGPT模型开源&#xff0c;对标BloombergGPT&#xff0c;训练参数可从61.7亿减少为367万&#xff0c;可预测股价 继Bloomberg提出了500亿参数的BloombergGPT&#xff0c;GPT在金融领域的应用受到了广泛关注&#xff0c;但BloombergGPT是一个非开源的模型&#xff0c;而且…

【ESP32之旅】U8g2 在线仿真和UI调试

前言 几乎每个玩屏幕的电子DIYer都知道万能的屏幕驱动中间件u8g2库&#xff0c;这个库提供了强大的驱动适配和ui设计能力。但是官方没有一个好用的ui设计和仿真软件&#xff0c;在设计UI布局的时候对单片机频繁的烧录调试浪费了大量的时间。最近在论坛看到有一个第三方维护的在…

nginx映射后,公网通过域名无法访问到静态资源

今天发生一件奇怪的事情&#xff0c;首先是阿里云的数字DV证书中pgj.bw580.com和acc.bw580.com无缘无故的消失了&#xff0c; 接着查看https://pgj.bw580.com/css/chunk-ceb11154.aefc15d8.css&#xff0c;在跳板机中可以访问到该资源&#xff0c;但是通过外网能够访问。 通过防…

MySQL 中各种锁的详细介绍

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

P109认识和改造世界

认识世界的根本目的在于改造世界 认识和改造世界之间的辩证关系 感觉只喜欢考 必然和自由的辩证关系 人类创造历史的两个基本活动 &#xff1a; 认识和改造世界所以认识和改造世界的基础是实践 认识改造和三大界之间的联系 改造客观世界和改造主观世界之间的关系 认识世界…

台电x80HD 安装linux系统,可调电压电源供电,外网访问、3D打印klipper固件

一、系统安装 参照https://blog.csdn.net/gangtieren/article/details/102975027安装 安装过程遇到的问题&#xff1a; 1、试了 linux mint 21 、ubuntu20.04 、ubuntu22.04 都没有直接安装成功&#xff0c;u盘选择安装进入系统后一直黑屏&#xff0c;只有ubuntu18.04 选择后稍…

基于Eclipse+Java+Swing+Mysql实现学生成绩管理系统

基于EclipseJavaSwingMysql实现学生成绩管理系统 一、系统介绍二、功能展示1.登陆2.成绩浏览3.班级添加4.班级维护5.学生添加6、学生维护 三、数据库四、其它1.其他系统实现五.获取源码 一、系统介绍 学生&#xff1a;登陆、成绩浏览 管理员&#xff1a;登陆、班级添加、班级维…

多分支merge忽略文件合并

该文章已同步收录到我的博客网站&#xff0c;欢迎浏览我的博客网站&#xff0c;xhang’s blog 1. .gitattributes 文件的作用 .gitattributes 文件是 Git 版本控制系统中的一个配置文件&#xff0c;它用于指定 Git 如何处理文件的二进制数据&#xff0c;以及如何标识文件的类…

字节月薪23k软件测试工程师:必备的6大技能(建议收藏)

软件测试 随着软件开发行业的日益发展&#xff0c;岗位需求量和行业薪资都不断增长&#xff0c;想要入行的人也是越来越多&#xff0c;但不知道从哪里下手&#xff0c;今天&#xff0c;就给大家分享一下&#xff0c;软件测试行业都有哪些必会的方法和技术知识点&#xff0c;作…

夏天到了,给数据中心泼点“冷水”

气温上升&#xff0c;还有什么能比“工作没了”&#xff0c;更能让人一瞬间心里拔凉拔凉的呢&#xff1f; 这个“薪尽自然凉”的故事&#xff0c;就发生在数据中心。 前不久&#xff0c;某电商平台正在购物高峰期&#xff0c;结果IDC冷冻系统故障&#xff0c;机房设备温度快速升…

智能电动汽车充电桩系统及硬件电路研究 安科瑞 许敏

摘要&#xff1a;随着充电桩技术的发展&#xff0c;以及人们对电动汽车快速充电的需求&#xff0c;很多厂商开始对智能充电桩进行研究。以电动 汽车智能充电桩的发展现状为背景&#xff0c;进行了智能电动汽车充电桩系统硬件电路的研究。 关键词&#xff1a;充电桩&#xff1b…