【微服务】spring webflux响应式编程使用详解

news2024/12/24 20:39:04

目录

一、webflux介绍

1.1 什么是webflux

1.2 什么是响应式编程

1.3 webflux特点

二、Java9中响应式编程

2.1 定义事件流源

2.2 实现订阅者

三、Spring Webflux介绍

四、Reactor 介绍

五、Reactor 常用API操作

5.1 Flux 创建流操作API

5.2 Flux响应流的订阅

5.3 Flux处理实时流

六、Spring Webflux 使用

6.1 Spring Webflux简介

6.1 Spring Webflux中的核心组件

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

6.2.2 核心业务类

6.2.3 核心接口类

6.3 Spring Webflux 函数式编程实现

6.3.1 自定义handler

6.3.2 自定义server服务器

6.3.3 访问效果测试

6.3.4 使用webclient调用

6.4 Spring Boot RouterFunction 整合方式一

6.5 Spring Boot RouterFunction 整合方式二

6.5.1 静态化改造

七、webflux的使用场景

八、写在文末


一、webflux介绍

1.1 什么是webflux

webflux,即响应式编程。在JDK9中开始引入了响应式编程模型,而spring5.0版本之后正式引入对webflux的支持,即spring webflux,spring webflux是spring在5.0版本后提供的一套响应式编程风格的web开发框架。

1.2 什么是响应式编程

响应式编程是一种用于处理异步数据流和事件的编程范式。它的核心思想是将数据流看作是一系列事件的序列,通过对事件流的处理来实现计算。它强调基于事件的异步处理和函数式编程的思想,可以帮助开发人员更好地处理复杂的应用程序逻辑。

而响应式编程,其实就是为这种异步非阻塞的流式编程制定的一套标准。流式编程已不陌生了,Java8提供的stream api就是这种风格。这套标准包括对运行环境(JVM、JavaScript)以及网络协议相关的规范。

1.3 webflux特点

非阻塞式

在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程

函数式编程

Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求

二、Java9中响应式编程

Java 9引入了Flow API作为响应式编程的标准实现,具体来说:

  • Flow API提供了一组接口和类,用于定义和处理数据流;

  • 它基于Publisher-Subscriber模式,其中Publisher生成事件流并发布给Subscriber进行处理。

如果使用Java9中的响应式编程进行实现,核心需要两步:

  • 定义事件流源;
  • 实现订阅者;

下面来看一段具体的实现代码。

2.1 定义事件流源

在Flow API中,事件流源被定义为Publisher的实现类,具体来说,首先需要创建一个类实现Publisher接口,并重写其subscribe()方法。在subscribe()方法中,可以通过调用Subscriber的onSubscribe()方法来将事件流订阅给Subscriber。

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class EventPublisher implements Flow.Publisher<String> {

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new SimpleSubscription(subscriber));
    }
}

2.2 实现订阅者

订阅者是实现Subscriber接口的类。在Flow API中,只需要实现Subscriber接口的onNext()、onError()和onComplete()方法;

  • 当事件流发出下一个元素时,onNext()方法将被调用;

  • 当发生错误时,onError()方法将被调用;

  • 当事件流结束时,onComplete()方法将被调用;

在这些方法中,我们可以根据业务需要添加处理事件流的数据相关逻辑。

import java.util.concurrent.Flow;

public class EventSubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received item: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Event stream completed.");
    }
}

测试代码

import java.util.concurrent.Flow;

public class Main {
    public static void main(String[] args) {
        EventPublisher publisher = new EventPublisher();
        EventSubscriber subscriber = new EventSubscriber();

        publisher.subscribe(subscriber);

        publisher.submit("Event 1");
        publisher.submit("Event 2");
        publisher.submit("Event 3");

        publisher.close();
    }
}

三、Spring Webflux介绍

是Spring5添加新的模块,用于web开发的,功能和SpringMVC类似的,Webflux使用当前一种比较流程响应式编程出现的框架。spring官方文档地址:Web on Reactive Stack :: Spring Framework

