【微服务】spring webflux使用详解

news2024/10/12 20:28:59

目录

一、webflux介绍

1.1 什么是webflux

1.2 什么是响应式编程

1.3 webflux特点

二、Java9中响应式编程

2.1 定义事件流源

2.2 实现订阅者

三、Spring Webflux介绍

四、Reactor 介绍

五、Reactor 常用API操作

5.1 Flux 创建流操作API

5.2 Flux响应流的订阅

5.3 Flux处理实时流

六、Spring Webflux 使用

6.1 Spring Webflux简介

6.1 Spring Webflux中的核心组件

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

6.2.2 核心业务类

6.2.3 核心接口类

6.3 Spring Webflux 函数式编程实现

6.3.1 自定义handler

6.3.2 自定义server服务器

6.3.3 访问效果测试

6.3.4 使用webclient调用

6.4 Spring Boot RouterFunction 整合方式一

6.5 Spring Boot RouterFunction 整合方式二

6.5.1 静态化改造

七、webflux的使用场景

八、写在文末


一、webflux介绍

1.1 什么是webflux

webflux,即响应式编程。在JDK9中开始引入了响应式编程模型,而spring5.0版本之后正式引入对webflux的支持,即spring webflux,spring webflux是spring在5.0版本后提供的一套响应式编程风格的web开发框架。

1.2 什么是响应式编程

响应式编程是一种用于处理异步数据流和事件的编程范式。它的核心思想是将数据流看作是一系列事件的序列,通过对事件流的处理来实现计算。它强调基于事件的异步处理和函数式编程的思想,可以帮助开发人员更好地处理复杂的应用程序逻辑。

而响应式编程,其实就是为这种异步非阻塞的流式编程制定的一套标准。流式编程已不陌生了,Java8提供的stream api就是这种风格。这套标准包括对运行环境(JVM、JavaScript)以及网络协议相关的规范。

1.3 webflux特点

非阻塞式

在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程

函数式编程

Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求

二、Java9中响应式编程

Java 9引入了Flow API作为响应式编程的标准实现,具体来说:

  • Flow API提供了一组接口和类,用于定义和处理数据流;

  • 它基于Publisher-Subscriber模式,其中Publisher生成事件流并发布给Subscriber进行处理。

如果使用Java9中的响应式编程进行实现,核心需要两步:

  • 定义事件流源;
  • 实现订阅者;

下面来看一段具体的实现代码。

2.1 定义事件流源

在Flow API中,事件流源被定义为Publisher的实现类,具体来说,首先需要创建一个类实现Publisher接口,并重写其subscribe()方法。在subscribe()方法中,可以通过调用Subscriber的onSubscribe()方法来将事件流订阅给Subscriber。

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class EventPublisher implements Flow.Publisher<String> {

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        subscriber.onSubscribe(new SimpleSubscription(subscriber));
    }
}

2.2 实现订阅者

订阅者是实现Subscriber接口的类。在Flow API中,只需要实现Subscriber接口的onNext()、onError()和onComplete()方法;

  • 当事件流发出下一个元素时,onNext()方法将被调用;

  • 当发生错误时,onError()方法将被调用;

  • 当事件流结束时,onComplete()方法将被调用;

在这些方法中,我们可以根据业务需要添加处理事件流的数据相关逻辑。

import java.util.concurrent.Flow;

public class EventSubscriber implements Flow.Subscriber<String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received item: " + item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error occurred: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Event stream completed.");
    }
}

测试代码

import java.util.concurrent.Flow;

public class Main {
    public static void main(String[] args) {
        EventPublisher publisher = new EventPublisher();
        EventSubscriber subscriber = new EventSubscriber();

        publisher.subscribe(subscriber);

        publisher.submit("Event 1");
        publisher.submit("Event 2");
        publisher.submit("Event 3");

        publisher.close();
    }
}

三、Spring Webflux介绍

是Spring5添加新的模块,用于web开发的,功能和SpringMVC类似的,Webflux使用当前一种比较流程响应式编程出现的框架。spring官方文档地址:Web on Reactive Stack :: Spring Framework

spring-webflux是spring web框架体系中的一个组成模块,说起这个WebFlux,不难会拿出来与Spring Web与WebMvc进行比较,因为在目前很多项目开发中,仍然会使用WebMVC进行开发,尽管springboot成为基础的开发框架,但是接口开发中核心组件还是WebMVC的进一步封装。

