ReactiveReactor Core

news2025/1/19 20:56:42

Reactive&Reactor Core

  • 一、概述
    • 1、问题
    • 2、优势
    • 3、发展
  • 二、Reactive Streams
    • 1、依赖
    • 2、API
  • 三、Project Reactor
    • 1、概述
    • 2、并发模型
    • 3、入门
      • 1、依赖
      • 2、Flux和Mono
      • 3、空流&错误流
    • 4、订阅响应式流
      • 1、常见订阅
      • 2、自定义订阅
    • 5、API
      • 1、index
      • 2、timestamp
      • 3、any
      • 4、map
      • 5、filter
      • 6、collect
      • 7、distinct
      • 8、flatMap
      • 9、scan
      • 10、thenMany
      • 11、组合响应流
      • 12、流元素批处理
      • 13、采样
      • 14、响应流转化为阻塞流
      • 15、物化和非物化
    • 6、编程方式创建流
      • 1、push&create
      • 2、generate
      • 3、using-disposable
      • 4、usingWhen

一、概述

1、问题

传统的命令式编程在面对当前需求时会有一些限制,比如,在应用负载较高时,应用需要有更高的可用性,并提供低的延迟时间。

1)资源消耗大

使用Servlet开发的单体应用,是基于传统的Thread per Request模型。当服务部署到Tomcat后,Tomcat有线程池,每个请求交给线程池中的一个线程来执行,如果执行过程中包括访问数据库,或者包括读取文件,则在调用数据库时或读取文件时,请求线程是阻塞的,即使是阻塞线程也是占用资源的,典型的每个线程要使用1MB的内存。如果有并发请求,则会同时有多个线程处于阻塞状态,每个线程占据一份资源。

同时,Tomcat的线程池大小决定了可以同时处理多少个请求。如果应用基于微服务架构,我们可以横向扩展,但是也有内存高占用的问题。因此,当并发数很大的时候,Thread per Request模型很消耗资源。

2)压垮客户端

服务A请求服务的数据,如果数据量很大,超过了服务能处理的程度,则导致服务OOM


2、优势

使用响应式编程的优势:

  1. 不用Thread per request模型,使用少量线程即可处理大量的请求
  2. 在执行I/O操作时不让线程等待
  3. 简化并行调用
  4. 支持背压,让客户端告诉服务端它可以处理多少负载。

3、发展

Reactive Streams:

Reactive Streams是个规范,它规范了“有非阻塞背压机制的异步的流处理”。真正正确理解异步、非阻塞并不容易。实际上Reactive Streams规范或者说它的第三方代码实现包含的内容更加丰富:除了non-blocking,还有:Composable、Deferred、Flow Controll、Resilient、Interruptible。

其中Composable就是函数式编程思想的体现。 可体会下Java8里的Stream API各种算子的参数,所以Lamda表达式是进行Reactive Streams实现的基本前提,否则很难想象臃肿的面向对象的Composable。有了JDK8的铺垫,Reactive Streams接口被JDK9定义在Flow里才是可能的


ReactiveX Java(Rx Java):

2011年,微软发布了NET的响应式扩展(Reactive Extensions,即ReactiveX或Rx),以方便异步、事件驱动的程序。ReactiveX混合了迭代模式和观察者模式。不同之处在于一个是推模式,一个是基于迭代器的拉模式。除了对变化事件的观察,完成事件和异常事件也会发布给订阅者。

ReactiveX的基本思想是事件是数据,数据是事件。响应式扩展被移植到几种语言和平台上,当然包括JavaScript、Python、C++、Swift和Java。ReactiveX很快成为一种跨语言的标准,将反应式编程引入到行业中。

RxJava 1.0于2014年11月发布。RxJava是其他Reactivex JVM端口的主干,比如Rx Scala、Rx Kotin、RxGroovy。它已经成为Android开发的核心技术,并且已经进入Java后端开发。许多RxJavaAdapter库,例如RxAndroid、RxJava JDBC、Rx Netty和RxJavaF X调整了几个Java框架,使之成为响应式的,并且可以开箱即用地使用RxJava


Akka:

Akka是一个受欢迎的框架,具有大量功能和大型社区。然而,Akka最初是作为Scala生态系统的一部分构建的,在很长一段时间内,它仅在基于Scala编写的解决方案中展示了它的强大功能。尽管Scala是一种基于JVM的语言,但它与Java明显不同。几年前,Akka直接开始支持Java,但出于某些原因,它在Java世界中不像在Scala世界中那么受欢迎。

Vert.x:

Vert.x框架也是构建高效响应式系统的强大解决方案。Vert.x的设计初衷是作为Node.js在Java虚拟机上的替代方法,它支持非阻塞和事件驱动。然而,Vert.x仅在几年前才开始具备竞争力。


Project Reactor:

既然 Spring 都提供了对 Reactive Streams 的实现,感觉其实上面列出的几个库已经没有太多的意义。各家对Reactive Streams规范的实现在细节上都有很大不同,因为Spring 的生态太强大了,如果没有特殊的需求,比如JDK小于8,那么我们的项目使用Project Reactor应该是较好的选择。