spring-webflux是spring web框架体系中的一个组成模块,说起这个WebFlux,不难会拿出来与Spring Web与WebMvc进行比较,因为在目前很多项目开发中,仍然会使用WebMVC进行开发,尽管springboot成为基础的开发框架,但是接口开发中核心组件还是WebMVC的进一步封装。

四、Reactor 介绍

可以这么理解,响应式编程中的核心实现在于Reactor 的实现和应用,具体来说,Reactor是满足Reactive规范框架。具体来说:

  • 对响应式流规范的一种实现;

  • Spring Webflux默认的响应式框架;

  • 完全异步非阻塞,对背压的支持;

  • 提供两个异步序列API,Flux[N]和Mono[0|1];

  • 提供对响应式流的操作;

在Reactor中,有两个核心类,Flux和Mono ,这两个类实现接口 Publisher,提供丰富操作符。

  • Flux 对象实现发布者,返回 N 个元素,即产生0到N个元素的异步序列;

  • Mono 实现发布者,返回 0 或者 1 个元素,即产生至多一个元素的异步序列。

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种数据信号特点:

  • 错误信号和完成信号都是终止信号,不能共存的;
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
  • 如果没有错误信号,没有完成信号,表示是无限数据流;

五、Reactor 常用API操作

接下来通过实际操作来演示下基于Reactor 常用的API的使用。引入如下依赖包。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.5</version>
</dependency>

5.1 Flux 创建流操作API

在上面提到,如果你需要创建多于一个元素的异步序列,可以考虑使用Flux 相关API,下面是使用Flux 的创建多种形式流的操作

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ReactorApi {

    @Test
    public void fluxJust() {
        Flux<String> phones = Flux.just("小米", "三星", "华为");
    }

    @Test
    public void fluxFromIterable() {
        Flux<String> phones = Flux.fromIterable(Arrays.asList("小米", "三星", "华为"));
    }

    @Test
    public void fluxFromArray() {
        Flux<String> phones = Flux.fromArray(new String[]{"小米", "三星", "华为"});
    }

    @Test
    public void fluxFromStream() {
        Flux<String> phones = Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"}));
        phones.subscribe();
        phones.subscribe(); //只能被订阅一次
    }

    @Test
    public void fluxEmpty() {
        Flux<String> phones = Flux.empty(); //generic type still honored
    }

    @Test
    public void fluxRange() {
        Flux<Integer> phones = Flux.range(5, 3);
    }

    @Test
    public void fluxGenerate() {
        Flux<Long> flux = Flux.generate(
                AtomicLong::new,
                (state, sink) -> {
                    long i = state.getAndIncrement();
                    sink.next(i);
                    if (i == 10) sink.complete();
                    return state;
                },
                (state) -> System.out.println("done")
        );
        flux.subscribe(System.out::println);
    }


    @Test
    public void fluxCreate() {
        Flux<String> phones = Flux.create((t) -> {
            t.next("小米");
            t.next("三星");
            t.next("华为");
            t.complete();
        });

        phones.subscribe(System.out::println);

        System.out.println("------------");

        Flux<String> ownFluxListener = Flux.create(sink -> {
            //传入自定义的方法
            new MyDataListener(){
                public void onReceiveData(String str){
                    sink.next(str);
                }
                public void onComplete(){
                    sink.complete();
                }
            };
        }, FluxSink.OverflowStrategy.DROP);

        ownFluxListener.subscribe(System.out::println);

    }

    public class MyDataListener{
        public void onReceiveData(String str){
            System.out.println("收到数据:"+str);
        }

        public void onComplete(){
            System.out.println("完成数据的消费处理");
        }
    }

    @Test
    public void fluxDefer() {
        Flux.defer(() -> Flux.just("小米", "三星", "华为"))
                .subscribe(System.out::println);

        Flux<String> stockSeq4 = Flux.defer(() -> Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"})));
        stockSeq4.subscribe();
        stockSeq4.subscribe();
    }

    @Test
    public void fluxInterval() throws InterruptedException {
        //interval 定时发送元素
        Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
                .subscribe((t) -> System.out.println((String.valueOf(t))));
        Thread.sleep(1000000);
    }

}

5.2 Flux响应流的订阅

在上面的操作API中,调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。接下来,看看如何订阅和操作这些流。

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ReactorStreamApi {

    /**
     * 流的map操作
     */
    @Test
    public void streamMap() {
        Flux<Integer> ints = Flux.range(1, 4);
        Flux<Integer> mapped = ints.map(i -> i * 2);
        mapped.subscribe(System.out::println);
    }

    /**
     * 带有异常情况的处理
     */
    @Test
    public void withError() {
        Flux<Integer> ints = Flux.range(1, 4)
                .map(i-> {
                    if(i<3){
                        return i;
                    }
                    throw new RuntimeException("大于3");
                });
        ints.subscribe(
                i-> System.out.println(i),
                err -> System.out.println("error : " + err.getMessage()),
                () -> System.out.println("完成订阅和数据的消费")
        );
    }

    @Test
    public void testSubscribeWithBase(){
        Flux<Integer> ints = Flux.range(1, 4);
        ints.subscribe(new MySubscriber<>());
    }

    public class MySubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("MySubscriber");
            request(1);
        }

        @Override
        protected void hookOnNext(T value) {
            System.out.println(value.toString());
            request(1);
        }
    }

    /**
     * 流的filter操作
     */
    @Test
    public void streamFilter() {
        Flux<Integer> ints = Flux.range(1, 4);
        Flux<Integer> filtered = ints.filter(i -> i % 2 == 0);
        filtered.subscribe(System.out::println);
    }

    @Test
    public void streamBuffer() {
        Flux<Integer> ints = Flux.range(1, 40);
        Flux<List<Integer>> buffered = ints.buffer(3);
        buffered.subscribe(System.out::println);
    }

    @Test
    public void streamRetry() {
        Mono<String> client = Mono.fromSupplier(() -> {
            double num = Math.random();
            if (num > 0.01) {
                throw new Error("Network issue");
            }
            return "https://www.baidu.com";
        });
        client.retry(3).subscribe(System.out::println);
    }

    /**
     * 响应式流的合并
     */
    @Test
    public void streamZip(){
        Flux<Integer> fluxA = Flux.range(1, 4);
        Flux<Integer> fluxB = Flux.range(5, 5);
        fluxA
                .zipWith(fluxB, (a, b)-> a+b)
                .subscribe(System.out::println);
    }

}