四、Reactor 介绍

可以这么理解,响应式编程中的核心实现在于Reactor 的实现和应用,具体来说,Reactor是满足Reactive规范框架。具体来说:

  • 对响应式流规范的一种实现;

  • Spring Webflux默认的响应式框架;

  • 完全异步非阻塞,对背压的支持;

  • 提供两个异步序列API,Flux[N]和Mono[0|1];

  • 提供对响应式流的操作;

在Reactor中,有两个核心类,Flux和Mono ,这两个类实现接口 Publisher,提供丰富操作符。

  • Flux 对象实现发布者,返回 N 个元素,即产生0到N个元素的异步序列;

  • Mono 实现发布者,返回 0 或者 1 个元素,即产生至多一个元素的异步序列。

Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者。

三种数据信号特点:

  • 错误信号和完成信号都是终止信号,不能共存的;
  • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流;
  • 如果没有错误信号,没有完成信号,表示是无限数据流;

五、Reactor 常用API操作

接下来通过实际操作来演示下基于Reactor 常用的API的使用。引入如下依赖包。

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.5.5</version>
</dependency>

5.1 Flux 创建流操作API

在上面提到,如果你需要创建多于一个元素的异步序列,可以考虑使用Flux 相关API,下面是使用Flux 的创建多种形式流的操作

import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ReactorApi {

    @Test
    public void fluxJust() {
        Flux<String> phones = Flux.just("小米", "三星", "华为");
    }

    @Test
    public void fluxFromIterable() {
        Flux<String> phones = Flux.fromIterable(Arrays.asList("小米", "三星", "华为"));
    }

    @Test
    public void fluxFromArray() {
        Flux<String> phones = Flux.fromArray(new String[]{"小米", "三星", "华为"});
    }

    @Test
    public void fluxFromStream() {
        Flux<String> phones = Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"}));
        phones.subscribe();
        phones.subscribe(); //只能被订阅一次
    }

    @Test
    public void fluxEmpty() {
        Flux<String> phones = Flux.empty(); //generic type still honored
    }

    @Test
    public void fluxRange() {
        Flux<Integer> phones = Flux.range(5, 3);
    }

    @Test
    public void fluxGenerate() {
        Flux<Long> flux = Flux.generate(
                AtomicLong::new,
                (state, sink) -> {
                    long i = state.getAndIncrement();
                    sink.next(i);
                    if (i == 10) sink.complete();
                    return state;
                },
                (state) -> System.out.println("done")
        );
        flux.subscribe(System.out::println);
    }


    @Test
    public void fluxCreate() {
        Flux<String> phones = Flux.create((t) -> {
            t.next("小米");
            t.next("三星");
            t.next("华为");
            t.complete();
        });

        phones.subscribe(System.out::println);

        System.out.println("------------");

        Flux<String> ownFluxListener = Flux.create(sink -> {
            //传入自定义的方法
            new MyDataListener(){
                public void onReceiveData(String str){
                    sink.next(str);
                }
                public void onComplete(){
                    sink.complete();
                }
            };
        }, FluxSink.OverflowStrategy.DROP);

        ownFluxListener.subscribe(System.out::println);

    }

    public class MyDataListener{
        public void onReceiveData(String str){
            System.out.println("收到数据:"+str);
        }

        public void onComplete(){
            System.out.println("完成数据的消费处理");
        }
    }

    @Test
    public void fluxDefer() {
        Flux.defer(() -> Flux.just("小米", "三星", "华为"))
                .subscribe(System.out::println);

        Flux<String> stockSeq4 = Flux.defer(() -> Flux.fromStream(Stream.of(new String[]{"小米", "三星", "华为"})));
        stockSeq4.subscribe();
        stockSeq4.subscribe();
    }

    @Test
    public void fluxInterval() throws InterruptedException {
        //interval 定时发送元素
        Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
                .subscribe((t) -> System.out.println((String.valueOf(t))));
        Thread.sleep(1000000);
    }

}

5.2 Flux响应流的订阅

在上面的操作API中,调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的。接下来,看看如何订阅和操作这些流。

import org.junit.Test;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;

public class ReactorStreamApi {

    /**
     * 流的map操作
     */
    @Test
    public void streamMap() {
        Flux<Integer> ints = Flux.range(1, 4);
        Flux<Integer> mapped = ints.map(i -> i * 2);
        mapped.subscribe(System.out::println);
    }