Project Reactor 到目前为止经历了 1.0、2.0、 3.0。其中1.0这个阶段还没有Reactive Stream的规范。在2.0开始Follow 规范并基本定型。3.0 感觉是个重构版,形成 Reactive-Streams-commons库。

有了Project Reactor这样的基础库,整个Spring组件基本都有了Reactive Style的版本,在这个基础上用Netty(或 Servet 3.1 Containe)+ Reactive Streams 适配层 + Spring Security Reactive + WebFlux + Spring Data Reactive Repository,就可以构建出重头到尾的 Reactive 应用。

从Spring Cloud的组件角度讲,也衍生出Reactive Discovery Client、Reactive Load Balancer、Blockhound, Reactor Debug、Improved Reactor Micrometer Support、Reactor Netty Metric…




二、Reactive Streams

是一种规范
访问地址:https://www.reactive-streams.org/https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.4

Reactive Streams是一种基于异步流处理的标准化规范,旨在使流处理更加可靠、高效和响应式。反应式流的特点简单来说就是:基本特性(变化传递 + 数据流 + 声明式) + 高级特性(非阻塞回压 + 异步边界)


Reactive Streams的核心思想是让发布者(Publisher)和订阅者(Subscriber)之间进行异步流处理,以实现非阻塞的响应式应用程序。发布者可以产生任意数量的元素并将其发送到订阅者,而订阅者则可以以异步方式处理这些元素。Reactive Streams还定义了一些接口和协议,以确保流处理的正确性和可靠性。


1、依赖

<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams</artifactId>
  <version>1.0.4</version>
</dependency>
<dependency>
  <groupId>org.reactivestreams</groupId>
  <artifactId>reactive-streams-tck</artifactId>
  <version>1.0.4</version>
  <scope>test</scope>
</dependency>

2、API

Publisher:定义了生产元素并将其发送给订阅者的方法。

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Subscriber:定义了接收元素并进行处理的方法。

public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

Subscription:定义了订阅者和发布者之间的协议,包括请求元素和取消订阅等。

public interface Subscription {
    public void request(long n);
    public void cancel();
}

Processor:定义了同时实现Publisher和Subscriber接口的中间件,它可以对元素进行转换或者过滤等操作。

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}




三、Project Reactor

1、概述

Project Reactor由Reactor文档中列出的一组模块组成。主要组件是Reactor Core,其中包含响应式类型Flux和Mono,它们实现了Reactive Streams的Publisher接口以及一组可应用于这些类型的运算符。

其他模块:

  1. Reactor Test:提供一些实用程序来测试响应流
  2. Reactor Extra:提供一些额外的Flux运算符
  3. Reactor Netty:无阻塞且支持背压的TCP、HTTP和UDP的客户端和服务器
  4. Reactor Adapter:用于与其他响应式库,如RxJava和Akka Streams的适配
  5. Reactor Kafka:用于Kafka的响应式API,作为Kafka的生产者和消费者

2、并发模型

有两种在响应式链中切换执行某些的方式:publishOn和subscribeOn

区别如下:

  • publishOn(Scheduler Scheduler):影响所有后续运算符的执行(只要未指定其他任何内容)
  • subscribeOn(Scheduler Scheduler):根据链中最早的subscribeOn调用,更改整个操作符链所订阅的线程。它不影影响随后对publishOn的调用的行为

Schedulers类包含用于提供执行上下文的静态方法:

  • parallel():为并行工作而调整的固定工作池,可创建与cpu内核数量一样多的工作线程池。
  • single:单个可重用线程。此方法为所有调用方重用同一线程,直到调度程序被释放为止。如果您希望使用按调用专用线程,则可以为每个调用使用schedulers.newSingle()
  • boundedElastic:动态创建一定数量的工作者,它限制了它可以创建的支持线程的数量,并且可以在线程可用时重新调度要排队的任务。这是包装同步阻塞调用的不错选择
  • immediate():立即在执行线程上运行,而不切换执行上下文
  • fromExecutorService(Executorservice):可用于从任何现有ExecutorService中创建调度程序

3、入门

1、依赖

<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
  <version>3.3.5.RELEASE</version>
</dependency>
<dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-test</artifactId>
  <version>3.3.5.RELEASE</version>
</dependency>

2、Flux和Mono

Flux和Mono提供了许多工厂方法,可以根据已有的数据创建响应流。

Flux:

public static void main(String[] args) {
    Flux.just("d1", "d2").subscribe(System.out::print);
    Flux.fromArray(new String[]{"d1", "d2"}).subscribe(System.out::println);
    Flux.fromIterable(Arrays.asList("d1", "d2")).subscribe(System.out::println);
    Flux.range(1, 10).subscribe(System.out::println);
}

Mono:

public static void main(String[] args) {
    Mono.just(1).subscribe(System.out::println);
    Mono.justOrEmpty(Optional.empty()).subscribe(System.out::println);
    Mono.fromCallable(() -> "fromCallable").subscribe(System.out::println);
    Mono.fromSupplier(() -> "fromSupplier").subscribe(System.out::println);
    Mono.fromFuture(() -> CompletableFuture.completedFuture("fromFuture")).subscribe(System.out::println);
    Mono.fromCompletionStage(() -> CompletableFuture.completedFuture("fromCompletionStage")).subscribe(System.out::println);
    Mono.fromRunnable(() -> {
        System.out.println("fromRunnable");
    }).subscribe(System.out::println);
}

3、空流&错误流

  1. empty():工厂方法,它们分别生成Flux或Mono的空实例
  2. never():方法会创建一个永远不会发出完成、数据或错误等信号的流
  3. error(Throwable):工厂方法创建一个序列,该序列在订阅时始终通过每个订阅者的onError方法传播错误,由于错误是在Flux或Mono声明期间被创建的,因此,每个订阅者都会收到相同的Throwable实例
public static void main(String[] args) {
    Mono.never().subscribe(System.out::println);
    Flux.never().subscribe(System.out::println);

    
    Mono.empty().subscribe(System.out::println);
    Flux.empty().subscribe(System.out::println);

    // subscribe的第二个参数代表错误的捕获
    Mono.error(new RuntimeException("Mono error"))
            .subscribe(System.out::println, System.out::println);
    Flux.error(new RuntimeException("Flux error"))
            .subscribe(System.out::println, System.out::println);
}

4、订阅响应式流

Flux和Mono提供了对subscribe()方法的基于Lambda的重载,简化了订阅的开发。subscribe方法的所有重载都返回Disposable接口的实例,可以用于取消基础的订阅过程。

1、常见订阅

重载相关方法

public static void main(String[] args) {
    Flux.range(100, 10).subscribe(e -> {
        System.out.println("接收到:" + e);
    }, ex -> {
        System.out.println("发生异常:" + ex);
    }, () -> {
        System.out.println("执行完成");
    });
}

2、自定义订阅

自定义Subscriber类:

public static void main(String[] args) {
    Flux.range(100, 10).subscribe(new Subscriber<Integer>() {
        Subscription subscription;

        @Override
        public void onSubscribe(Subscription s) {
            subscription = s;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("接收到:" + integer);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            System.out.println("发生异常:" + t);
        }

        @Override
        public void onComplete() {
            System.out.println("执行完成");
        }
    });
}

该定义订阅的方法存在一定问题。它打破了线性代码流,也容易出错。最困难的部分是需要自己管理背压并正确实现订阅者的所有TCK要求。在该示例中,就打破了有关订阅验证和取消这几个TCK要求。


建议扩展Project Reactor提供的BaseSubscriber类。在这种情况下订阅示例:

public static void main(String[] args) {
    Flux.range(100, 10).subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnError(Throwable throwable) {
            System.out.println("发生异常:" + throwable);
        }

        @Override
        protected void hookOnComplete() {
            System.out.println("执行完成");
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);
        }

        @Override
        protected void hookOnNext(Integer value) {
            System.out.println("接收到:" + value);
            request(1);
        }
    });
}

使用BaseSubscriber类,实现符合TCK的订阅者更为容易。订阅者在本身拥有生命周期管理的宝贵资源时,会需要这种方法,例如,订阅者可能包装文件处理程序或连接到第三方服务的WebSocket链接。


5、API

SDK内部还有具体的弹珠图示意
如:Flux.collect(Supplier containerSupplier, BiConsumer<E, ? super T> collector)

1、index

public static void main(String[] args) {
    Flux.range(100, 10)
            .index()
            .subscribe(System.out::println);
}

结果:

2、timestamp

public static void main(String[] args) {
    Flux.range(100, 10)
            .timestamp()
            .subscribe(System.out::println);
}

结果:

3、any