5.3 Flux处理实时流

对于某些需要实时处理的场景,可以考虑Flux的实时流的处理

import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

public class StreamTest {

    @Test
    public void simpleHotStreamCreation() {
        Sinks.Many<Integer> hotSource = Sinks.unsafe().many().multicast().directBestEffort();
        //转为flux
        Flux<Integer> hotFlux = hotSource.asFlux();
        //订阅数据
        hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));

        hotSource.emitNext(1, FAIL_FAST);
        hotSource.tryEmitNext(2).orThrow();

        hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));

        hotSource.emitNext(3, FAIL_FAST);
        hotSource.emitNext(4, FAIL_FAST);
        hotSource.emitComplete(FAIL_FAST);

    }

    @Test
    public void connectableFlux() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 4);
        ConnectableFlux<Integer> connectableFlux = source.publish();
        connectableFlux.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
        connectableFlux.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
        System.out.println("Finish subscribe action");
        Thread.sleep(1000L);
        System.out.println("Connect to Flux now");
        connectableFlux.connect();
    }

    @Test
    public void autoConnectConnectableFlux() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 4);
        Flux<Integer> autoConnect = source.publish().autoConnect(2);
        autoConnect.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
        System.out.println("Finish subscriber 1 action");
        Thread.sleep(1000L);
        System.out.println("Start subscriber 2 action");
        autoConnect.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
    }

}

六、Spring Webflux 使用

6.1 Spring Webflux简介

在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况。因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。

在servlet3.0标准之后,为解决此类问题,提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他请求使用,这样的操作机制将极大的提升程序的并发性能。