    /**
     * 带有异常情况的处理
     */
    @Test
    public void withError() {
        Flux<Integer> ints = Flux.range(1, 4)
                .map(i-> {
                    if(i<3){
                        return i;
                    }
                    throw new RuntimeException("大于3");
                });
        ints.subscribe(
                i-> System.out.println(i),
                err -> System.out.println("error : " + err.getMessage()),
                () -> System.out.println("完成订阅和数据的消费")
        );
    }

    @Test
    public void testSubscribeWithBase(){
        Flux<Integer> ints = Flux.range(1, 4);
        ints.subscribe(new MySubscriber<>());
    }

    public class MySubscriber<T> extends BaseSubscriber<T> {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            System.out.println("MySubscriber");
            request(1);
        }

        @Override
        protected void hookOnNext(T value) {
            System.out.println(value.toString());
            request(1);
        }
    }

    /**
     * 流的filter操作
     */
    @Test
    public void streamFilter() {
        Flux<Integer> ints = Flux.range(1, 4);
        Flux<Integer> filtered = ints.filter(i -> i % 2 == 0);
        filtered.subscribe(System.out::println);
    }

    @Test
    public void streamBuffer() {
        Flux<Integer> ints = Flux.range(1, 40);
        Flux<List<Integer>> buffered = ints.buffer(3);
        buffered.subscribe(System.out::println);
    }

    @Test
    public void streamRetry() {
        Mono<String> client = Mono.fromSupplier(() -> {
            double num = Math.random();
            if (num > 0.01) {
                throw new Error("Network issue");
            }
            return "https://www.baidu.com";
        });
        client.retry(3).subscribe(System.out::println);
    }

    /**
     * 响应式流的合并
     */
    @Test
    public void streamZip(){
        Flux<Integer> fluxA = Flux.range(1, 4);
        Flux<Integer> fluxB = Flux.range(5, 5);
        fluxA
                .zipWith(fluxB, (a, b)-> a+b)
                .subscribe(System.out::println);
    }

}

5.3 Flux处理实时流

对于某些需要实时处理的场景,可以考虑Flux的实时流的处理

import org.junit.Test;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

import static reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST;

public class StreamTest {

    @Test
    public void simpleHotStreamCreation() {
        Sinks.Many<Integer> hotSource = Sinks.unsafe().many().multicast().directBestEffort();
        //转为flux
        Flux<Integer> hotFlux = hotSource.asFlux();
        //订阅数据
        hotFlux.subscribe(d -> System.out.println("Subscriber 1 to Hot Source: " + d));

        hotSource.emitNext(1, FAIL_FAST);
        hotSource.tryEmitNext(2).orThrow();

        hotFlux.subscribe(d -> System.out.println("Subscriber 2 to Hot Source: " + d));

        hotSource.emitNext(3, FAIL_FAST);
        hotSource.emitNext(4, FAIL_FAST);
        hotSource.emitComplete(FAIL_FAST);

    }

    @Test
    public void connectableFlux() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 4);
        ConnectableFlux<Integer> connectableFlux = source.publish();
        connectableFlux.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
        connectableFlux.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
        System.out.println("Finish subscribe action");
        Thread.sleep(1000L);
        System.out.println("Connect to Flux now");
        connectableFlux.connect();
    }

    @Test
    public void autoConnectConnectableFlux() throws InterruptedException {
        Flux<Integer> source = Flux.range(1, 4);
        Flux<Integer> autoConnect = source.publish().autoConnect(2);
        autoConnect.subscribe(d -> System.out.println("Subscriber 1 gets " + d));
        System.out.println("Finish subscriber 1 action");
        Thread.sleep(1000L);
        System.out.println("Start subscriber 2 action");
        autoConnect.subscribe(d -> System.out.println("Subscriber 2 gets " + d));
    }

}

六、Spring Webflux 使用

6.1 Spring Webflux简介

在servlet3.0标准之前,是每一个请求对应一个线程。如果此时一个线程出现了高延迟,就会产生阻塞问题,从而导致整个服务出现严重的性能情况。因为一旦要调用第三方接口,就有可能出现这样的操作了。早期的处理方式只能是手工控制线程。

在servlet3.0标准之后,为解决此类问题,提供了异步响应的支持。在异步响应处理结构中,可以将耗时操作的部分交由一个专属的异步线程进行响应处理,同时请求的线程资源将被释放,并将该线程返回到线程池中,以供其他请求使用,这样的操作机制将极大的提升程序的并发性能。