public static void main(String[] args) {
    Flux.just("456", "789", "123")
            .all(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .any(e -> e.equalsIgnoreCase("123")).subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .hasElement("123").subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .hasElements().subscribe(System.out::println);
    Flux.just("456")
            .hasElements().subscribe(System.out::println);
    Flux.empty()
            .hasElements().subscribe(System.out::println);
    try {
        System.in.read();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

结果:

4、map

public static void main(String[] args) {
    Flux.range(100, 10)
            .map(w -> "map:" + w)
            .subscribe(System.out::println);
}

结果:

5、filter

  1. take(n)操作符限制所获取的元素,该方法忽略除前n个元素之外的所有
  2. takeLast仅返回流的最后一个元素。
  3. takeUntil(Predicate)传递一个元素直到满足某个条件。
  4. elementAt(n)只可用于获取序列的第n个元素。
  5. single操作符从数据源发出单个数据项,也为空数据源发出NoSuchElementException错误信号,或者为具有多个元素的数据源发出IndexOutOfBoundsException信号。它不仅可以基于一定数量来获取或跳过元素,还可以通过带有Duration的skip(Duration)或take(Duration)操作符。
  6. takeUntilOther(Publisher)或skipUntilOther(Publisher)操作符,可以跳过或获取一个元素,直到某些消息从另一个流到达。
public static void main(String[] args) {
    // filter
    Flux.range(100, 10)
            .filter(e -> e.equals(105))
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    System.out.println();
    // take
    Flux.range(100, 10)
            .take(5)
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    System.out.println();
    // takeLast
    Flux.range(100, 10)
            .takeLast(3)
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    System.out.println();
    // takeUntil
    Flux.range(100, 10)
            .takeUntil(e -> e.equals(108))
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    System.out.println();
    // elementAt
    Flux.range(100, 10)
            .elementAt(2)
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    System.out.println();
    // single
    Flux.range(100, 1)
            .single()
            .subscribe(e -> {
                System.out.printf(e + "\t");
            }, ex -> {
                System.out.printf("ex:" + ex + "\t");
            });
    System.out.println();
    Flux.range(100, 10)
            .single()
            .subscribe(e -> {
                System.out.printf(e + "\t");
            }, ex -> {
                System.out.printf("ex:" + ex + "\t");
            });
    System.out.println();
    Flux.empty()
            .single()
            .subscribe(e -> {
                System.out.printf(e + "\t");
            }, ex -> {
                System.out.printf("ex:" + ex + "\t");
            });
    
    System.out.println();
    // skipUntilOther&takeUntilOther
    Mono<String> start = Mono.just("start").delayElement(Duration.ofSeconds(1));
    Mono<String> stop = Mono.just("stop").delayElement(Duration.ofSeconds(3));
    Flux.range(100, 10)
            .delayElements(Duration.ofMillis(400))
            .map(item -> "map:" + item)
            .skipUntilOther(start)
            .takeUntilOther(stop)
            .subscribe(e -> {
                System.out.printf(e + "\t");
            });
    try {
        // 阻塞线程,避免延时任务提前结束
        System.in.read();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

结果:

6、collect

收集列表中的所有元素,并使用Flux.collectList()和Flux.collectSortedList()将结果集合处理为Mono流是可能的。Flux.collectSortedList()不仅会收集元素,还会对它们进行排序。

请注意,收集集合中的序列元素可能耗费资源,当序列具有许多元素时这种现象尤为突出。此外,尝试在无限流上收集数据可能消耗掉所有可用的内存。

public static void main(String[] args) {
    Flux.just("123", "456", "789", "123")
            .subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .collectList().subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .collectSortedList().subscribe(System.out::println);


    Flux.just("456", "789", "123")
            .collectMap(e -> "key-" + e, e -> "value-" + e).subscribe(System.out::println);

    Flux.just("456", "789", "123")
            .collectMap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);
    Flux.just("456", "789", "123")
            .collectMultimap(e -> Integer.parseInt(e) < 500 ? "small" : "large").subscribe(System.out::println);
    
    try {
        System.in.read();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

结果:

7、distinct

public static void main(String[] args) {
    AtomicInteger times = new AtomicInteger(0);
    Flux.just("456", "789", "123")
            .repeat(2)
            .subscribe(e -> {
                if (e.equalsIgnoreCase("456")) {
                    times.addAndGet(1);
                    System.out.print("\n" + "重复打印:" + times.get());
                }
                System.out.print("\t" + e);
            });
    times.set(0);
    Flux.just("456", "789", "123")
            .repeat(2)
            .distinct()
            .subscribe(e -> {
                if (e.equalsIgnoreCase("456")) {
                    times.addAndGet(1);
                    System.out.print("\n" + "重复打印:" + times.get());
                }
                System.out.print("\t" + e);
            });
    System.out.print("\n" + "删除重复行:");
    Flux.just("5456", "5789", "5123", "5456", "5789", "5123", "5456")
            .distinct().subscribe(e -> {
                        System.out.printf(e + "\t");
                    }
            );
    System.out.print("\n" + "删除相邻行:");
    Flux.just("5456", "5456", "5123", "5456", "5789", "5123", "5456")
            .distinctUntilChanged().subscribe(e -> {
                        System.out.printf(e + "\t");
                    }
            );
    System.out.println();
    Flux.empty()
            .defaultIfEmpty("Flux#empty").subscribe(System.out::println);
    try {
        System.in.read();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

结果:

8、flatMap

  1. flatMap:将传入的元素转化为响应流后,再合并为一个新的响应流。会立即订阅新的流,不一定保证原始顺序,且允许来自不同子流的元素进行交错
  2. concatMap:整体与flatMap类似,但不会立即订阅新的流,会在生成下一个子流并订阅它之前等待每个内部完成,天生保留与源元素相同的顺序,不允许来自不同子流的元素交错
  3. flatMapSequential:整体与flatMap类似,但是会通过对所接收的元素进行排序来进行保留顺序,同样不允许来自不同子流的元素交错
@SneakyThrows
public static void main(String[] args) {
    Random random = new Random();
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
            .flatMap(e -> Flux.fromIterable(e)
                    .delayElements(Duration.ofMillis(random.nextInt(200))))
            .subscribe(System.out::println, null, () -> {
                System.out.println("complete");
            });
    Thread.sleep(2000);

    long l = System.currentTimeMillis();
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
            .concatMap(e -> Flux.fromIterable(e)
                    .delayElements(Duration.ofMillis(500)))
            .subscribe(System.out::println, null, () -> {
                System.out.println(System.currentTimeMillis() - l);
            });
    Thread.sleep(4000);

    long l2 = System.currentTimeMillis();
    Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6))
            .flatMapSequential(e -> Flux.fromIterable(e)
                    .delayElements(Duration.ofMillis(500)))
            .subscribe(System.out::println, null, () -> {
                System.out.println(System.currentTimeMillis() - l2);
            });

    System.in.read();
}

结果:

9、scan

public static void main(String[] args) {
    Flux.just(1, 2, 3, 4, 5)
            .scan(0, (a, b) -> a + b).subscribe(System.out::println);
}

结果:

10、thenMany

public static void main(String[] args) {
    Flux.just(1, 2, 3, 4, 5)
            .thenMany(Flux.just(6, 7, 8, 9, 10)).subscribe(System.out::println);
}

结果:

11、组合响应流

  1. concat操作符通过向下游转发接收的元素来连接所有数据源。当操作符连接两个流时,它首先消费并重新发送第一个流的所有元素,然后第二个流执行相同的操作
  2. merge操作符将来自上游序列的数据合并到一个下游序列中。与concat操作符不同,上游数据源是立即(同时)被订阅的
  3. zip操作符号订阅所有上游,等待所有数据源发出一个元素,然后将接收到的元素组合到一个输出元素中
  4. combineLatest操作符与zip操作符的工作方式类似。但是,只要至少一个上游数据源发出一个值,它就会生成一个新值
@SneakyThrows
public static void main(String[] args) {
    Flux.concat(
                    Flux.range(1, 3),
                    Flux.just(5, 6, 7))
            .subscribe(e -> {
                System.out.println("concat:" + e);
            });
    Flux.merge(
                    Flux.range(1, 3).delayElements(Duration.ofMillis(500)),
                    Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)))
            .subscribe(e -> {
                System.out.println("merge:" + e);
            });
    Flux.zip(
                    Flux.range(1, 4).delayElements(Duration.ofMillis(500)),
                    Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)))
            .subscribe(e -> {
                System.out.println("zip:" + e);
            });
    Flux.combineLatest(
                    Flux.range(1, 4).delayElements(Duration.ofMillis(500)),
                    Flux.just(5, 6, 7).delayElements(Duration.ofMillis(1000)),
                    (a, b) -> "combineLatest==>" + a + b)
            .subscribe(System.out::println);
    System.in.read();
}

结果:

12、流元素批处理

  1. 将元素缓冲(buffering)到容器(如list)中,结果流的类型为Flux<List>
  2. 通过开窗(windowing)方式将元素加入诸如Flux<Flux>等流中。请注意,现在的流信号不是值,而是可以处理的子流。
  3. 通过某些键将元素分组(grouping)到具有Flux<GroupedFlux<K,T>>类型的流中。每个新键都会触发一个新的GroupedFlux实例,并且具有该键的所有元素都将被推送到GroupedFlux类的该实例中。

可以基于以下场景进行缓冲和开窗操作:

  1. 处理元素的数量,比方说每10个元素;
  2. 一段时间,比方说每5分钟一次;
  3. 基于一些谓语,比方说在每个新的偶数之前切割;
  4. 基于来自其他Flux的一个事件,该事件控制着执行过程。
@SneakyThrows
public static void main(String[] args) {
    Flux.range(1, 7)

            .buffer(2)
            .doOnNext(e -> {
                System.out.println("buffer start");
            })
            .subscribe(System.out::println);
    Flux.range(1, 7).window(2)
            .doOnNext(e -> {
                System.out.println("window start");
            })
            .subscribe(integerFlux -> {
                // 由于是子流,所以需要再次订阅
                integerFlux.subscribe(System.out::println);
            });
    Flux.range(1, 7)
            .groupBy(e -> e < 4 ? "small" : "large")
            .subscribe(e -> {
                ArrayList<Integer> integers = new ArrayList<>();
                e.scan(integers, (list, value) -> {
                            list.add(value);
                            return list;
                        })
                        .doOnComplete(() -> {
                            System.out.println(e.key() + " ==>: " + integers);
                        })
                        // 由于是子流,所以需要再次订阅
                        .subscribe();
            });
    System.in.read();
}

结果:

13、采样

可以让产生的流能够周期性的发出与时间窗口内最近看到的值相对应的数据项

@SneakyThrows
public static void main(String[] args) {
    Flux.range(1, 5)
            .delayElements(Duration.ofMillis(400))
            .sample(Duration.ofMillis(1000))
            .subscribe(System.out::println);
    System.in.read();
}

结果:

14、响应流转化为阻塞流

  1. toIterable方法将响应式Flux转换为阻塞Iterable
  2. toStream方法将响应式Flux转换为阻塞Stream API。从Reactor3.2开始,在底层使用toIterable方法
  3. blockFirst方法阻塞了当前线程,直到上游发出第一个值或完成流为止。
  4. blockLast方法阻塞当前线程,直到上游发出最后一个值或完成流为止。在onError的情况下,它会在被阻塞的线程中抛出异常。

补充:

  1. blockFirst操作符和blockLast操作符具有方法重载,可用于设置线程阻塞的持续时间。这可以防止线程被无限阻塞。
  2. toIterable和toStream方法能够使用Queue来存储事件,这些事件可能比客户端代码阻塞Iterable或Stream更快到达。微批处理。
public static void main(String[] args) {
    Flux.range(1, 5)
            .delayElements(Duration.ofMillis(200)).toIterable().forEach(System.out::println);
    List<String> collect = Flux.range(1, 5)
            .delayElements(Duration.ofMillis(200)).toStream()
            .map(e -> "stream:" + e)
            .collect(Collectors.toList());
    System.out.println(JSON.toJSONString(collect));
    
    Flux.range(1, 5)
            .delayElements(Duration.ofMillis(200))
            .doOnEach(System.out::println).blockFirst();
    Flux.range(10, 5)
            .delayElements(Duration.ofMillis(200))
            .doOnEach(System.out::println).blockLast();
}

结果:

15、物化和非物化

使用materialize方法将流中的元素封装为Signa对象进行处理,使用dematerialize方法对Signa对象进行解封处理

@SneakyThrows
public static void main(String[] args) {
    Flux.error(new IOException("error"))
            .subscribe(e -> {
                System.out.println("consumer==>" + e);
            }, ex -> {
                System.out.println("error==>" + ex);
            });

    Flux.error(new IOException("error2"))
            .materialize()
            .subscribe(e -> {
                System.out.println("consumer==>" + e);
            }, ex -> {
                System.out.println("error==>" + ex);
            });
    Flux.error(new IOException("error2"))
            .materialize()
            .dematerialize()
            .subscribe(e -> {
                System.out.println("consumer==>" + e);
            }, ex -> {
                System.out.println("error==>" + ex);
            });

    System.in.read();
}

结果:

6、编程方式创建流

在实际的需求开发中,我们需要以一种更复杂的方法来在流中生成数据,或将对象的生命周期绑定到响应式流的生命周期中

1、push&create

push工厂方法能通过适配一个单线程生产者来编程创建Flux实例。create整体与push方法相同,都是起到桥接的作用,但是create能够支持不同线程发送的事件

@SneakyThrows
public static void main(String[] args) {
    Flux.push(sink -> {
                IntStream.range(10, 15)
                        .forEach(e -> {
                            if (e != 13) {
                                sink.next(e);
                            } else {
                                sink.onCancel(() -> System.out.println("cancel"));
                            }
                        });
            })
            .subscribe(e -> {
                System.out.print(e + "\t");
            });
    Thread.sleep(1000);

    ExecutorService executorService = Executors.newFixedThreadPool(3);
    System.out.print("\n" + "push   == > ");
    Flux.push(sink -> {
        Runnable task = () -> {
            for (int i = 10; i < 15; i++) {
                sink.next(i);
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    sink.error(e);
                }
            }
        };
        for (int i = 0; i < 3; i++) {
            executorService.submit(task);
        }
    }).subscribe(e -> {
        System.out.print(e + "\t");
    });
    Thread.sleep(2000);

    System.out.print("\n" + "create == > ");
    Flux.create(sink -> {
        Runnable task = () -> {
            for (int i = 10; i < 15; i++) {
                sink.next(i);
                try {
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (InterruptedException e) {
                    sink.error(e);
                }
            }
        };
        for (int i = 0; i < 3; i++) {
            executorService.submit(task);
        }
    }).subscribe(e -> {
        System.out.print(e + "\t");
    });
    System.in.read();
}

结果:

2、generate

generate工厂方法旨在基于生成器的内部处理状态创建复杂序列。

它可以使用一个初始值和一个函数,该函数可借助初始值的内部状态计算下一个状态,并将onNext信号发送给下游订订阅者

@SneakyThrows
public static void main(String[] args) {
    AtomicInteger num = new AtomicInteger(1);
    Flux.generate(sink -> {
        sink.next("Hello");
        if (num.get() == 3) {
            sink.complete();
        } else {
            num.addAndGet(1);
        }
    }).subscribe(System.out::println);
    // 创建一个斐波那契数列
    Flux.generate(
                    () -> Tuples.of(1, 2),
                    (state, sink) -> {
                        sink.next(state.getT1() + state.getT2());
                        if (state.getT1() + state.getT2() > 50) {
                            sink.complete();
                        }
                        return Tuples.of(state.getT2(), state.getT1() + state.getT2());
                    })
            .subscribe(System.out::println);
    System.in.read();
}

结果:

3、using-disposable

@SneakyThrows
public static void main(String[] args) {
    Flux.using(
            () -> {
                System.out.println("create");
                return Executors.newFixedThreadPool(3);
            },
            es -> Flux.range(1, 3),
            (executorService -> {
                System.out.println("shutdown");
                executorService.shutdown();
            })
    ).subscribe(System.out::println);
    System.in.read();
}

结果:

4、usingWhen

基于usingWhen工厂包装响应式事务与using操作符类似,usingwhen操作符使我们能以响应式方式管理资源。区别在于using操作符会同步获取。usingWhen操作符响应式地获取受托管资源(通过订阅Pubisher的实例)。此外,usingWhen操作符接受不同的处理程序,以便应对主处理流终止的成功和失败。这些处理程序由发布者实现。

可以仅使用usingWhen一个操作符实现完全无阻塞的响应式事务。

@SneakyThrows
public static void main(String[] args) {
    Flux.usingWhen(
            Mono.fromSupplier(() -> {
                System.out.println("create");
                return Executors.newFixedThreadPool(1);
            }),
            es -> Flux.range(1, 3),
            (executorService -> {
                System.out.println("shutdown");
                executorService.shutdown();
                return Flux.empty();
            })
    ).subscribe(System.out::println);

    Flux.usingWhen(Mono.fromSupplier(() -> {
                        System.out.println("create");
                        return Executors.newFixedThreadPool(1);
                    }),
                    resource -> {
                        return Flux.concat(Flux.just(1, 2, 3),
                                Flux.error(new RuntimeException("Error")));
                    },
                    resource -> {
                        System.out.println("Completed successfully");
                        return Mono.fromRunnable(resource::shutdown);
                    },
                    resource -> {
                        System.out.println("Error occurred");
                        return Mono.fromRunnable(resource::shutdown);
                    },
                    resource -> {
                        System.out.println("Cancelled");
                        return Mono.fromRunnable(resource::shutdown);
                    })
            .subscribe(System.out::println, System.out::println);
    System.in.read();
}

结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2279056.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

STL容器-- list的模拟实现(附源码)

STL容器-- list的模拟实现&#xff08;附源码&#xff09; List的实现主要考察我们对list这一容器的理解&#xff0c;和代码的编写能力&#xff0c;通过上节对list容器的使用&#xff0c;我们对list容器已经有了一些基本的了解&#xff0c;接下来就让我们来实现一些list容器常见…

【转】厚植根基,同启新程!一文回顾 2024 OpenHarmony 社区年度工作会议精彩瞬间

在数字化浪潮奔腾不息的今天&#xff0c;开源技术已成为推动科技创新与产业发展的强大引擎。2025年1月10日-11日&#xff0c;OpenAtom OpenHarmony&#xff08;开放原子开源鸿蒙&#xff0c;以下简称“OpenHarmony”或“开源鸿蒙”&#xff09;社区2024年度工作会议于深圳盛大启…

蓝桥杯备考:堆和priority queue(优先级队列)

堆的定义 heap堆是一种特殊的完全二叉树&#xff0c;对于树中的每个结点&#xff0c;如果该结点的权值大于等于孩子结点的权值&#xff0c;就称它为大根堆&#xff0c;小于等于就叫小根堆&#xff0c;如果是大根堆&#xff0c;每个子树也是符合大根堆的特征的&#xff0c;如果是…

力扣682

from typing import Listclass Solution:def calPoints(self, operations: List[str]) -> int:a [] # 用于存储有效得分的列表for op in operations:if op.isdigit() or (op[0] - and op[1:].isdigit()): # 如果是整数&#xff08;包括负数&#xff09;a.append(int(op)…

考研计算机组成原理——零基础学习的笔记

第一章 研究计算机硬件的学科。 1.计算机系统概述 计算机系统硬件软件&#xff08;系统软件&#xff1a;比如操作系统、数据库管理系统、标准程序库等&#xff0c;应用软件&#xff1a;QQ等&#xff09; 1.2计算机的层次结构 1.2.1计算机硬件的基本组成 冯诺伊曼计算机&a…

海康工业相机的应用部署不是简简单单!?

作者&#xff1a;SkyXZ CSDN&#xff1a;SkyXZ&#xff5e;-CSDN博客 博客园&#xff1a;SkyXZ - 博客园 笔者使用的设备及环境&#xff1a;WSL2-Ubuntu22.04MV-CS016-10UC 不会吧&#xff1f;不会吧&#xff1f;不会还有人拿到海康工业相机还是一脸懵叭&#xff1f;不会还有人…

计算机网络 (49)网络安全问题概述

前言 计算机网络安全问题是一个复杂且多维的领域&#xff0c;它涉及到网络系统的硬件、软件以及数据的安全保护&#xff0c;确保这些元素不因偶然的或恶意的原因而遭到破坏、更改或泄露。 一、计算机网络安全的定义 计算机网络安全是指利用网络管理控制和技术措施&#xff0c;保…

STM32 FreeRTOS中断管理

目录 FreeRTOS的中断管理 1、STM32中断优先级管理 2、FreeRTOS任务优先级管理 3、寄存器和内存映射寄存器 4、BASEPRI寄存器 5、FreeRTOS与STM32中断管理结合使用 vPortRaiseBASEPRI vPortSetBASEPRI 6、FromISR后缀 7、在中断服务函数中调用FreeRTOS的API函数需注意 F…

操作系统 期末重点复习

操作系统 期末重点复习 必会 课后题摘要 第二章&#xff1a; 在操作系统中为什么要引入进程概念&#xff1f;它会产生什么样的影响? 为了使程序在多道程序环境下能并发执行&#xff0c;并对并发执行的程序加以控制和描述&#xff0c;在操作系统中引入了进程概念。影响: 使程…

7.5.4 MVCC优化测试

作者&#xff1a; h5n1 原文来源&#xff1a; https://tidb.net/blog/4e02d900 1. 背景 由于MVCC 版本数量过多导致rocksdb扫描key数量过多影响SQL执行时间是tidb经常出现问的问题&#xff0c;tidb也一直在致力于优化该问题。 一些优化方式包括比&#xff1a; (1) 从传统…

2024年AI与大数据技术趋势洞察:跨领域创新与社会变革

目录 引言 技术洞察 1. 大模型技术的创新与开源推动 2. AI Agent 智能体平台技术 3. 多模态技术的兴起:跨领域应用的新风口 4. 强化学习与推荐系统:智能化决策的底层驱动 5. 开源工具与平台的快速发展:赋能技术创新 6. 技术安全与伦理:AI技术的双刃剑 7. 跨领域技…

vulnhub靶场【Lampiao靶机】,主要考察提权,脏牛提权

前言 靶机&#xff1a;lampiao&#xff0c;IP地址为192.168.10.11 攻击&#xff1a;kali&#xff0c;IP地址为192.168.10.2 都采用虚拟机&#xff0c;网卡为桥接模式 该靶机目前只剩下一个了&#xff0c;之前记得是有两台构成系列的。 文章中涉及的靶机&#xff0c;来源于v…

ASP .NET Core 学习(.NET9)配置接口访问路由

新创建的 ASP .NET Core Web API项目中Controller进行请求时&#xff0c;是在地址:端口/Controller名称进行访问的&#xff0c;这个时候Controller的默认路由配置如下 访问接口时&#xff0c;是通过请求方法&#xff08;GET、Post、Put、Delete&#xff09;进行接口区分的&…

构建core模块

文章目录 1.环境搭建1.sunrays-common下新建core模块2.引入依赖&#xff0c;并设置打包常规配置 2.测试使用1.启动&#xff01;1.创建模块2.引入依赖3.application.yml 配置MySQL和Minio4.创建启动类5.启动测试 2.common-web-starter1.目录2.WebController.java3.结果 3.common…

VRTK4 记录抓取错误

左手原本可以正常抓取&#xff0c;但是当右手拿起一个物体时&#xff0c;左手抓取右手的线性驱动器&#xff0c;只有部分区域可以抓取 原因是左手的判定物体的层级错误 应该在Collections下&#xff0c;之前错误的和Collections同一层级&#xff0c;导致抓取有时可以有时不可以…

游戏画质升级史的思考

画质代入感大众玩家对游戏的第一印象与评判标准 大众玩家还没到靠游戏性等内在因素来评判游戏的程度。 画面的重要性&#xff0c;任何时候都不能轻视。 行业就是靠摩尔定律来推动进步的。 NS2机能达到PS4到PS4PRO之间的水准&#xff0c;5050达到8G显存&#xff0c;都会引发连…

Windows11电脑总是一闪一闪的,黑一下亮一些怎么解决

Windows11电脑总是一闪一闪的&#xff0c;黑一下亮一些怎么解决 1. 打开设备管理器2. 点击显示适配器3. 更新下方两个选项的驱动3.1 更新驱动Inter(R) UHD Graphixs3.2 更新驱动NVIDIA GeForce RTX 4060 Laptop GPU 4. 其他文章快来试试吧&#x1f970; 1. 打开设备管理器 在电…

【RAG落地利器】向量数据库Qdrant使用教程

TrustRAG项目地址&#x1f31f;&#xff1a;https://github.com/gomate-community/TrustRAG 可配置的模块化RAG框架 环境依赖 本教程基于docker安装Qdrant数据库&#xff0c;在此之前请先安装docker. Docker - The easiest way to use Qdrant is to run a pre-built Docker i…

【逆境中绽放:万字回顾2024我在挑战中突破自我】

&#x1f308;个人主页: Aileen_0v0 &#x1f525;热门专栏: 华为鸿蒙系统学习|计算机网络|数据结构与算法 ​&#x1f4ab;个人格言:“没有罗马,那就自己创造罗马~” 文章目录 一、引言二、个人成长与盘点情感与心理成长学习与技能提升其它荣誉 三、年度创作历程回顾创作内容概…

HTTP / 2

序言 在之前的文章中我们介绍过了 HTTP/1.1 协议&#xff0c;现在再来认识一下迭代版本 2。了解比起 1.1 版本&#xff0c;后面的版本改进在哪里&#xff0c;特点在哪里&#xff1f;话不多说&#xff0c;开始吧⭐️&#xff01; 一、 HTTP / 1.1 存在的问题 很多时候新的版本的…