对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。

在spring中实现响应式编程,就需要使用到spring webFlux。该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式(编程式实现),另一种是基于SpringMVC注解方式。

6.1 Spring Webflux中的核心组件

Spring Webflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻 塞的框架。Spring Webflux 执行过程和 SpringMVC 相似的 Spring Webflux 核心控制器 DispatchHandler,实现接口 WebHandler。

SpringWebflux 里面 DispatcherHandler,负责请求的处理,

  • HandlerMapping:请求查询到处理的方法

  • HandlerAdapter:真正负责请求处理

  • HandlerResultHandler:响应结果处理

SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)和 HandlerFunction(处理函数)

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

注意,如果是在springboot项目中提供web接口,引入了下面的依赖之后就不要引入spring-boot-starter-web依赖了。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

6.2.2 核心业务类

使用webflux编写web接口,与普通的rest-api类似,只是在webflux,返回值不再是对象或其他数据类型,而是Flux或Mono包装的数据对象。

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@Service
public class BookServiceImpl implements BookService {

    //创建 map 集合存储数据
    private final Map<String,BookInfo> books = new HashMap<>();

    public BookServiceImpl() {
        this.books.put("01",new BookInfo("01","Java",20));
        this.books.put("02",new BookInfo("02","Js",30));
        this.books.put("03",new BookInfo("03","Hadoop",50));
    }

    @Override
    public Mono<BookInfo> getById(String id) {
        return Mono.justOrEmpty(this.books.get(id));
    }

    @Override
    public Flux<BookInfo> getAll() {
        return Flux.fromIterable(this.books.values());
    }

    @Override
    public Mono<Void> saveBookInfo(Mono<BookInfo> bookInfoMono) {
        return bookInfoMono.doOnNext(book -> {
            //向 map 集合里面放值
            int id = books.size()+1;
            books.put(String.valueOf(id),book);
        }).thenEmpty(Mono.empty());
    }
}

6.2.3 核心接口类

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class BookController {

    @Autowired
    private BookService bookService;

    //根据ID查询 http://localhost:8082/book/01
    @GetMapping("/book/{id}")
    public Mono<BookInfo> getById(@PathVariable String id) {
        return bookService.getById(id);
    }
    
    //查询所有 http://localhost:8082/findAll
    @GetMapping("/findAll")
    public Flux<BookInfo> getUsers() {
        return bookService.getAll();
    }
   
    @PostMapping("/save")
    public Mono<Void> save(@RequestBody BookInfo user) {
        Mono<BookInfo> userMono = Mono.just(user);
        return bookService.saveBookInfo(userMono);
    }

}

选择其中一个接口测试,可以看到效果与传统的API接口返回值并无差别

补充说明:

1)SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat;

2)SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty;

6.3 Spring Webflux 函数式编程实现

在使用函数式编程模型操作时候,需要自己初始化服务器,基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。

Spring Webflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse ,而是ServerRequest 和 ServerResponse

熟悉Netty的同学对Netty的编码风格不陌生,在编写Netty的服务时,也需要自定义Handler,然后将这个自定义Handler配置到启动配置参数中,因此可以同样的方式来理解Spring Webflux的函数式编程的套路。

6.3.1 自定义handler