对于以上给出的响应式编程支持,仅仅是一些原生的支持模式,而现在既然基于springboot程序开发,那么就需要考虑一些更简单的整合。

在spring中实现响应式编程,就需要使用到spring webFlux。该组件是一个重新构建的且基于Reactive Streams标准实现的异步非阻塞Web开发框架,以Reactor开发框架为基础,可以更加容易实现高并发访问下的请求处理模型。在springboot2.x版本中提供了webFlux依赖模块,该模块有两种模型实现:一种是基于功能性端点的方式(编程式实现),另一种是基于SpringMVC注解方式。

6.1 Spring Webflux中的核心组件

Spring Webflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻 塞的框架。Spring Webflux 执行过程和 SpringMVC 相似的 Spring Webflux 核心控制器 DispatchHandler,实现接口 WebHandler。

SpringWebflux 里面 DispatcherHandler,负责请求的处理,

  • HandlerMapping:请求查询到处理的方法

  • HandlerAdapter:真正负责请求处理

  • HandlerResultHandler:响应结果处理

SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)和 HandlerFunction(处理函数)

6.2 Spring Webflux基于注解的实现

6.2.1 引入核心依赖

注意,如果是在springboot项目中提供web接口,引入了下面的依赖之后就不要引入spring-boot-starter-web依赖了。

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

6.2.2 核心业务类

使用webflux编写web接口,与普通的rest-api类似,只是在webflux,返回值不再是对象或其他数据类型,而是Flux或Mono包装的数据对象。

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.HashMap;
import java.util.Map;

@Service
public class BookServiceImpl implements BookService {

    //创建 map 集合存储数据
    private final Map<String,BookInfo> books = new HashMap<>();

    public BookServiceImpl() {
        this.books.put("01",new BookInfo("01","Java",20));
        this.books.put("02",new BookInfo("02","Js",30));
        this.books.put("03",new BookInfo("03","Hadoop",50));
    }

    @Override
    public Mono<BookInfo> getById(String id) {
        return Mono.justOrEmpty(this.books.get(id));
    }

    @Override
    public Flux<BookInfo> getAll() {
        return Flux.fromIterable(this.books.values());
    }

    @Override
    public Mono<Void> saveBookInfo(Mono<BookInfo> bookInfoMono) {
        return bookInfoMono.doOnNext(book -> {
            //向 map 集合里面放值
            int id = books.size()+1;
            books.put(String.valueOf(id),book);
        }).thenEmpty(Mono.empty());
    }
}

6.2.3 核心接口类

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@RestController
public class BookController {

    @Autowired
    private BookService bookService;

    //根据ID查询 http://localhost:8082/book/01
    @GetMapping("/book/{id}")
    public Mono<BookInfo> getById(@PathVariable String id) {
        return bookService.getById(id);
    }
    
    //查询所有 http://localhost:8082/findAll
    @GetMapping("/findAll")
    public Flux<BookInfo> getUsers() {
        return bookService.getAll();
    }
   
    @PostMapping("/save")
    public Mono<Void> save(@RequestBody BookInfo user) {
        Mono<BookInfo> userMono = Mono.just(user);
        return bookService.saveBookInfo(userMono);
    }

}

选择其中一个接口测试,可以看到效果与传统的API接口返回值并无差别

补充说明:

1)SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat;

2)SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty;

6.3 Spring Webflux 函数式编程实现

在使用函数式编程模型操作时候,需要自己初始化服务器,基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数式接口的实现并且启动需要的服务器。

Spring Webflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse ,而是ServerRequest 和 ServerResponse

熟悉Netty的同学对Netty的编码风格不陌生,在编写Netty的服务时,也需要自定义Handler,然后将这个自定义Handler配置到启动配置参数中,因此可以同样的方式来理解Spring Webflux的函数式编程的套路。

6.3.1 自定义handler

可以这么理解,在这个handler类中,其实就是对底层的业务方法进一步的封装,只不过返回的数据类型为Mono或Flux;

import com.congge.entity.BookInfo;
import com.congge.service.BookService;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BookHandler {

    private BookService bookService;

    public BookHandler(BookService bookService) {
        this.bookService = bookService;
    }

    /**
     * 根据ID查询
     * @param serverRequest
     * @return
     */
    public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {
        String id = serverRequest.pathVariable("id");
        Mono<BookInfo> bookInfoMono = this.bookService.getById(id);

        Mono<ServerResponse> noDataRes = ServerResponse.notFound().build();

        return bookInfoMono.flatMap(bookInfo ->
                ServerResponse
                        .ok()
                        .contentType(MediaType.APPLICATION_JSON)
                        .body(bookInfo, BookInfo.class)
                        .switchIfEmpty(noDataRes)
        );
    }

    /**
     * 获取所有
     * @return
     */
    public Mono<ServerResponse> getAllBooks(ServerRequest serverRequest) {
        //调用 service 得到结果
        Flux<BookInfo> bookInfoFlux = this.bookService.getAll();
        return
                ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(bookInfoFlux,BookInfo.class);
    }

    /**
     * 保存数据
     * @param request
     * @return
     */
    public Mono<ServerResponse> saveUser(ServerRequest request) {
        //得到 user 对象
        Mono<BookInfo> bookInfoMono = request.bodyToMono(BookInfo.class);
        return
                ServerResponse.ok().build(this.bookService.saveBookInfo(bookInfoMono));
    }

6.3.2 自定义server服务器

该类的作用就相当于是netty编程中,通过ServerBootstrap创建一个服务器类似;

import com.congge.handler.BookHandler;
import com.congge.service.BookService;
import com.congge.service.impl.BookServiceImpl;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.http.server.HttpServer;

import static org.springframework.web.reactive.function.server.RouterFunctions.toHttpHandler;

public class BookServer {

    public RouterFunction routerFunction() {

        BookService bookService = new BookServiceImpl();
        BookHandler bookHandler = new BookHandler(bookService);

        //设置路由
      /*  return RouterFunctions.route(
                GET("/users/{id}").and(accept(APPLICATION_JSON)),handler::getUserById)
                .andRoute(GET("/users").and(accept(APPLICATION_JSON)),handler::get
                        AllUsers);*/
        return RouterFunctions.route(
                RequestPredicates.GET("/users/{id}")
                        .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                bookHandler::getBookById
        ).andRoute(
                RequestPredicates.GET("/users/{id}")
                        .and(RequestPredicates.accept(MediaType.APPLICATION_JSON)),
                bookHandler::getAllBooks
        );
    }

    public void createReactorServer() {
        //路由和 handler 适配
        RouterFunction<ServerResponse> route = routerFunction();
        HttpHandler httpHandler = toHttpHandler(route);
        ReactorHttpHandlerAdapter adapter = new
                ReactorHttpHandlerAdapter(httpHandler);
        //创建服务器
        HttpServer httpServer = HttpServer.create();
        httpServer.handle(adapter).bindNow();
    }

    public static void main(String[] args) throws Exception{
        BookServer server = new BookServer();
        server.createReactorServer();
        System.out.println("enter to exit");
        System.in.read();
    }

}

在该类的最后,编写了一个main函数,运行这个main程序,注意日志中的端口号,因为接下来将通过这个端口进行访问;

6.3.3 访问效果测试

访问接口:localhost:51315/book/01

6.3.4 使用webclient调用

也可以编写webclient调用上面的接口,代码如下

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class ClientTest {

    public static void main(String[] args) {
        //调用服务器地址
        WebClient webClient = WebClient.create("http://127.0.0.1:51315");
        //根据 id 查询
        String id = "01";
        BookInfo bookInfo = webClient.get().uri("/book/{id}", id)
                .accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(BookInfo.class)
                .block();
        System.out.println(bookInfo.getName());

        //查询所有
        Flux<BookInfo> results = webClient.get().uri("/book/findAll").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(BookInfo
                        .class);
        results.map(stu -> stu.getName())
                .buffer().doOnNext(System.out::println).blockFirst();
    }

}

6.4 Spring Boot RouterFunction 整合方式一

上面是通过自定义handler的方式实现了Spring Webflux函数式编程,如果直接在springboot中直接集成怎么做呢,只需要通过自定义配置bean的方式,将路由配置进去即可;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.BodyInserters.fromObject;
import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
import static org.springframework.web.reactive.function.server.ServerResponse.ok;

@Configuration
public class MyRoutesConfig {

    @Bean
    RouterFunction<ServerResponse> index() {
        return route(GET("/index"), request -> ok().body(fromObject("Hello Index")));
    }

    @Bean
    RouterFunction<ServerResponse> about() {
        return route(GET("/about"), request -> ok().body(fromObject("About page")));
    }

}

当然里面的逻辑非常简单,实际使用时,可以在每个bean中补充更复杂的逻辑,比如调用其他业务类的逻辑,同样我们启动springboot应用后访问下端点/index,看到下面的效果。

6.5 Spring Boot RouterFunction 整合方式二

紧接着上面的案例,下面使用更通用的做法来完成与RouterFunction 的整合,首先还是自定义一个handler,这种自定义的配置类形式handler好处是可以注入其他业务类,从而实现更复杂的逻辑。

import com.congge.entity.BookInfo;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ApiHandler {

    public Mono<ServerResponse> getNewBooks(ServerRequest serverRequest) {
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(
                        Flux.create(sink ->{
                            sink.next(new BookInfo("05","mysql",90));
                            sink.next(new BookInfo("06","flink",78));
                            sink.next(new BookInfo("07","php",66));
                            sink.complete();
                        }),BookInfo.class
                );
    }

    public Mono<ServerResponse> getBookById(ServerRequest serverRequest) {
        String bookId = serverRequest.pathVariable("id");
        return ServerResponse.ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(
                        Mono.just(
                                new BookInfo(bookId,"python",57)
                        ),BookInfo.class
                );
    }

}

自定义routerFunction,可以这么理解,通过这个类,就不用再单独编写一个controller,从而实现与普通的controller类中一样定义接口的功能。

import com.congge.handler.ApiHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.ServerResponse;

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;

@Configuration
public class ApiRouterFunction {

    @Bean
    public RouterFunction<ServerResponse> apiRoute(ApiHandler apiHandler){
        return route(
                GET("/book/getBookById/{id}"),
                apiHandler::getBookById
        ).and(
                route(
                        GET("/book/getNewBooks"),
                        apiHandler::getNewBooks
        ));

    }

}

启动项目之后,我们来访问一下其中的一个接口,效果与普通的接口效果类似。

6.5.1 静态化改造

如果你不希望上面的自定义handler和routerConfig与spring框架耦合的太紧密,也可以将其做成静态化的配置,通过app启动的时候自动注册,只需去掉spring相关的注解,然后在app启动类注册进去即可。

ApiRouterFunction改造,将原本的配置bean方法修改为static 如下代码

public class ApiRouterFunction {

    public static RouterFunction<ServerResponse> apiRoute(){
        ApiHandler apiHandler = new ApiHandler();
        return route(
                GET("/book/getBookById/{id}"),
                apiHandler::getBookById
        ).and(
                route(
                        GET("/book/getNewBooks"),
                        apiHandler::getNewBooks
                ));

    }

}

ApiHandler改造,去掉配置注解,启动类改造如下

public static void main(String[] args) {
        new SpringApplicationBuilder()
                .sources(FluxApp.class)
                .initializers((ApplicationContextInitializer<GenericApplicationContext>) ctx ->{
                    ctx.registerBean("apiRoute",RouterFunction.class,ApiRouterFunction::apiRoute);
                })
                .run(args)
        ;
    }

再次启动后调用相同的接口,仍然可以得到正确的响应结果

七、webflux的使用场景

通过上面关于webflux的使用了解到webflux的强大之处,其实在很多中间件,微服务组件中都随处可见webflux的响应式编程的影子,比如在springcloud gateway网关中,网关作为流量的入口,为了持续提升整体服务的高性能、高吞吐、高并发的请求,在处理请求拦截、路由转发等方面使用webflux。如下这段代码,就是gateway中自定义过滤器的一段配置;

@Component
@Slf4j
public class LogFilter implements GlobalFilter {
 
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        log.info(exchange.getRequest().getPath().value());
        return chain.filter(exchange);
    }
}