可以这么理解,在这个handler类中,其实就是对底层的业务方法进一步的封装,只不过返回的数据类型为Mono或Flux;

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BookHandler {

    private BookService bookService;

    public BookHandler(BookService bookService) {
        this.bookService = bookService;
    }

    /**
     * 根据ID查询
     * @param serverRequest
     * @return
     */
    public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {
        String id = serverRequest.pathVariable("id");
        Mono<BookInfo> bookInfoMono = this.bookService.getById(id);

        Mono<ServerResponse> noDataRes = ServerResponse.notFound().build();

        return bookInfoMono.flatMap(bookInfo ->
                ServerResponse
                        .ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(bookInfo, BookInfo.class)
                        .switchIfEmpty(noDataRes)
        );
    }

    /**
     * 获取所有
     * @return
     */
    public Mono<ServerResponse> getAllBooks(ServerRequest serverRequest) {
        //调用 service 得到结果
        Flux<BookInfo> bookInfoFlux = this.bookService.getAll();
        return
                ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(bookInfoFlux,BookInfo.class);
    }

    /**
     * 保存数据
     * @param request
     * @return
     */
    public Mono<ServerResponse> saveUser(ServerRequest request) {
        //得到 user 对象
        Mono<BookInfo> bookInfoMono = request.bodyToMono(BookInfo.class);
        return
                ServerResponse.ok().build(this.bookService.saveBookInfo(bookInfoMono));
    }

6.3.2 自定义server服务器

该类的作用就相当于是netty编程中,通过ServerBootstrap创建一个服务器类似;

import com.congge.handler.BookHandler;
import com.congge.service.BookService;
import com.congge.service.impl.BookServiceImpl;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.http.server.HttpServer;

import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;

public class BookServer {

    public RouterFunction routerFunction() {

        BookService bookService = new BookServiceImpl();
        BookHandler bookHandler = new BookHandler(bookService);

        //设置路由
      /*  return RouterFunctions.route(
                GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById)
                .andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::get
                        AllUsers);*/
        return RouterFunctions.route(
                RequestPredicates.GET("/users/{id}")
                        .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                bookHandler::getBookById
        ).andRoute(
                RequestPredicates.GET("/users/{id}")
                        .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                bookHandler::getAllBooks
        );
    }

    public void createReactorServer() {
        //路由和 handler 适配
        RouterFunction<ServerResponse> route = routerFunction();
        HttpHandler httpHandler = toHttpHandler(route);
        ReactorHttpHandlerAdapter adapter = new
                ReactorHttpHandlerAdapter(httpHandler);
        //创建服务器
        HttpServer httpServer = HttpServer.create();
        httpServer.handle(adapter).bindNow();
    }

    public static void main(String[] args) throws Exception{
        BookServer server = new BookServer();
        server.createReactorServer();
        System.out.println("enter to exit");
        System.in.read();
    }

}

在该类的最后,编写了一个main函数,运行这个main程序,注意日志中的端口号,因为接下来将通过这个端口进行访问;

6.3.3 访问效果测试

访问接口:localhost:51315/book/01

6.3.4 使用webclient调用

也可以编写webclient调用上面的接口,代码如下

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class ClientTest {

    public static void main(String[] args) {
        //调用服务器地址
        WebClient webClient = WebClient.create("http://127.0.0.1:51315");
        //根据 id 查询
        String id = "01";
        BookInfo bookInfo = webClient.get().uri("/book/{id}", id)
                .accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(BookInfo.class)
                .block();
        System.out.println(bookInfo.getName());

        //查询所有
        Flux<BookInfo> results = webClient.get().uri("/book/findAll").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(BookInfo
                        .class);
        results.map(stu -> stu.getName())
                .buffer().doOnNext(System.out::println).blockFirst();
    }

}

6.4 Spring Boot RouterFunction 整合方式一

上面是通过自定义handler的方式实现了Spring Webflux函数式编程,如果直接在springboot中直接集成怎么做呢,只需要通过自定义配置bean的方式,将路由配置进去即可;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Configuration
public class MyRoutesConfig {

    @Bean
    RouterFunction<ServerResponse> index() {
        return route(GET("/index"), request -> ok().body(fromObject("Hello Index")));
    }

    @Bean
    RouterFunction<ServerResponse> about() {
        return route(GET("/about"), request -> ok().body(fromObject("About page")));
    }

}

当然里面的逻辑非常简单,实际使用时,可以在每个bean中补充更复杂的逻辑,比如调用其他业务类的逻辑,同样我们启动springboot应用后访问下端点/index,看到下面的效果。

6.5 Spring Boot RouterFunction 整合方式二

紧接着上面的案例,下面使用更通用的做法来完成与RouterFunction 的整合,首先还是自定义一个handler,这种自定义的配置类形式handler好处是可以注入其他业务类,从而实现更复杂的逻辑。

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ApiHandler {

    public Mono<ServerResponse> getNewBooks(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(
                        Flux.create(sink ->{
                            sink.next(new BookInfo("05","mysql",90));
                            sink.next(new BookInfo("06","flink",78));
                            sink.next(new BookInfo("07","php",66));
                            sink.complete();
                        }),BookInfo.class
                );
    }

    public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {
        String bookId = serverRequest.pathVariable("id");
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(
                        Mono.just(
                                new BookInfo(bookId,"python",57)
                        ),BookInfo.class
                );
    }

}

自定义routerFunction,可以这么理解,通过这个类,就不用再单独编写一个controller,从而实现与普通的controller类中一样定义接口的功能。

import com.congge.handler.ApiHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class ApiRouterFunction {

    @Bean
    public RouterFunction<ServerResponse> apiRoute(ApiHandler apiHandler){
        return route(
                GET("/book/getBookById/{id}"),
                apiHandler::getBookById
        ).and(
                route(
                        GET("/book/getNewBooks"),
                        apiHandler::getNewBooks
        ));

    }

}

启动项目之后,我们来访问一下其中的一个接口,效果与普通的接口效果类似。

6.5.1 静态化改造

如果你不希望上面的自定义handler和routerConfig与spring框架耦合的太紧密,也可以将其做成静态化的配置,通过app启动的时候自动注册,只需去掉spring相关的注解,然后在app启动类注册进去即可。

ApiRouterFunction改造,将原本的配置bean方法修改为static 如下代码

public class ApiRouterFunction {

    public static RouterFunction<ServerResponse> apiRoute(){
        ApiHandler apiHandler = new ApiHandler();
        return route(
                GET("/book/getBookById/{id}"),
                apiHandler::getBookById
        ).and(
                route(
                        GET("/book/getNewBooks"),
                        apiHandler::getNewBooks
                ));

    }

}

ApiHandler改造,去掉配置注解,启动类改造如下

public static void main(String[] args) {
        new SpringApplicationBuilder()
                .sources(FluxApp.class)
                .initializers((ApplicationContextInitializer<GenericApplicationContext>) ctx ->{
                    ctx.registerBean("apiRoute",RouterFunction.class,ApiRouterFunction::apiRoute);
                })
                .run(args)
        ;
    }

再次启动后调用相同的接口,仍然可以得到正确的响应结果

七、webflux的使用场景

通过上面关于webflux的使用了解到webflux的强大之处,其实在很多中间件,微服务组件中都随处可见webflux的响应式编程的影子,比如在springcloud gateway网关中,网关作为流量的入口,为了持续提升整体服务的高性能、高吞吐、高并发的请求,在处理请求拦截、路由转发等方面使用webflux。如下这段代码,就是gateway中自定义过滤器的一段配置;

@Component
@Slf4j
public class LogFilter implements GlobalFilter {
 
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info(exchange.getRequest().getPath().value());
        return chain.filter(exchange);
    }
}

结合实际经验,对于下面的这些场景,可以考虑使用webflux解决:

  • Spring WebFlux 是一个异步非阻塞式的 Web 框架,所以,它特别适合应用在 IO 密集型的服务中,比如像上面提到的微服务网关这样的应用中;
  • 硬件资源扩充困难,但又希望提升系统整体的吞吐量,可以考虑使用webflux,因为WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动;
  • 一些对请求响应时间要求不高,但是并发较大的异步场景;

注意

WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。

八、写在文末

从WebFlux 的发展以及在众多的Java生态组件中广泛使用来看,WebFlux 的流行趋势已经到来,因此掌握WebFlux 的核心原理和思想,在日常工作开发中,在某些特殊的场景下能够提供很好的解决思路,当然WebFlux 涉及到的技术点还有很多,比如对websocket的支持等,有兴趣的同学可以继续参阅相关资料深入学习,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1110022.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

『C++成长记』C++入门——命名空间缺省参数

&#x1f525;博客主页&#xff1a;小王又困了 &#x1f4da;系列专栏&#xff1a;C &#x1f31f;人之为学&#xff0c;不日近则日退 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、C的认识 &#x1f4d2;1.1什么是C &#x1f4d2;1.2C的发展 二、C关键字 三…

Bootstrap的small标签