结合实际经验,对于下面的这些场景,可以考虑使用webflux解决:

  • Spring WebFlux 是一个异步非阻塞式的 Web 框架,所以,它特别适合应用在 IO 密集型的服务中,比如像上面提到的微服务网关这样的应用中;
  • 硬件资源扩充困难,但又希望提升系统整体的吞吐量,可以考虑使用webflux,因为WebFlux 内部使用的是响应式编程(Reactive Programming),以 Reactor 库为基础, 基于异步和事件驱动;
  • 一些对请求响应时间要求不高,但是并发较大的异步场景;

注意

WebFlux 并不能使接口的响应时间缩短,它仅仅能够提升吞吐量和伸缩性。

八、写在文末

从WebFlux 的发展以及在众多的Java生态组件中广泛使用来看,WebFlux 的流行趋势已经到来,因此掌握WebFlux 的核心原理和思想,在日常工作开发中,在某些特殊的场景下能够提供很好的解决思路,当然WebFlux 涉及到的技术点还有很多,比如对websocket的支持等,有兴趣的同学可以继续参阅相关资料深入学习,本篇到此结束,感谢观看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1108705.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PHP WAP餐厅点餐系统mysql数据库web结构apache计算机软件工程网页wamp

一、源码特点 PHP餐厅点餐系统是一套完善的web设计系统&#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 PHP WAP餐厅点餐系统 代码 https://download.csdn.net/download/qq_41221322/88440001 二、…

Kubeadm部署k8s集群 kuboard

目录 主机准备 主机配置 修改主机名&#xff08;三个节点分别执行&#xff09; 配置hosts&#xff08;所有节点&#xff09; 关闭防火墙、selinux、swap、dnsmasq(所有节点) 安装依赖包&#xff08;所有节点&#xff09; 系统参数设置(所有节点) 时间同步(所有节点) 配…

【密码学】第三章、分组密码

DES、IDEA、AES、SM4 1、分组密码定义&#xff08;按照五个组成部分答&#xff09; 密钥空间&#xff1a;属于对称加密算法kekd明密文空间&#xff1a;将明文划分为m比特的组&#xff0c;每一块依次进行加密加解密算法&#xff1a;由key决定一个明文到密文的可逆映射 2、发展…

C语言入门-1.1 C语言概述

想要学好一门计算机编程语言&#xff0c;就和谈一个女朋友是一样的&#xff0c;需要对其深入了解。 1、计算机语言 &#xff08;1&#xff09;什么是计算机语言&#xff1f; 顾名思义&#xff0c;就是计算机之间交流的语言&#xff0c;就和人一样&#xff0c;咱们都是使用普通…

中文编程开发语言工具编程实际案例:美发店会员管理系统软件编程实例

中文编程开发语言工具编程实际案例&#xff1a;美发店会员管理系统软件编程实例 中文编程开发语言工具编程实际案例&#xff1a;美发店会员管理系统软件编程实例。 软件功能&#xff1a; 1、系统设置&#xff1a;参数设定&#xff0c;账号及权限设置&#xff0c;系统初始化&a…

PHP的四层架构

PHP的4层架构是一种软件设计模式&#xff0c;用于将一个PHP应用程序划分为不同的层次&#xff0c;以实现解耦、可扩展和易于维护的代码结构。这个架构通常由以下四个层次组成&#xff1a; 1、 表现层&#xff08;Presentation Layer&#xff09;&#xff1a; 表现层是与用户直…

动态规划解股票类型

文章目录 单只股票买卖多次买卖单只股票最多两次买卖股票最多买k次含冷静期含手续费 单只股票买卖 买卖股票的最佳时机 关键思路&#xff1a;找到一个值&#xff0c;他与之后的最大值之差最大。 用minprice记录最小的值&#xff0c;用maxprofit记录最大的收益。 想清楚一个点…

麒麟kylinOS 2303制作自定义免交互安装镜像

原文链接&#xff1a;麒麟kylinOS 2303制作自定义免交互安装镜像 hello&#xff0c;大家好啊&#xff0c;今天给大家带来一篇麒麟kylinOS 2303制作自定义免交互ISO安装镜像的文章&#xff0c;内容相对来说比较简单&#xff0c;测试安装了一个360浏览器软件&#xff0c;后续复杂…

c语言练习93:环形链表的约瑟夫问题

环形链表的约瑟夫问题 环形链表的约瑟夫问题_牛客题霸_牛客网 描述 编号为 1 到 n 的 n 个人围成一圈。从编号为 1 的人开始报数&#xff0c;报到 m 的人离开。 下一个人继续从 1 开始报数。 n-1 轮结束以后&#xff0c;只剩下一个人&#xff0c;问最后留下的这个人编号是…

【CesiumforUnreal插件】UE5 快速构建Cesium场景 快速入门!!!

目录 0 引言1 快速入门1.1 准备1.2 安装Cesium for Unreal插件并创建一个项目1.3 准备关卡并添加地形和纹理1.4 添加3D建筑到场景中1.5 探索场景 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;CesiumforUnreal专栏&#x1f4a5; 标题&#xff1a…