Bootstrap 中的 <small> 标签是用来标记和渲染小文本的 HTML 元素。它通常用于表示与主要文本内容不同的次要信息、注释、版权声明、法律声明、或者其他需要更小字号的文本。在 Bootstrap 中&#xff0c;<small> 标签可以进一步定制样式以适应您的设计需求。 以下…

如何用工业树莓派和MQTT平台打通OT和IT?

一、应用设备 OT端设备&#xff1a;步进电机&#xff0c;MODBUS TCP远程I/O模块&#xff0c;PLC设备 边缘侧设备&#xff1a;宏集工业树莓派&#xff1b; IT端设备&#xff1a;PC、安卓手机&#xff1b; IT端软件&#xff1a;宏集HiveMQ MQTT通信平台 二、原理 宏集工业树…

Autodesk_Revit2022安装图文教程_Revit2022建筑信息模型BIM软件图文教程

下载地址&#xff1a;https://pan.92zl.cn/ 1、先下载文件&#xff0c;不同版本的安装注册方式是不一样的步骤。 2、运行安装文件。 3、安装过程请等待 4、和常用软件一样安装&#xff0c;注意安装路径即可 5、直到出现安装完成界面 6、接下来打开Crack文件夹的Autodesk Licen…

Redis之UV统计

HyperLogLog 首先我们搞懂两个概念&#xff1a; UV&#xff1a;全称Unique Visitor&#xff0c;也叫独立访客量&#xff0c;是指通过互联网访问、浏览这个网页的自然人。1天内同一个用户多次访问该网站&#xff0c;只记录1次。PV&#xff1a;全称Page View&#xff0c;也叫页…

C++类与对象 (上)

C类与对象 &#xff08;上&#xff09; 1.面向过程和面向对象初步认识2.类的引入3.类的定义4.类的访问限定符及封装4.1权限限定符4.2封装 5.类的作用域6.类的实例化7.类的对象大小的计算7.1如何计算类的大小7.2类对象的储存方式 8.类成员函数的this指针8.1this指针的引出8.2thi…

C语言动态内存管理———超级全!快来看!

C语言动态内存管理———超级全&#xff01;快来看&#xff01; 文章目录 C语言动态内存管理———超级全&#xff01;快来看&#xff01;一、为什么要用动态内存二、动态内存函数1.malloc和free①malloc②.free③举例a.内存的空间b.栗子 2.calloc3.realloc 三、动态内存分配常…

遇到工厂索赔你是一竿子处理方式吗?

最近听到一个博主说关于客户索赔的事情&#xff0c;说是自己以前的处理方式要么就是将事情推给工厂&#xff0c;以工厂的态度为原则&#xff0c;工厂赔付给我们&#xff0c;然后我们就同意赔付给客户。如果工厂敷衍我们&#xff0c;那么我们就敷衍客户&#xff0c;这样最起码能…

蓝桥杯每日一题2023.10.19

题目描述 完全二叉树的权值 - 蓝桥云课 (lanqiao.cn) 题目分析 我们以每一个节点的坐标来将这一深度的权值之和相加从而算出权值和 要清楚每一个深度的其实节点和末尾节点&#xff0c;使用双指针将这个深度节点的权值和计算出来&#xff0c;记录所 需要的深度即可 #includ…

QT基础 柱状图

目录 1.QBarSeries 2.QHorizontalBarSeries 3.QPercentBarSeries 4.QHorizontalPercentBarSeries 5.QStackedBarSeries 6.QHorizontalStackedBarSeries 从上图得知柱状的基类是QAbstractBarSeries&#xff0c;派生出来分别是柱状图的水平和垂直类&#xff0c;只是类型…

1688拍立淘接口,按图搜索商品接口,图片识别接口,图片上传搜索接口,图片搜索API接口,以图搜货接口