论坛介绍|COSCon'23开源商业(V)

众多开源爱好者翘首期盼的开源盛会&#xff1a;第八届中国开源年会&#xff08;COSCon23&#xff09;将于 10月28-29日在四川成都市高新区菁蓉汇举办。本次大会的主题是&#xff1a;“开源&#xff1a;川流不息、山海相映”&#xff01;各位新老朋友们&#xff0c;欢迎到成都&a…

C++数据结构X篇_17_C++实现二叉树的非递归遍历(企业链表实现栈,利用栈的先进后出特点实现二叉树的非递归遍历)

本篇参考C实现二叉树的非递归遍历进行整合介绍。 在C数据结构X篇_14_二叉树的递归遍历&#xff08;先序遍历、中序遍历、后续遍历方法介绍&#xff1b;举例&#xff1b;代码实现&#xff09;中我们实现二叉树通过递归遍历实现了先序、中序与后续遍历&#xff0c;那么如何通过非…

Confluence 自定义博文列表

1. 概述 Confluence 自有博文列表无法实现列表自定义功能&#xff0c;实现该需求可采用页面中引用博文宏标签控制的方式 2. 实现方式 功能入口&#xff1a; Confluence →指定空间→创建页面 功能说明&#xff1a; &#xff08;1&#xff09;页面引用博文宏 &#xff08;…

标准化助推开源发展丨九州未来参编开源领域4项团体标准正式发布

在数字中国及数字经济时代的大背景下&#xff0c;开源逐步成为各行业数字化发展的关键模式。在开源产业迅速发展的同时&#xff0c;如何评估、规范开源治理成为行业极度关注的问题。 近日&#xff0c;中电标2023年第27号团体标准公告正式发布&#xff0c;九州未来作为起草单位…

云表:只需3步,让你搞懂低代码和传统开发有什么区别

自2014年Forrester明确提出低代码&#xff08;Low-Code&#xff09;概念以来&#xff0c;这个领域已经引起了广泛的关注&#xff0c;并逐渐受到越来越多的重视。近年来&#xff0c;低代码因为其低开发门槛、易用性等优点&#xff0c;赢得了众多投资研究机构和企业用户的青睐&am…

【Vue】终结v-model

v-model修饰符 .lazy 默认 v-model 是输入框内容每次改变都会更新数据 加了 .lazy 后,只有在输入框失去焦点时才会更新数据 例如输入用户名,只有离开输入框时才保存用户名 // 输入的时候不会立即加载&#xff0c;等失去焦点时会加载 <input v-model.lazy"msg"…

Python打造一个词云制作软件

文章目录 参数字典布局测试结果 参数字典 自从做了热榜的词云之后&#xff0c;就越来越觉得词云的表达力真的很强&#xff0c;所以合计是不是可以为WordCloud做一个界面&#xff0c;来更加直观地操作。 既然以WordCloud为核心&#xff0c;那么界面的组件自然要和WordCloud的参…

GEO生信数据挖掘(九)肺结核数据-差异分析-WGCNA分析(900行代码整理注释更新版本)

第六节&#xff0c;我们使用结核病基因数据&#xff0c;做了一个数据预处理的实操案例。例子中结核类型&#xff0c;包括结核&#xff0c;潜隐进展&#xff0c;对照和潜隐&#xff0c;四个类别。第七节延续上个数据&#xff0c;进行了差异分析。 第八节对差异基因进行富集分析。…

王道计算机考研 操作系统学习笔记篇章一:操作系统概念

目录 操作系统的概念 操作系统的功能和目标 操作系统的特征 并发 共享 虚拟 异步 操作系统的发展和分类 三大阶段 手工操作阶段 批次处理阶段—单道批处理系统 批处理阶段—多道批处理系统 操作系统分类 分时操作系统 实时操作系统 其他操作系统 操作系统的运行机制 预备知识 …

CV计算机视觉每日开源代码Paper with code速览-2023.10.18

精华置顶 墙裂推荐&#xff01;小白如何1个月系统学习CV核心知识&#xff1a;链接 点击CV计算机视觉&#xff0c;关注更多CV干货 论文已打包&#xff0c;点击进入—>下载界面 点击加入—>CV计算机视觉交流群 1.【语义分割】IDRNet: Intervention-Driven Relation Netw…