1688拍立淘接口的作用是让用户通过上传图片或输入图片链接的方式&#xff0c;调用1688的图片搜索引擎&#xff0c;返回与该图片相关的所有1688商品。 使用该接口需要先获取一个key和secret&#xff0c;然后参考API文档里的接入方式和示例&#xff0c;查看测试工具是否有需要的…

h5的扫一扫功能 (非微信浏览器环境下)

必须在 https 域名下才生效 <template><div><van-field label"服务商编码" right-icon"scan" placeholder"扫描二维码获取" click-right-icon"getCameras" /> <div class"scan" :style"{disp…

互动超2800万!小红书涌现大批“听劝人”,关键词数据发掘内容玩法

听说一身反骨的年轻人&#xff0c;最近开始听劝了&#xff1f;不知道你有没有注意到&#xff0c;小红书上被“旅游听劝、穿搭听劝、改妆听劝”等各种“听劝”刷屏&#xff0c;诸多品牌纷纷入局&#xff0c;迂回种草。 一、预估互动量超2800万&#xff0c;小红书“听劝体”爆火 …

大数据高级面试题

大数据高级面试题 Kafka的producer如何实现幂等性? Producer 幂等性 Producer 的幂等性指的是当发送同一条消息时&#xff0c;数据在 Server 端只会被持久化一次&#xff0c;数据不丟不重&#xff0c;但是这里的幂等性是有条件的&#xff1a; 只能保证 Producer 在单个会话内…

最新最全大数据专业毕业设计选题精华汇总-持续更新中

文章目录 0 前言1 大数据毕设选题推荐2 开题指导3 最后 0 前言 大家好&#xff01;大四的同学们&#xff0c;毕业设计的时间即将到来&#xff0c;你们准备好了吗&#xff1f;为了帮助大家更好地开始毕设&#xff0c;我作为学长给大家整理了最新的计算机大数据专业的毕设选题。…

内网穿透的应用-通过内网穿透技术实现PLSQL远程访问Oracle数据库

文章目录 前言1. 安装postgreSQL2. 本地连接postgreSQL3. Windows 安装 cpolar4. 配置postgreSQL公网地址5. 公网postgreSQL访问6. 固定连接公网地址7. postgreSQL固定地址连接测试 前言 PostgreSQL是一个功能非常强大的关系型数据库管理系统&#xff08;RDBMS&#xff09;,下…

企业如何有效搭建呼叫中心系统?

通讯是企业运营的重要组成部分&#xff0c;尤其是大型企业&#xff0c;必须采取有效的通讯方式&#xff0c;以便于与客户保持良好的沟通。为了满足这个需求&#xff0c;许多企业已经开始搭建自己的呼叫中心系统。 呼叫中心是什么&#xff1f; 呼叫中心系统是一种用于处理大量电…

OnlyOffice集成Springboot以及web端

上次我们已经搭建好了onlyoffice的服务&#xff0c;不知道如何搭建的伙伴可以看看上篇文章。 以下是springboot和前端web简单集成的页面&#xff0c;亲测jdk8和jdk17都适用。 结构 前端页面 index.html <!DOCTYPE html> <html lang"en" xmlns:th"h…

虹科Pico技术交流会上海站启程 | 2+2课程新模式,入门汽车波形免拆诊断

虹科与Tech Gear免拆诊断学院即将于9月在上海举办汽车示波器交流会&#xff0c;本次交流会报名人满即开班&#xff0c;旨在为汽车示波器的基础入门和初阶选手提供学习机会。 本次交流会将在Tech Gear免拆诊断学院的车间进行。课程的安排灵活多样&#xff0c;包括理论课程和实践…

自定义内核模块读取进程的线性地址

打印指定进程的线性地址段 利用procfs查看进程的线性地址自定义内核模块读取进程的线性地址编译并加载内核模块 利用procfs查看进程的线性地址 自定义内核模块读取进程的线性地址 #include <linux/module.h> #include <linux/init.h> #include <linux/kernel.h…