Spring WebFlux 核心原理(2-1)

news2025/1/19 23:27:59

1、Spring 响应式编程

1.1、早期响应式解决方案

        响应式编程是构建响应式系统的主要候选方案。Spring 4.x 引入了 ListenableFuture 类,它扩展了 Java Future,并且可以基于 HTTP 请求实现异步执行操作。但是只有少数 Spring 4.x 组件支持新的 Java 8 CompletableFuture,后者引入了一些用于组合异步执行的简洁方法。

        Spring 框架还提供了其他一些基础架构,它们对构建我们的响应式应用程序非常有用。

1.1.1、观察者模式

        听着感觉好像观察者模式似乎与响应式编程无关。但是,经过一些小修改,它定义了响应式编程的基础。

        观察者模式拥有一个主题(subject),其中包含该模式的依赖者列表,这些依赖者被称为观察者(Observer)。

        主题通常通过调用自身的一个方法将状态变化通知观察者。在基于事件处理的系统中此模式至关重要。观察者模式是 MVC(模型−视图−控制器)模式的重要组成部分。

类图

Observe 接口:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:12
 */
public interface Observer {

    void observe(String event);
}

Observe 实现:

 

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:14
 */
public class ConcreteObserverA implements Observer {

    @Override
    public void observe(String event) {
        System.out.println(getClass().getCanonicalName() + " --- " + event);
    }
}

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:14
 */
public class ConcreteObserverB implements Observer {

    @Override
    public void observe(String event) {
        System.out.println(getClass().getCanonicalName() + " --- " + event);
    }
}

Subject 接口:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:10
 */
public interface Subject {

    /**
     * 用途:注册观察者
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [observer]
     * @param observer
     * @return void
    **/
    void registerObserver(Observer observer);

    /**
     * 用途:解绑观察者
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [observer]
     * @param observer
     * @return void
    **/
    void unregisterObserver(Observer observer);

    /**
     * 用途:通知事件变更
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [event]
     * @param event
     * @return void
    **/
    void notifyObservers(String event);
}

Subject 实现:

package blnp.net.cn.jvm.demos.webflux;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:16
 */
public class ConcreteSubject implements Subject {

    /**
     *  保证Set是线程安全的
     **/
    private Set<Observer> observers = new CopyOnWriteArraySet<>();

    @Override
    public void registerObserver(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String event) {
        observers.forEach(observer -> observer.observe(event));
    }
}

Main 单元测试:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:17
 */
public class ConcreteMain {

    public static void main(String[] args) {
        Subject subject = new ConcreteSubject();
        Observer observer1 = new ConcreteObserverA();
        Observer observer2 = new ConcreteObserverB();

        subject.registerObserver(observer1);
        subject.registerObserver(observer2);
        subject.notifyObservers("hello tom");

        System.out.println("==================================");

        subject.unregisterObserver(observer1);
        subject.notifyObservers("great tom cat");
    }
}

        为了在多线程场景中确保线程安全,使用 CopyOnWriteArraySet,这是一个线程安全的 Set 实现,它在每次 update 操作发生时都会创建元素的新副本

        更新 CopyOnWriteArraySet中的内容相对代价较高,当容器包含大量元素时尤为如此。但是,订阅者列表通常不会经常更改,因此对于线程安全的 Subject 实现来说,这是一个相当合理的选择。

1.1.2、观察者模式使用

        在不需要取消订阅的情况下,我们可以活用 Java 8 特性,用 lambda 替换 Observer 实现类。下面编写相应的测试用例:

@Test
public void subjectLeveragesLambdas() {
    Subject subject = new ConcreteSubject();
    subject.registerObserver(e -> System.out.println("A: " + e));
    subject.registerObserver(e -> System.out.println("B: " + e));
    subject.notifyObservers("This message will receive A & B");
    // ...
}

        在有很多观察者处理明显延迟的事件(由下游处理引入)时,我们可以使用其他线程或线程池(thread pool)并行传播消息。基于这种处理方式可以得出 notifyObservers 方法的下一个实现:

package blnp.net.cn.jvm.demos.webflux;

import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:16
 */
public class ConcreteSubject implements Subject {

    /**
     *  保证Set是线程安全的
     **/
    private Set<Observer> observers = new CopyOnWriteArraySet<>();
    /**
     *  线程池
     **/
    private final ExecutorService executorService = Executors.newCachedThreadPool();

    @Override
    public void registerObserver(Observer observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(String event) {
        //v1 实现执行
        //observers.forEach(observer -> observer.observe(event));

        //v2 实现执行
        observers.forEach(
                observer -> executorService.submit(() -> observer.observe(event))
        );
    }
}

        这些方案通常不是最高效的,并且很可能隐藏着 bug。例如,我们可能忘记限制线程池大小,并最终导致 OutOfMemoryError。因为每个线程在 Java 中消耗大约 1 MB,典型的 JVM 应用程序有可能创建几千个线程来耗尽所有可用内存。

🍕知识拓展:

        为了防止资源滥用,我们可以限制线程池大小并将应用程序的活跃度(liveness)属性设置为violate。当所有可用线程试图将某些事件推送到同一个缓慢的 Observer 时,就会出现这种情况。在这里,我们只是初步暴露了可能发生的潜在问题。

public ThreadPoolExecutor(
	//线程池的核心线程数量
	int corePoolSize, 

	//线程池的最大线程数
    int maximumPoolSize, 

	//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
    long keepAliveTime, 

	//时间单位
    TimeUnit unit, 

	//任务队列,用来储存等待执行任务的队列
    BlockingQueue < Runnable > workQueue, 

	//线程工厂,用来创建线程,一般默认即可
    ThreadFactory threadFactory, 

	//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
    RejectedExecutionHandler handler) 

        因此,当需要支持多线程的 Observer 模式时,最好使用经过实战验证的库。java.util 包中的观察者模式从 JDK 1.0 发布的。如果查看源代码,会发现一个非常简单的实现,它与前面的实现非常相似。因为这些类是在 Java 泛型(Java generics)之前引入的,所以它们操作Object 类型的事件,是类型不安全的。

        此外,这种实现效率不高,尤其是在多线程环境中。这些类在 Java 9 中已被弃用。在开发应用程序时,可能使用观察者模式的手工自定义实现。这能够对事件源和观察者进行解耦。但是,需要考虑许多对现代多线程应用程序至关重要的方面,包括错误处理、异步执行、线程安全、高性能需求等。

1.1.3、基于@EventListener注解的发布和订阅模式

        在很长一段时间内,Spring 框架有自己的观察者模式实现,这被广泛用于跟踪应用程序的生命周期事件。从 Spring 4.2 开始,不仅用于处理应用程序事件,还用于处理业务逻辑事件。Spring 的@EventListener 注解实现事件分发ApplicationEventPublisher 类实现事件发布

        @EventListener 和 ApplicationEventPublisher 实现了发布−订阅模式(Publish-Subscribepattern),它可以被视为观察者模式的变体。在发布−订阅模式中,发布者和订阅者不需要彼此了解,如下图所示:

        发布−订阅模式在发布者和订阅者之间提供了额外的间接层。订阅者知道广播通知的事件通道,但通常不关心发布者的身份。此外,每个事件通道中可能同时存在几个发布者。

        事件通道(event channel,也被称为消息代理或事件总线)可以额外过滤传入的消息并在订阅者之间分发它们。过滤和路由的执行可以基于消息内容或消息主题,也可以同时基于这两者。因此,基于主题的系统中的订阅者将接收发布到自身感兴趣主题的所有消息。

        @EventListener 注解支持基于主题和基于内容的路由。消息类型作为主题的角色;condition 属性基于内容进行事件的路由,事件路由处理基于 Spring 表达式语言(SpEL)。

1.1.4、使用@EventListener注解构建应用程序

        实现一个简单的 Web 服务,用于显示房间当前的温度。设置一个温度传感器,它不时地将当前的摄氏温度通过事件发送出来。使用随机数生成器模拟温度传感器。

        为了使应用程序遵循响应式设计,不使用旧的拉模型获取数据。使用 WebSocket 和服务器发送事件(Server-Sent Events,SSE)。SSE 能使客户端从服务器接收自动更新,通常用于向浏览器发送消息更新或连续数据流。使用 EventSource 的 JavaScript API,请求特定 URL 并接收事件流。在通信发生问题时,EventSource 默认自动重连。

1、实现业务逻辑

项目完成目录结构:

  • 源代码文件 

Maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blnp.net</groupId>
    <artifactId>pubSubDemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>pubSubDemo</name>
    <description>关于发布订阅模式 @EventListenner 注解应用</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20200518</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

事件对象:

package com.blnp.net.pubSub.dto;

import lombok.Data;

/**
 * <p>温度传感器对象</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 11:27
 */
@Data
public class Temperature {

    /**
     *  实时温度值
     **/
    private final double value;
}

业务程序:TemperatureSensor类模拟传感器,并使用@Component 注解

package com.blnp.net.pubSub.service;

import com.blnp.net.pubSub.dto.Temperature;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 11:28
 */
@Component
public class TemperatureSensorService {

    /**
     *  事件发布者
     **/
    private final ApplicationEventPublisher publisher;
    /**
     *  随机生成器(模拟温度传感器)
     **/
    private final Random random = new Random();
    /**
     *  线程池(创建只含一个线程的周期性线程池对象)
     **/
    private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

    public TemperatureSensorService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    @PostConstruct
    public void startProcessing() {
        this.service.schedule(this::probe, 1, TimeUnit.SECONDS);
    }

    private void probe() {
        double temperature = 16 + random.nextGaussian() * 10;
        System.err.println("发送事件。。。");
        // 通过ApplicationEventPublisher发布Temperature事件
        publisher.publishEvent(new Temperature(temperature));
        service.schedule(this::probe, random.nextInt(5000), TimeUnit.MILLISECONDS);
    }
}

        模拟温度传感器仅依赖于 Spring 框架提供的 ApplicationEventPublisher类。该类可以将事件发布到系统。

2、基于SpringMVC 的异步http

        Servlet 3.0中引入的异步支持扩展了在非容器线程中处理 HTTP请求的能力。基于Servlet 3.0,Spring Web MVC 可以返回 Callable<T> 或 DeferredResult<T> 。Callable<T> 可以在非容器线程内运行,但仍然是阻塞调用。DeferredResult<T> 能通过调用 setResult(T result) 方法在非容器线程上生成异步响应,可以在事件循环中使用。

        从 4.2 版开始,Spring Web MVC 可以返回 ResponseBodyEmitter ,其行为类似于DeferredResult ,但可以用于发送多个对象。SseEmitter 继承了 ResponseBodyEmitter ,可以根据 SSE 的协议需求为一个请求发送多个响应。

3、暴露SSE端点
package com.blnp.net.pubSub.controller;

import com.blnp.net.pubSub.dto.Temperature;
import org.json.JSONObject;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 11:32
 */
@RestController
public class TemperatureController {

    /**
     *  发射器集合
     **/
    private final Set<SseEmitter> clients = new CopyOnWriteArraySet<>();

    @RequestMapping(value = "/temperature-stream", method = RequestMethod.GET)
    public SseEmitter events(HttpServletRequest request) {
        // ResponseBodyEmitter的子类,用于发送SSE(Server-Send Event):服务器发送的事件
        SseEmitter emitter = new SseEmitter();
        //设置超时时间
        //SseEmitter emitter = new SseEmitter(10000L);

        // 将当前发射器放到集合中
        clients.add(emitter);

        /**
         *  当异步请求超时的时候调用的代码。该方法在异步请求超时的时候由容器线程调用。
         **/
        //给当前发射器设置事件处理函数
        emitter.onTimeout(() -> clients.remove(emitter));
        /**
         *  当异步请求结束的时候调用的代码。
         *  当超时或网络错误而终止异步请求处理的时候,在容器线程调用该方法。
         *  该方法一般用于检车一个ResponseBodyEmitter实例已经无用了。
         **/
        emitter.onCompletion(() -> clients.remove(emitter));
        return emitter;
    }

    /**
     * 用途:异步事件处理,事件监听器,该监听器只接收Temperature事件
     * @author liaoyibin
     * @since 11:34 2024/10/9
     * @params [temperature]
     * @param temperature
     * @return void
    **/
    @Async
    @EventListener
    public void handleMessage(Temperature temperature) {
        System.out.println("监听到web的调度事件了 -- " + temperature);
        List<SseEmitter> deadEmitters = new ArrayList<>();
        // 遍历发射器集合
        clients.forEach(emitter -> {
            try {
                // 发射器发送温度对象,json类型
                final JSONObject jsonObject = new JSONObject(temperature);
                final String s1 = jsonObject.toString();
                emitter.send(s1);
            } catch (Exception ignore) {
                // 如果抛异常,则将该发射器放到deadEmitters集合中
                deadEmitters.add(emitter);
            }
        });
        // 从clients中移除所有失效的发射器。
        clients.removeAll(deadEmitters);
    }
}

        Spring Web MVC 提供SseEmitter的唯一目的是发送 SSE 事件。当控制器方法返回 SseEmitter 实例时,实际的请求处理过程将一直持续下去,直到 SseEmitter.complete()方法被调用、发生错误或超时。

        在客户端请求 /temperature-stream 时,创建并返回新的 SseEmitter 实例,同时将该实例注册到先前的活动 clients 列表中。此外,SseEmitter 构造函数可以使用 timeout 参数。

        对于 clients 集合,我们可以使用 java.util.concurrent 包中的 CopyOnWriteArraySet类。这样的实现使我们能在修改列表的同时执行迭代操作。当一个 Web 客户端打开新的 SSE 会话时,我们将新的发射器添加到 clients 集合中。SseEmitter 在完成处理或已达到超时时,会将自己从 clients 列表中删除。

        handleMessage()方法使用@EventListener 注解,以便从Spring 接收事件。Spring 框架仅在接收到 Temperature 事件时才会调用 handleMessage()方法,因为该方法的参数是 temperature 对象。

        @Async 注解将方法标记为异步执行的候选方法,在手动配置的线程池中调用。        

        handleMessage()方法接收一个新的温度事件,并把每个事件并行地以 JSON 格式异步发送给所有客户端。此外,当发送到各个发射器时,跟踪所有发生故障的发射器并将其从活动 clients 列表中删除。这种方法使我们可以发现不运作的客户端。不幸的是,SseEmitter 没有为处理错误提供任何回调,只能通过处理send()方法抛出的错误来完成错误处理。

4、配置异步支持
package com.blnp.net.pubSub.config;

import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/**
 * <p>异步配置</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 11:08
 */
@Configuration
@EnableAsync
public class MyAsyncConfig implements AsyncConfigurer {

    /**
     * 用途:为异步调用设置Executor
     * @author liaoyibin
     * @since 11:09 2024/10/9
     * @params []
     * @param
     * @return java.util.concurrent.Executor
    **/
    @Override
    public Executor getAsyncExecutor() {
        // 使用包含两个核心线程的 ThreadPoolTaskExecutor,可以将核心线程增加到一百个。
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数
        executor.setCorePoolSize(2);
        //最大线程数
        executor.setMaxPoolSize(100);
        /**
         *  如果没有正确配置队列容量,线程池就无法增长。这是因为程序将转而使用 SynchronousQueue,而这限制了并发。
         **/
        executor.setQueueCapacity(5);
        executor.initialize();
        return executor;
    }

    /**
     * 用途:为异步执行引发的异常配置异常处理程序。
     * @author liaoyibin
     * @since 11:09 2024/10/9
     * @params []
     * @param
     * @return org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler
    **/
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        // 此处仅记录异常
        return new SimpleAsyncUncaughtExceptionHandler();
    }

}

应用启动类:

package com.blnp.net.pubSub;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class PubSubApplication {

    public static void main(String[] args) {
        SpringApplication.run(PubSubApplication.class, args);
    }

}
5、构建具有 SSE 支持的 UI
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SSE UI</title>
</head>
<body>
    <ul id="events"></ul>
</body>

<script type="application/javascript">

    /*
     * 将从服务器接收来的数据进行展示
     */
    function add(message) {
        const el = document.createElement("li");
        el.innerHTML = message;
        document.getElementById("events").appendChild(el);
    }

    //指定服务端发射器的订阅地址
    var eventSource = new EventSource("/temperature-stream");
    eventSource.onmessage = e => {
        const t = JSON.parse(e.data);
        const fixed = Number(t.value).toFixed(2);
        add('Temperature: ' + fixed + ' ℃');
    }
    //发射器订阅连接成功
    eventSource.onopen = e => add('#### Connection 【opened】');
    //发射器连接异常显示
    eventSource.onerror = e => add('#### Connection (closed)');
</script>
</html>
6、应用程序验证

        应用程序启动成功后,浏览器访问:http://localhost:8087

        当前的解决方案不是 JavaScript 独有的,也可以使用 curl 访问:http://10.1.77.3:8087/temperature-stream

1.2、使用 RxJava 框架

  • 详见官网

        ReactiveX 通常被定义为观察者模式、迭代器模式和函数式编程的组合。Java平台上有一个用于响应式编程的标准库,即RxJava,是 Reactive Extensions(响应式扩展,也称为 ReactiveX)的 Java 实现。目前,它不是唯一的响应式库,还有Akka Streams和Project Reactor。

        此外,随着 2.x版的发布,RxJava本身发生了很大的变化。目前最新版本是RxJava3。RxJava 是迄今为止应用最广泛的响应式库。

        虽然 RxJava 1.x 的生命周期结束于 2018 年 3 月,但它仍然被用于很多库和应用程序,这主要是因为该版本被长期而广泛地采用。

1.2.1、响应式流

        观察者模式为我们提供了一张清晰分离的生产者(Producer)事件和消费者(Consumer)事件视图。代码如下所示:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:12
 */
public interface Observer {

    void observe(String event);
}


package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 9:10
 */
public interface Subject {

    /**
     * 用途:注册观察者
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [observer]
     * @param observer
     * @return void
    **/
    void registerObserver(Observer observer);

    /**
     * 用途:解绑观察者
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [observer]
     * @param observer
     * @return void
    **/
    void unregisterObserver(Observer observer);

    /**
     * 用途:通知事件变更
     * @author liaoyibin
     * @since 9:11 2024/10/9
     * @params [event]
     * @param event
     * @return void
    **/
    void notifyObservers(String event);
}

        如果不希望生产者在消费者出现之前生成事件,则可以使用迭代器(Iterator)模式。如下代码:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 14:03
 */
public interface Iterator<E> {

    /**
     * 用途:判断是否还有迭代元素
     * @author liaoyibin
     * @since 14:04 2024/10/9
     * @params [] 
     * @param  
     * @return boolean
    **/
    boolean hasNext();

    /**
     * 用途:获取下一个迭代元素
     * @author liaoyibin
     * @since 14:04 2024/10/9
     * @params [] 
     * @param  
     * @return E
    **/
    E next();
}

        将迭代器模式和观察者模式相结合,如下代码:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p>迭代器模式和观察者模式相结合</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 14:05
 */
public interface RxObserver<T> {

    /**
     * 用途:下一步处理
     * @author liaoyibin
     * @since 14:06 2024/10/9
     * @params [next]
     * @param next
     * @return void
    **/
    void onNext(T next);

    /**
     * 用途:完成处理后的执行
     * @author liaoyibin
     * @since 14:06 2024/10/9
     * @params []
     * @param
     * @return void
    **/
    void onComplete();
}

        虽然 RxObserver 非常类似于 Iterator,但它:

  • 不是调用 Iterator 的 next()方法,而是通过 onNext()回调将一个新值通知给 RxObserver。
  • 不是检查 hasNext()方法的结果是否为 true ,而是通过调用 onComplete()方法通知RxObserver 流的结束。

对于错误如何处理呢?

        因为 Iterator 可能在处理 next()方法时抛出 Exception,所以应该有一个从生产者到 RxObserver的错误传播机制。

        为此添加一个特殊的回调,即 onError() 。因此,最终的解决方案如下所示:

package blnp.net.cn.jvm.demos.webflux;

/**
 * <p>迭代器模式和观察者模式相结合</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 14:05
 */
public interface RxObserver<T> {

    /**
     * 用途:下一步处理
     * @author liaoyibin
     * @since 14:06 2024/10/9
     * @params [next]
     * @param next
     * @return void
    **/
    void onNext(T next);

    /**
     * 用途:完成处理后的执行
     * @author liaoyibin
     * @since 14:06 2024/10/9
     * @params []
     * @param
     * @return void
    **/
    void onComplete();

    /**
     * 用途:异常回调处理
     * @author liaoyibin
     * @since 14:30 2024/10/9
     * @params [e] 
     * @param e 
     * @return void
    **/
    void onError(Exception e);
}

        如上所示,则是设计了一个 Observer 接口,这是 RxJava 的基本概念

        此接口定义了数据如何在响应式流的每个部分之间进行流动。作为库的最小组成部分,Observer 接口随处可见。RxObserver 类似于前面介绍的观察者模式中的 Observer。

        Observable 响应式类是观察者模式中 Subject 的对应类。Observable 扮演事件源的角色,它会发出元素。它有数百种流转换方法以及几十种初始化响应式流的工厂方法。

        Subscriber 抽象类不仅实现 Observer 接口并消费元素,还被用作 Subscriber 的实际实现的基础。

        Observable 和 Subscriber 之间的运行时关系由 Subscription 控制,Subscription 可以检查订阅状态并在必要时取消订阅。如下图所示:

        RxJava 定义了有关发送元素的规则,使 Observable 能发送任意数量的元素(包括零个)。然后它通过声明成功或引发错误来指示执行结束。

        Observable 会为订阅它的每个Subscriber 多次调用 onNext(),然后再调用 onComplete()或onError()(但不能同时调用两者)。所以在 onComplete()或 onError()之后调用 onNext()是不可行的。

1.2.2、生产和消费数据

  • 项目源代码

创建Maven项目并导入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.blnp.net</groupId>
    <artifactId>observer-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>observer-demo</name>
    <description>Demo project for Spring Boot</description>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!--RxJava-->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.0.7</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
            <version>2.2.20</version>
        </dependency>
        <dependency>
            <groupId>io.reactivex</groupId>
            <artifactId>rxjava</artifactId>
            <version>1.3.8</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.8.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

案例1:

package com.blnp.net.observerdemo.demo;

import org.junit.jupiter.api.Test;
import rx.Observable;
import rx.Subscriber;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 15:30
 */
public class MainTest {

    @Test
    public void test1() {
        Observable<String> objectObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        for (int i = 0; i < 10; i++) {
                            subscriber.onNext("hello Observable " + i);
                        }
                        subscriber.onCompleted();
                    }
                }
        );

        objectObservable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("on completed");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("on error :" + throwable.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.println("on next: " + s);
            }
        });
    }
}

        创建 Observable 并使其带有一个回调,该回调将在订阅者出现时立即被触发。此时,Observer 将产生一个字符串值,并将流的结束信号发送给订阅者。还可以使用 Java 8 lambda 改进此代码:

package com.blnp.net.observerdemo.demo;

import org.junit.jupiter.api.Test;
import rx.Observable;
import rx.Subscriber;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 15:30
 */
public class MainTest {

    @Test
    public void test1() {
        Observable<String> objectObservable = Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        for (int i = 0; i < 10; i++) {
                            subscriber.onNext("hello Observable " + i);
                        }
                        subscriber.onCompleted();
                    }
                }
        );

        objectObservable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("on completed");
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("on error :" + throwable.getMessage());
            }

            @Override
            public void onNext(String s) {
                System.out.println("on next: " + s);
            }
        });
    }

    @Test
    public void test2() {
        Observable.create(
                new Observable.OnSubscribe<String>() {
                    @Override
                    public void call(Subscriber<? super String> subscriber) {
                        for (int i = 0; i < 10; i++) {
                            subscriber.onNext("hello Observable " + i);
                        }
                        subscriber.onCompleted();
                    }
                }
        ).subscribe(
                System.out::println,
                System.err::println,
                () -> System.out.println("执行结束")
        );
    }
}

特别注意:从 RxJava 1.2.7 开始,Observable 的创建已因不安全而被弃用。这是因为它可能生成太多元素,导致订阅者超载。换句话说,这种方法不支持背压。

案例2:

        可以使用 just 来引用元素、使用旧式数组,或者使用 from 通过 Iterable 集合来创建 Observable 实例,代码如下所示:

package com.blnp.net.observerdemo.demo;

import org.junit.jupiter.api.Test;
import rx.Observable;
import rx.Subscriber;

import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 15:30
 */
public class MainTest {

    @Test
    public void test3() {
        Observable<String> just = Observable.just("1", "2", "3", "4", "5");
        just.subscribe(
                item -> System.out.println("下一个元素是:" + item),
                ex -> System.err.println("异常信息:" + ex.getMessage()),
                () -> System.out.println("结束")
        );
    }

    @Test
    public void test4() {
        Observable<Integer> from = Observable.from(new Integer[]{1, 2, 3, 4, 5});
        from.subscribe(
                item -> System.out.println("下一个元素是:" + item),
                ex -> System.err.println("异常信息是:" + ex.getMessage()),
                () -> System.out.println("结束")
        );
    }

    @Test
    public void test5() {
        Observable<Object> from = Observable.from(Collections.emptyList());
        from.subscribe(
                item -> System.out.println("下一个元素是:" + item),
                ex -> System.err.println("异常信息:" + ex.getMessage()),
                () -> System.out.println("结束")
        );
    }

    @Test
    public void test6() {
        Observable<String> fromCallable = Observable.fromCallable(() -> "hello Observable");
        fromCallable.subscribe(
                item -> System.out.println("下一个元素是:" + item),
                ex -> System.err.println("错误信息是:" + ex.getMessage()),
                () -> System.out.println("结束")
        );
    }

    @Test
    public void test7() {
        Future<String> future = Executors.newCachedThreadPool().submit(() -> "hello Observable");
        Observable<String> from = Observable.from(future);
        from.subscribe(
                item -> System.out.println("下一个元素是:" + item),
                ex -> System.err.println("异常信息是:" + ex.getMessage()),
                () -> System.out.println("结束")
        );
    }
}

案例3:

        除了简单的创建功能,还可以通过组合其他 Observable 实例来创建 Observable流,这可以轻松实现非常复杂的工作流。

        例如,每个传入流的 concat()操作符会通过将每个数据项重新发送到下游观察者的方式来消费所有数据项。然后,传入流将被处理,直到发生终止操作(onComplete(),onError()),并且其处理顺序会与 concat()方法中参数的顺序保持一致。以下代码展示了 concat()用法的示例:

@Test
public void test8() {
    Observable.concat(
        Observable.just("hello"),
        Observable.from(new String[] {
            "Observable"
        }),
        Observable.just("!")
    ).forEach(
        item -> System.out.println("下一个元素是:" + item),
        ex -> System.err.println("异常信息是:" + ex.getMessage()),
        () -> System.out.println("结束")
    );
}

        这里,作为几个 Observable 实例(使用不同来源)直接组合的一部分,我们还使用Observable.forEach()方法以类似于 Java 8 Stream API 的方式遍历结果。这样的程序生成以下输出:

        请注意,虽然不为异常定义处理程序很方便,但在发生错误的情况下,默认的Subscriber 实现仍会抛出rx.exceptions.OnErrorNotImplementedException。

1.2.3、生成异步序列

        RxJava 不仅可以生成一个未来的事件,还可以基于时间间隔等生成一个异步事件序列,示例代码如下所示:

package com.blnp.net.observerdemo.demo;

import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import rx.Observable;

import java.util.concurrent.TimeUnit;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:24
 */
public class AsyncSequenceTest {

    @SneakyThrows
    @Test
    public void test1() {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(System.out::println);
        Thread.sleep(5000);
    }
}

        如果删除 Thread.sleep(...),那么应用程序将在不输出任何内容的情况下退出。因为生成事件并进行消费的过程发生在一个单独的守护线程中。因此,为了防止主线程完成执行,我们可以调用 sleep()方法或执行一些其他有用的任务。Subscription可以控制观察者−订阅者协作,该接口声明如下:

public interface Subscription {

    /**
     * 用途:取消订阅
     * @author liaoyibin
     * @since 16:28 2024/10/9
     * @params []
     * @param
     * @return void
    **/
    void unsubscribe();

    /**
     * 用途:检查 Subscriber是否仍在等待事件
     * @author liaoyibin
     * @since 16:29 2024/10/9
     * @params []
     * @param
     * @return boolean
    **/
    boolean isUnsubscribed();
}

        为了便于理解前面提到的取消订阅功能,请假设这种情况:订阅者是唯一对事件感兴趣的一方,并且订阅者会消费它们直到 CountDawnLatch 发出一个外部信号。传入流每 100 毫秒生成一个新事件,而这些事件会产生无限序列,即 0, 1, 2, 3...。以下代码不仅演示了在定义响应式流时如何获取一个Subscription,还展示了如何取消对流的订阅。

package com.blnp.net.observerdemo.demo;

import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;
import rx.Observable;
import rx.Subscription;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:24
 */
public class AsyncSequenceTest {

    @SneakyThrows
    @Test
    public void test1() {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(System.out::println);
        Thread.sleep(5000);
    }

    @Test
    public void test2() throws InterruptedException {
        //计数器
        CountDownLatch latch = new CountDownLatch(1);
        Subscription subscription = Observable.interval(100, TimeUnit.MILLISECONDS)
                .subscribe(System.out::println);

        // 启动新的线程,用于计数
        new Thread(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 如果订阅票据还在订阅状态,则取消订阅
            if (!subscription.isUnsubscribed()) {
                subscription.unsubscribe();
            }
            latch.countDown();
        }).start();
        System.out.println("======");

        // 主线程等待
        latch.await();
        System.out.println("------");
    }
}

        订阅者在此处接收事件 0, 1, 2, 3,之后,latch调用发生,这会导致订阅取消。此时,响应式编程包含一个 Observable 流、一个 Subscriber,以及一个订阅票据:Subscription。该 Subscription 会传达 Subscriber 从 Observable 生产者处接收事件的意图。

1.2.4、操作符

        响应式编程包含一个 Observable 流、一个 Subscriber,以及订阅票据 Subscription。该 Subscription 会传达 Subscriber 从 Observable 生产者处接收事件的意图。
        下面通过操作符对流过响应式流的数据进行转换。RxJava 的整体功能仍隐藏在它的操作符中。操作符用于调整流的元素或更改流结构本身。RxJava 为几乎所有可能的场景提供了大量的操作符,但是多数其他操作符只是这些基本操作符的组合。

1、map 操作符

        RxJava 中最常用的操作符是 map,它具有以下签名:

public final < R > Observable < R > map(Func1 < ? super T, ? extends R > func)

        func 函数可以将 T 对象类型转换为 R 对象类型,并且应用 map 将Observable<T>转换为 Observable<R>。使用弹珠图(marble diagram)描述操作符复杂的转换行为:

        上图的map 操作符:通过对每个数据项应用函数来转换 Observable 发出的数据,map 执行一对一的转换。此外,输出流具有与输入流相同数量的元素。

package com.blnp.net.observerdemo.demo.operator;

import lombok.var;
import rx.Observable;

import java.util.Arrays;

/**
 * <p>map 操作符测试用例</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:50
 */
public class MapMainTest {

    public static void main(String[] args) {
        Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
        just.map(item -> {
            var strings = new String[item];
            Arrays.fill(strings, "a");
            return strings;
        }).forEach(item -> System.out.println(Arrays.toString(item)));
    }
}

2、filter 操作符

        与 map 操作符相比,filter 操作符所产生的元素可能少于它所接收的元素。它只发出那些已成功通过谓词测试的元素,如下图所示:

        上图的filter 操作符:仅发出通过谓词测试的 Observable 中的数据项

package com.blnp.net.observerdemo.demo.operator;

import lombok.SneakyThrows;
import rx.Observable;

import java.util.concurrent.TimeUnit;

/**
 * <p>filter 操作符测试用例</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:50
 */
public class FilterMainTest {

    @SneakyThrows
    public static void main(String[] args) {
        Observable.interval(1, TimeUnit.SECONDS)
                .filter(item -> item % 2 == 0)
                .subscribe(System.out::println);
        Thread.sleep(10000);
    }
}

3、count 操作符

        count 操作符自描述性很强,它发出的唯一值代表输入流中的元素数量。但是,count 操作符只在原始流结束时发出结果,因此,在处理无限流时,count 操作符将不会完成或返回任何内容,如下图所示:

注意:该弹珠图的操作符不是java实现中的。

        count 操作符:计算 Observable 源发出的数据项数,并仅发出该值。

package com.blnp.net.observerdemo.demo.operator;

import lombok.var;
import rx.Observable;

import java.util.ArrayList;

/**
 * <p>count 操作符测试用例</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:51
 */
public class CountMainTest {

    public static void main(String[] args) {
        var list = new ArrayList<Integer>();
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }
        Observable.from(list)
                .filter(item -> item % 2 == 0)
                .count()
                .subscribe(item -> {
                    System.out.println(Thread.currentThread().getName());
                    System.out.println(item);
                });
    }
}
4、zip 操作符

        该操作符具有更复杂的行为,因为它会通过应用 zip函数来组合来自两个并行流的值。它通常用于填充数据,且特别适用于部分预期结果从不同源获取的情况,如下图所示:

        zip 操作符:通过指定的函数将多个 Observable 发送的元素组合在一起,并根据此函数的结果为每个组合发出单个数据项。简单起见,我们用 zip 将两个字符串流拼接,代码如下所示:

package com.blnp.net.observerdemo.demo.operator;

import rx.Observable;

/**
 * <p>zip 操作符测试用例</p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 16:51
 */
public class ZipMainTest {

    public static void main(String[] args) {
        Observable.zip(
                Observable.just(1,2,3,4,5),
                Observable.just("a", "b", "c", "d", "e"),
                (a, b) -> a + b)
                .forEach(System.out::println);
    }
}

        访问 站点(访问不稳定) 查看响应式编程更多操作符的使用。该站点包含反映实际操作符行为的交互式图表。交互式 UI 使我们能根据每个事件在流中出现的顺序和时间来将事件的转换可视化。

请注意,该站点本身是使用 RxJS库构建的,而该库是 RxJava 在 JavaScript 中的对应。

1.2.5、RxJava 的先决条件和优势

        在生产者和订阅者之间通常存在一些订阅信息,这些信息使打破生产者−消费者关系成为可能。这种方式非常灵活,并使我们可以控制生产和消费的事件数量,节省 CPU 时间(CPU 时间通常会浪费在创建永远不会用到的数据上)。

        为了证明响应式编程提供了节省资源的能力,请假设我们需要实现一个简单的内存搜索引擎服务。该服务应该返回一个 URL 集合,其中的 URL 链接到包含了所需短语的文档。通常,客户端应用程序(Web 或移动应用程序)也会传入一些限制条件,例如有效结果的最大返回量。如果没有响应式编程,我们可能使用以下 API 设计此类服务:

package com.blnp.net.observerdemo.demo;

import java.net.URL;
import java.util.List;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 17:47
 */
public interface SearchEngine {

    /**
     * 用途:将查询结果限定为limit条,返回结果
     * @author liaoyibin
     * @since 17:47 2024/10/9
     * @params [query, limit] 
     * @param query 查询条件
     * @param limit 查询条数
     * @return java.util.List<java.net.URL>
    **/
    List<URL> search(String query, int limit);
}

        此时,即使有人在客户端结果界面只选择了第一个或第二个结果,服务的客户端也会收到整个结果集。在这种情况下,虽然我们的服务做了很多工作,客户端也已经等了很长时间,客户端却忽略了大部分结果。这无疑是一种资源浪费。

        我们可以通过遍历结果集来对搜索结果进行处理。因此,只要客户端继续消费它们,服务器就会搜索下一个结果项。通常,服务器的搜索过程不是针对每一行,而是针对某些固定大小(比方说 100 项)。在客户端,结果以迭代器的形式表示。

package com.blnp.net.observerdemo.demo;

import java.net.URL;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 17:59
 */
public interface IterableSearchEngine {

    /**
     * 用途:迭代器查询检索
     * @author liaoyibin
     * @since 17:59 2024/10/9
     * @params [query, limit]
     * @param query
     * @param limit
     * @return java.lang.Iterable<java.net.URL>
    **/
    Iterable<URL> search(String query, int limit);
}

        迭代器的唯一缺点是,客户端的线程在主动等待新的数据时会产生阻塞。该交互方式效率不高,不足以构建高性能应用程序。

        搜索引擎可以返回 CompletableFuture 以构建异步服务。此时,客户端线程可以做一些有用的事情,而不会搅乱搜索请求,因为服务会在结果到达时立即执行回调。但是在这里我们同样要么收到全部结果,要么收不到结果,因为 CompletableFuture 只能包含一个值,即使所包含的值是一个结果列表,也是如此。代码如下所示:

package com.blnp.net.observerdemo.demo;

import java.net.URL;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * <p></p>
 *
 * @author lyb 2045165565@qq.com
 * @version 1.0
 * @since 2024/10/9 18:20
 */
public interface FutureSearchEngine {

    /**
     * 用途:
     * @author liaoyibin
     * @since 18:20 2024/10/9
     * @params [query, limit] 
     * @param query 
     * @param limit 
     * @return java.util.concurrent.CompletableFuture<java.util.List<java.net.URL>>
    **/
    CompletableFuture<List<URL>> search(String query, int limit);
}

        通过使用 RxJava,返回一个流。同时客户端可以随时取消订阅(即unsubscribe()),减少搜索服务处理过程中所需完成的工作量。代码如下所示:

import rx.Observable;
import java.net.URL;
public interface RxSearchEngine {
    /**
     * 搜索
     * @param query
     * @return
     */
    Observable < URL > search(String query);
}

        RxJava 使以更加通用和灵活的方式异步组合数据流成为可能。也可以将旧式同步代码包装到异步工作流中。要调用速度比较慢的 Callable ,可以使用 subscriberOn(Scheduler)操作符。该操作符定义启动流处理的 Scheduler(Java 中ExecutorService 的响应式对应类)。

String query = "";
Observable.fromCallable(() -> doSlowSyncRequest(query))
    .subscribeOn(Schedulers.io())
    .subscribe(this::processResult);

        使用这种方法,不能依赖一个线程来处理整个请求,会阻塞。采用这种方法对可变对象有害,因为是多线程,唯一合理的策略是采用不变性(immutability)。
        不变性不是一个新概念,它是函数式编程(functional programming)的核心原则之一。对象一旦被创建,就不会更改。这样一条简单的规则可以防止并行应用程序中可能出现的一大类问题。

        在Java 8 引入lambda之前,没有lambda,就必须创建许多匿名类或内部类,这些类会污染应用程序代码,并且它们创建的样板代码多于有效代码。在RxJava创建之初,尽管速度很慢,但Netflix仍广泛使用Groovy进行开发,这主要是因为Groovy支持lambda。

1.3、Spring 响应式实现

1.3.1、Spring WebFlux

        Spring Framework 5添加新模块 spring-web-reactive ,使用响应式非阻塞引擎支持类似SpringMVC的@Controller编程模型。下图是Spring MVC与Spring Web Reactive的关系对比:

        Spring Web Reactive使用Servlet 3.1非阻塞特性。也可以运行于非Servlet运行时,如Netty和Undertow等。
        对每个运行时适配了一组公共的响应式 ServerHttpRequest 和 ServerHttpResponse 抽象,以Flux<DataBuffer> 的形式暴露请求和响应,读写完全支持背压。

  • spring-core: 模块提供了 Encoder 和 Decoder 契约,用于对 Flux 的数据进行序列化和反序列化。
  • spring-web: 模块添加了JSON和XML的实现,用于web应用或其他的SSE流和零拷贝文件传输。
  • spring-web-reactive: 模块包含了Spring Web Reactive框架以支持@Controller编程模型。

        重新定义了很多Spring MVC的契约,如 HandlerMapping 和 HandlerAdapter 以支持异步和非阻塞,响应式地操作HTTP的请求和响应。
        Spring MVC和Spring Web Reactive不共享任何代码,处理逻辑有很多是共通的。跟Spring MVC的编程模型一样,但是支持响应式类型并且以响应式的方式执行。下述类型都可以作为控制器方法的@RequestBody参数来使用:

  • Account account : account在调用控制器之前非阻塞地反序列化。
  • Mono<Account> account :控制器使用 Mono 声明执行的逻辑,当account反序列化之后执行。
  • Single<Account> account :跟 Mono 一样,但是使用RxJava执行引擎。
  • Flux<Account> accounts : 输入流场景
  • Observable<Account> accounts :使用RxJava的输入流

返回值类型:

  • Mono<Account> : 当Mono结束,非阻塞地序列化给定的Account对象
  • Singe<Account> : 跟Mono的一样,但是使用RxJava执行引擎。
  • Flux<Account> :流场景,根据请求content type的不同,有可能是SSE。
  • Flux<SseEvent> : SSE 流。
  • Observable<SseEvent> : 使用RxJava执行引擎的SSE流。
  • Mono<Void> : 当Mono结束,请求处理结束。
  • void :当方法返回,请求处理结束。表示同步、非阻塞的控制器方法。
  • Account : 非阻塞地序列化给定的Account,表示同步、非阻塞控制器方法。

1.3.2、WebSocket

        最知名的全双工客户端−服务器通信双工协议,即WebSocket。WebSocket 协议的通信于2013 年初引入到Spring 框架中,旨在进行异步消息发送,但其实际的实现仍然有一些阻塞操作。

        例如,将数据写入I/O 或从I/O 读取数据仍然是阻塞操作,因此这二者都会影响应用程序的性能。WebFlux 模块为WebSocket 引入了改进版本的基础设施。WebFlux 同时提供客户端和服务器基础设施。

1、服务端 API

        WebFlux 提供 WebSocketHandler 作为处理WebSocket 连接的核心接口。该接口有一个名为handle 的方法,它接收WebSocketSession。WebSocketSession 类表示客户端和服务器之间的成功握手,并提供对包括有关握手、会话属性和传入数据流的信息的访问。使用echo 消息响应发送者的示例:

class EchoWebSocketHandler implements WebSocketHandler {
    @Override
    public Mono < Void > handle(WebSocketSession session) {
        return session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .map(tm -> "Echo: " + tm)
            .map(session::textMessage)
            .as(session::send);
    }
}

        上述代码表示接收入站消息,并转换,然后封装为WebSocketMessage对象,发送出去。其中发送返回Mono<Void>,当写出完成,该Mono完成。

        WebSocketMessage 是DataBuffer 的包装器,它提供了额外功能,例如将以字节为单位的有效负载转换为文本。一旦提取了传入消息,我们在该文本前面加上“Echo:”后缀,将新文本消息包装在WebSocketMessage 中,并使用WebSocketSession#send 方法将其发送回客户端。这里,send 方法接受Publisher<WebSocketMessage>并返回Mono<Void>作为结果。

        因此,通过使用Reactor API 中的as 操作符,我们可以将Flux 视为Mono<Void>,并使用session::send 作为转换函数。以下代码为此类配置的示例:

@Configuration
public class WebSocketConfiguration {
    @Bean
    public HandlerMapping handlerMapping() {
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Collections.singletonMap("/ws/echo", new EchoWebSocketHandler()));
        // 为了在其他HandlerMapping实例之前处理SimpleUrlHandlerMapping,它应该具有更高的优先级
        mapping.setOrder(-1);
        return mapping;
    }
    @Bean
    public HandlerAdapter handlerAdapter() {
        // 将HTTP 连接升级到WebSocket,然后调用了WebSocketHandler#handle 方法
        return new WebSocketHandlerAdapter();
    }
}
2、客户端 API

        与WebSocket 模块(基于Web MVC)不同,WebFlux 还为我们提供了客户端支持。要发送WebSocket 连接请求,可以使用WebSocketClient 类。WebSocketClient 有两个执行WebSocket连接的核心方法,如下代码:

public interface WebSocketClient {
    Mono < Void > execute(
        URI url,
        WebSocketHandler handler
    );
    Mono < Void > execute(
        URI url,
        HttpHeaders headers,
        WebSocketHandler handler
    );
}

        WebSocketClient 使用相同的WebSockeHandler 接口来处理来自服务器的消息并发回消息。有一些WebSocketClient 实现与服务器引擎相关,例如TomcatWebSocketClient实现或JettyWebSocketClient 实现。在下面的示例中,查看ReactorNettyWebSocketClient:

/**
 * 需要添加VM选项:
 * --add-opens java.base/jdk.internal.misc=ALL-UNNAMED
 * -Dio.netty.tryReflectionSetAccessible=true
 * --illegal-access=warn
 * @param args
 * @throws InterruptedException
 */
public static void main(String[] args) throws InterruptedException {
    WebSocketClient client = new ReactorNettyWebSocketClient();
    client.execute(URI.create("http://localhost:8080/ws/echo"),
        session -> {
            session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .subscribe(System.out::println);
            return Flux.interval(Duration.ofMillis(500))
                .map(String::valueOf)
                .map(session::textMessage)
                .as(session::send);
        }
    ).subscribe();
    Thread.sleep(5000);
}

        前面的示例展示了如何使用ReactorNettyWebSocketClient 连接WebSocket 并开始向服务器定期发送消息。

3、对比 WebFlux Websocket

        Spring WebSocket 模块的主要缺点是它阻塞了与I/O的交互,而Spring WebFlux 提供了完全无阻塞的写入和读取。WebFlux 模块通过使用响应式流规范和Project Reactor 提供了更好的流抽象。旧WebSocket 模块中的WebSocketHandler 接口只允许一次处理一条消息。此外,WebSocketSession#sendMessage 方法仅允许以同步方式发送消息。

        旧Spring WebSocket 模块的一个关键特性就是它与Spring Messaging 模块的良好集成,而这能用@MessageMapping 注解来声明WebSocket 端点。以下代码展示了旧WebSocket API 的简单示例,这些API 基于Web MVC,且使用SpringMessaging 中的注解:

@Controller
public class GreetingController {
    @MessageMapping("/hello")
    @SendTo("/topic/greetings")
    public Greeting greeting(HelloMessage message) {
        return new Greeting("Hello, " + message.getName() + "!");
    }
}

        上述代码展示了我们如何使用Spring Messaging 模块声明WebSocket 端点。遗憾的是,WebFlux模块的WebSocket 集成缺少此类支持,为了声明复杂的处理程序,必须提供自己的基础设施。

4、响应式 SSE

        与重量级WebSocket 一起,HTML5 引入了一种创建静态(在本例中为半双工)连接的新方法,其中服务器能够推送事件。该技术解决了与WebSocket 类似的问题。
        例如,可以使用相同的基于注解的编程模型声明服务器发送事件(Server-Sent Events,SSE)流,但是返回一个无限的ServerSentEvent 对象流,如以下示例所示:

@RestController
@RequestMapping("/sse/stocks")
class StocksController {
    final Map < String, StocksService > stocksServiceMap;
    //...
    @GetMapping
    public Flux < ServerSentEvent < ? >> streamStocks() {
        return Flux
            .fromIterable(stocksServiceMap.values())
            .flatMap(StocksService::stream)
            . < ServerSentEvent < ? >> map(item ->
                ServerSentEvent
                .builder(item)
                .event("StockItem")
                .id(item.getId())
                .build()
            )
            .startWith(
                ServerSentEvent
                .builder()
                .event("Stocks")
                .data(stocksServiceMap.keySet())
                .build()
            );
    }
}
  1. 这是@RestController 类的声明。为了简化代码,我们跳过了构造函数和字段初始化部分。
  2. 在这里,我们声明处理程序方法,该方法使用熟悉的@GetMapping 注解。streamStocks 方法返回ServerSentEvent 的Flux,这意味着当前处理程序启用了事件流。然后,我们合并所有可用的股票来源和流更改到客户端。之后,应用映射,将每个StockItem 映射到ServerSentEvent,这里使用了静态builder 方法。为了正确设置ServerSentEvent 实例,我们在构建器参数中提供事件ID 和事件名称,它允许在客户端区分消息。此外,使用特定的ServerSentEvent 实例启动Flux,它向客户端声明可用的股票通道。

        正如上述示例所示,Spring WebFlux 能映射Flux 响应式类型的流特性,并向客户端发送无限的股票事件流。此外,SSE 流不要求我们更改API 或使用其他抽象。它只需要我们声明一个特定的返回类型,以帮助框架找出处理响应的方法。我们不必声明ServerSentEvent 的Flux,我们可以直接提供内容类型,如下例所示:

@GetMapping(produces = "text/event-stream")
public Flux < StockItem > streamStocks() {
    // ...
}

        在这种情况下,WebFlux 框架在内部将流的每个元素包装到ServerSentEvent 中。正如上述示例所示,ServerSentEvent 技术的核心优势在于这种流模型的配置不需要额外的样板代码,而在WebFlux 中采用WebSocket 时则需要这些样板代码。这是因为SSE 是一种基于HTTP 的简单抽象,既不需要协议切换,也不需要特定的服务器配置。

        如上述示例所示,我们可以使用@RestController和@XXXMapping 注解的传统组合来配置SSE。但是,对于WebSocket而言,我们需要自定义消息转换配置,例如手动选择特定的消息传递协议。相比之下,Spring WebFlux 为SSE 提供的消息转换器配置与典型REST 控制器提供的相同。

        另外,SSE 不支持二进制编码并将事件限制为UTF-8 编码。这意味着WebSocket 可能对较小的消息有用,并且在客户端和服务器之间传输的流量较少,因此具有较低的延迟。

1.3.3、RSocket

1、简介

        RSocket是一个应用通信协议,用在多路复用全双工通信中。可以在TCP、WebSocket或其他字节流传输中使用。提供了如下交互模型:

  • Request-Response :发送一个消息,接收一个消息
  • Request-Stream : 发送一个消息,接收返回的消息流
  • Channel : 双向发送消息流
  • Fire-and-Forget : 发送单向消息

        建立初始连接之后,就没有客户端服务端的概念了,因为双方地位对等,都可以初始化交互。因此,RSocket中只有请求者和响应者,而没有客户端和服务端的概念,交互称为“请求流”或简单地称为“请求们”。

RSocket协议的关键特性和优势:

  • 跨网络边界的响应式流语义 :对于诸如“请求流”和“通道”之类的流请求,背压信号在请求者和响应者之间传播,从而允许请求者放慢源处的响应者的速度,从而减少了对网络层拥塞控制的依赖以及在网络级别或任何级别缓冲。
  • Request throttling : 可以从两端发送的“ LEASE”帧,因此命名为“ Leasing”,以限制给定时间内另一端允许的请求总数。 租约定期更新。
  • Session恢复 : 这是专为断开连接而设计的,用于维护会话的状态。 状态管理对于应用程序是透明的,并且可以与背压结合使用,从而可以在可能的情况下停止生产者并减少所需的状态量。
  • 对大消息的分割和再组装。
  • Keepalive(心跳)
2、协议流程
2.1、建立连接

        最初,客户端通过一些低级流传输(例如TCP或WebSocket)连接到服务器,并向服务器发送“SETUP”帧以设置连接参数。
        服务器可以拒绝“ SETUP”帧,但是通常在发送(对于客户端)和接收(对于服务器)之后,双方都可以开始发出请求,除非“ SETUP”指示使用租赁语义来限制数量。在这种情况下,双方都必须等待另一端的“租约”帧以允许发出请求。

2.2、发起请求

        一旦建立连接,双方就可以通过帧“ REQUEST_RESPONSE”,“ REQUEST_STREAM”,“REQUEST_CHANNEL”或“ REQUEST_FNF”之一发起请求。 这些帧中的每一个都将一个消息从请求者传送到响应者。
        响应者然后可以返回带有响应消息的“ PAYLOAD”帧,并且在“ REQUEST_CHANNEL”的情况下,请求者还可以发送带有更多请求消息的“ PAYLOAD”帧。
        当请求涉及诸如“请求流”和“通道”之类的消息流时,响应者必须遵守来自请求者的需求信号。需求表示为许多消息。 初始需求在“ REQUEST_STREAM”和“ REQUEST_CHANNEL”框架中指定。 随后的需求通过“ REQUEST_N”帧发出信号。每一端还可以通过“ METADATA_PUSH”帧发送元数据通知,该元数据通知与任何单独的请求无关,而与整个连接有关。

2.3、消息格式

        RSocket消息包含数据和元数据。 元数据可用于发送路由,安全令牌等。数据和元数据的格式可以不同。 每个类的Mime类型都在“ SETUP”框架中声明,并应用于给定连接上的所有请求。
        尽管所有消息都可以具有元数据,但通常每个请求都包含诸如路由之类的元数据,因此仅包含在请求的第一条消息中,即带有帧“ REQUEST_RESPONSE”,“ REQUEST_STREAM”,“ REQUEST_CHANNEL”或“ REQUEST_FNF”之一 。协议扩展定义了用于应用程序的通用元数据格式:

  • Composite Metadata-- 多个独立格式化的元数据条目。
  • Routing — 请求的路由
3、Java 实现

        RSocket的Java实现基于Project Reactor构建。 TCP和WebSocket的传输建立在Reactor Netty上。作为响应式库,Reactor简化了实现协议的工作。对于应用程序,自然要配合使用带有声明性运算符和透明背压支持的“ Flux”和“ Mono”。

        RSocket Java中的API设计为最小且基本的。它着重于协议功能,而将应用程序编程模型(例如RPC代码生成与其他)作为一个更高级别的独立关注点。

        主合同io.rsocket.RSocket对四种请求交互类型进行建模,其中“ Mono”表示对单个消息的承诺,“Flux”表示消息流,而“ io.rsocket.Payload”表示实际消息,可以访问数据和元数据作为字节缓冲区。RSocket合约是对称使用的。为了进行请求,应用程序被赋予了一个“ RSocket”来执行请求。为了响应,应用程序实现了“ RSocket”来处理请求。

        在大多数情况下,Spring应用程序不直接使用其API。但是,独立于Spring查看或试验RSocket可能很重要。 RSocket Java存储库包含许多示例应用程序,以演示其API和协议功能。

4、Spring Support

        spring-messaging 模块包含如下内容:

  • RSocketRequester :流式API,使用 io.rsocket.RSocket 对数据和元数据编码解码,发起请求。
  • Annotated Responders :  @MessageMapping 注解的用于处理请求的处理器方法。

        spring-web 模块包含了 Encoder 和 Decoder 的实现,如Jackson的CBOR/JSON,以及Protobuf。也包含了 PathPatternParser 以可插拔的方式,高效处理路径匹配。
        Spring Boot 2.2支持通过TCP或WebSocket建立RSocket服务器,包括在WebFlux服务器中通过WebSocket公开RSocket的选项。 还为RSocketRequester.Builder和RSocketStrategies提供客户端支持和自动配置。

  1. Spring Security 5.2提供了RSocket支持。
  2. Spring Integration 5.2提供了入站出站网关用于RSocket客户端和服务端的交互。
  3. Spring Cloud Gateway支持RSocket连接。

1.3.4、WebClient

        Spring Framework 5在 RestTemplate 之外添加了新的响应式 WebClient 。每个支持的HTTP客户端适配了一组公共的响应式 ClientHttpRequest 和 ClientHttpResponse 抽象,以 Flux<DataBuffer> 的形式对外暴露请求和响应,读写完全支持背压。

        spring-core 提供了 Encoder 和 Decoder 抽象,用于客户端的Flux字节进行序列化和反序列化。WebClient 示例程序:

ClientHttpConnector connector = new ReactorClientHttpConnector();
WebClient.builder().clientConnector(connector).build()
    .get()
    .uri(URI.create("https://blog.csdn.net/"))
    .accept(MediaType.TEXT_HTML)
    .retrieve()
    .bodyToMono(String.class)
    .subscribe(System.out::println);
Thread.sleep(10000);

2、Project Reactor 介绍

2.1、Spring WebFlux 与 Project Reactor

        Spring Framework从版本5开始,基于Project Reactor支持响应式编程。Project Reactor是用于在JVM上构建非阻塞应用程序的Reactive库,基于Reactive Streams规范。Project Reactor是Spring生态系统中响应式的基础,并且与Spring密切合作进行开发。Spring WebFlux要求Project Reactor作为核心依赖项

2.1.1、模块

        Project Reactor由Reactor文档中列出的一组模块组成。主要组件是Reactor Core,其中包含响应式类型Flux和Mono,它们实现了Reactive Stream的Publisher接口以及一组可应用于这些类型的运算符。其他一些模块是:

  • Reactor Test —— 提供一些实用程序来测试响应流。
  • Reactor Extra —— 提供一些额外的Flux运算符。
  • Reactor Netty —— 无阻塞且支持背压的TCP,HTTP和UDP的客户端和服务器。
  • Reactor Adapter —— 用于与其他响应式库(例如RxJava2和Akka Streams)的适配。
  • Reactor Kafka —— 用于Kafka的响应式API,作为Kafka的生产者和消费者。

2.1.2、并发模型

        有两种在响应式链中切换执行某些的方式: publishOn 和 subscribeOn 。区别如下:

  • publishOn(Scheduler scheduler) ——影响所有后续运算符的执行(只要未指定其他任何内容)
  • subscribeOn(Scheduler scheduler) ——根据链中最早的subscribeOn调用,更改整个操作符链所订阅的线程。它不影响随后对publishOn的调用的行为。

Schedulers类包含用于提供执行上下文的静态方法:

  • parallel() :为并行工作而调整的固定工作池,可创建与CPU内核数量一样多的工作线程池。
  • single() :单个可重用线程。此方法为所有调用方重用同一线程,直到调度程序被释放为止。如果您希望使用按呼叫专用线程,则可以为每个呼叫使用Schedulers.newSingle()。
  • boundedElastic() :动态创建一定数量的工作者。它限制了它可以创建的支持线程的数量,并且可以在线程可用时重新调度要排队的任务。这是包装同步阻塞调用的不错选择。
  • immediate() :立即在执行线程上运行,而不切换执行上下文。
  • fromExecutorService(ExecutorService) :可用于从任何现有ExecutorService中创建调度程序。

2.2、Reactor 1.x 版本

案例1:

public static void main(String[] args) {
    // 创建Environment实例。
    // Environment实例是执行上下文,负责创建特定的Dispatcher。
    // 可以提供不同类型的分派程序,范围包括进程间分派到分布式分派。
    Environment env = new Environment();
    /*
    创建Reactor实例,它是Reactor模式的直接实现。
    我们使用Reactors类创建Reactor实例。
    使用基于RingBuffer结构的Dispatcher预定义实现。
    */
    Reactor reactor = Reactors.reactor()
        .env(env)
        .dispatcher(Environment.RING_BUFFER)
        .get();
    // 声明通道Selector和Event消费者声明。注册一个事件处理程序:打印
    // 通过字符串选择器进行过滤,该字符串选择器指示事件通道的名称。
    // Selectors.$提供了更全面的标准选择,因此事件选择的最终表达式可能更复杂。
    reactor.on($("channel"), event -> System.out.println(event.getData()));
    /*
    底层实现中,事件由Dispatcher进行处理,然后发送到目的地。
    根据Dispatcher的实现,可以同步或异步处理事件。
    这提供了一种功能分解,并且通常以与Spring框架事件处理方法类似的方式工作。
    */
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        // 给通道发送通知
        reactor.notify("channel", Event.wrap("test"));
    }, 0, 100, TimeUnit.MICROSECONDS);
}

案例2:

public static void main(String[] args) {
    Environment env = new Environment();
    /*
    Reactor实例是一个事件网关,允许其他组件注册事件消费者,这些事件消费者随后会得到事件
    的通知。
    消费者一般通过Selector进行注册,通过匹配通知的key,消费事件。
    Reactor得到事件通知时,Reactor通过Dispatcher分发任务
    任务在线程中执行。
    根据Dispatcher实现的不同,线程的调度不同。
    */
    Reactor reactor = Reactors.reactor(env);
    // on方法使用指定的Selector将Stream关联到Observable
    Stream < String > stream = Streams.on(reactor, $("channel"));
    stream.map(s -> "hello lagou - " + s)
        .distinct() // 对连续的相同值进行去重
        .filter((Predicate < String > ) s -> s.length() > 2)
        .consume(System.out::println);
    // 使用指定的环境创建一个延迟流
    // 第一个泛型表示值类型
    // 第二个泛型表示可以消费值的消费者类型
    Deferred < String, Stream < String >> input = Streams.defer(env);
    // 获取Composable的子类,用于消费异常和值
    Stream < String > compose = input.compose();
    compose.map(m -> m + " = hello lagou")
        .filter((Function < String, Boolean > ) s -> s.contains("123"))
        .map(Event::wrap) // 将数据封装为事件
        // reactor.prepare方法用于创建一个优化的路径,给指定的key广播事件通知
        .consume(reactor.prepare("channel")); // 给当前Composable关联一个消费者, 消费composable的数据
    for (int i = 0; i < 1000; i++) {
        // 接收指定的值,让底层的Composable可以消费
        input.accept(UUID.randomUUID().toString());
    }
}

        通过与Spring框架的完美集成以及与Netty的组合,非常适合开发具备异步和非阻塞消息处理的高性能系统。Reactor 1.x的缺点:

  1. 该库没有背压控制。除了阻塞生产线程或跳过事件之外,事件驱动的Reactor 1.x并没有提供控制背压的方法。
  2. 错误处理非常复杂。Reactor 1.x提供了几种处理错误和失败的方法,但是使用比较复杂。

2.3、Reactor 2.x 版本

        在 Reactor 设计中,最重要的变化是将事件总线和流功能提取到单独的模块中。此外,深度的重新设计使新的 Reactor Streams 库完全符合响应式流规范。Reactor 团队大大改进了 Reactor 的API。例如,新的 Reactor API 与 Java Collections API 具有更好的集成性。

        在第二个版本中,Reactor 的 Streams API 变得更加类似于 RxJava API。除了用于创建和消费流的简单附加组件,它还在背压管理、线程调度和回弹性支持方面添加了许多有用的补充。

2.4、Reactor 3.x 版本

        Reactor事件总线在2中得到了改进。首先负责发送消息的Reactor对象被重命名为EventBus。该模块也经过重新设计以支持响应式流规范。

        Maldini和Karnok将他们对RxJava和Project Reactor的想法和经验浓缩为一个名为reactive-stream-commons的库。后来该库成为Reactor 2.5的基础,并最终演变为Reactor 3.x。

        经过一年的努力,Reactor 3.0发布。与此同时,一个完全相同的RxJava 2.0也浮出水面。RxJava与Reactor 3.x的相似性高于与其前身RxJava 1.x的相似性。这些最显著的区别是RxJava针对java6(包括安卓的支持),而Reactor 3选择java8作为基线。同时Reactor 3.x塑造了Spring 5框架的响应式变种。该库支持所有常见的背压传播模式:

  1. 仅推送:当订阅者通过subscription.request(Long.MAX_VALUE)请求有效无限数量的元素时。
  2. 仅拉取:当订阅者通过subscription.request(1)仅在收到前一个元素后请求下一个元素时。
  3. 拉-推(混合):当订阅者有实时控制需求,且发布者可以适应所提出的数据消费速度时。

        为适配不支持推-拉式操作模型的旧API,Reactor提供了许多老式背压机制,包括缓冲、开窗、消息丢弃、启动异常等。某些情况下,上述策略甚至可以用于在实际需求出现之前预取数据,从而提高系统的响应性。

        此外,Reactor API还提供了足够的工具用于消除用户活动的尖峰并防止系统过载。Project Reactor在设计上旨在对并发透明,因此它不会强制执行任何并发模型。同时,它提供了一组有用的调度程序,它们几乎能以任何形式管理执行线程,如果所提出的所有调度程序都不符合要求,开发人员可以基于完全的低阶控制来创建自己的调度程序。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2213948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Nginx(Linux):服务器版本升级和新增模块

目录 1、概述2、使用Nginx服务信号完成Nginx升级2.1 备份当前版本的Nginx2.2 向服务器导入新的Nginx2.3 向服务器导入新的Nginx2.4 停止老版本Nginx 3、使用Nginx安装目录的make命令完成升级3.1 备份当前版本的Nginx3.2 向服务器导入新的Nginx3.3 执行更新命令 1、概述 如果想…

24最新ComfyUI插件与Lora的下载及使用指南!

前言 本节我们介绍ComfyUI插件和Lora的下载及使用方式。 1. 安装 1.1 Checkpoint安装 将从前面介绍的模型下载平台下载后&#xff0c;放在ComfyUI/models/checkpoints文件夹下。 所有的AI设计工具&#xff0c;安装包、模型和插件&#xff0c;都已经整理好了&#xff0c;&am…

【vue自定义指令】骨架屏指令

场景 预加载的过程中&#xff0c;数据还未请求到&#xff0c;dom已经渲染出来了&#xff1f; 展示效果 实现 封装指令&#xff08;代码块1&#xff09; app引入&#xff08;代码块2&#xff09;使用&#xff08;代码块3&#xff09; 代码 封装 ​ import { reactive, wa…

Spark全网最全总结

Spark 产生之前&#xff0c;已经有 MapReduce 这类非常成熟的计算系统存在了&#xff0c;并提供 了高层次的 API(map/reduce)&#xff0c;把计算运行在集群中并提供容错能力&#xff0c;从而实现 分布式计算。 虽然 MapReduce 提供了对数据访问和计算的抽象&#xff0c…

一个月学会Java 第13天 抽象类与接口

Day13 抽象类与接口 通过了前面的学习&#xff0c;我们已经掌握了面向对象的基础 继承 封装 多态 第一章 抽象类 接下来&#xff0c;我们要对面向对象学习高级的部分&#xff0c;我们先要学到的就是抽象类&#xff0c;听名字也能想到&#xff0c;肯定很抽象&#xff0c;那我们先…

电力电子技术(二)

三相可控整流电路&#xff1a;&#xff08;主要包括三相半波和三相桥式&#xff09; &#xff08;一&#xff09;三相半波&#xff1a; &#xff08;1.1电阻性负载&#xff09; 右侧第三个图代表VT1晶闸管的流经电流波形&#xff0c;一个周期仅导通一次&#xff1a;晶闸管导…

Netty讲解与案例

1.Netty简介&#xff1a; 官网&#xff1a;https://netty.io/ Netty 是一个 NIO 客户端服务器框架&#xff0c;可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和精简了 TCP 和 UDP 套接字服务器等网络编程。 “快速简便”并不意味着最终的应用程序会存在…

Halcon 使用二维像素分类对图像进行分割

文章目录 算子histo_2dim 计算双通道灰度值图像的直方图class_2dim_sup 使用二维像素分类对图像进行分割 示例 算子 histo_2dim 计算双通道灰度值图像的直方图 histo_2dim(Regions, ImageCol, ImageRow : Histo2Dim : : )Regions (输入对象)&#xff1a;在此区域内计算直方图…

腾讯云视立方开通各项云服务相关

云直播 如何开通云直播服务&#xff1f; 进入 云直播管理控制台&#xff0c;进入腾讯云直播服务开通页&#xff0c;查看相关协议并勾选同意&#xff0c;单击申请开通即可开通云直播服务。 。 如何开启流防盗链 KEY&#xff1f; 推流防盗链 KEY 是为了确保只有您的 App 用户…

dockerfile 用法全解析

FROM 构建基于alpine的镜像&#xff0c;单条执行就是复制了一个apline镜像(除了FROM其他都是非必须的) WORKDIR 是之指定接下来的shell语句是运行在哪个路径下&#xff0c;没有就会创建目录 COPY 将宿主机指定目录的文件拷贝到镜像指定目录 &#xff08;ADD 源地址还可以url…

[LeetCode] 662. 二叉树最大宽度

题目描述&#xff1a; 给你一棵二叉树的根节点 root &#xff0c;返回树的 最大宽度 。 树的 最大宽度 是所有层中最大的 宽度 。 每一层的 宽度 被定义为该层最左和最右的非空节点&#xff08;即&#xff0c;两个端点&#xff09;之间的长度。将这个二叉树视作与满二叉树结…

【C语言刷力扣】2206.将数组划分成相等数对

题目&#xff1a; 解题思路&#xff1a; 题目中要求元素成数对出现&#xff0c;即每个元素出现偶数次。用哈希表存放每个数出现的次数&#xff0c;再循环查看每个数的次数是否位偶数。 typedef struct {int key;int count;UT_hash_handle hh; } hashEntry;bool divideArray(int…

IDEA下载安装

文章目录 1、下载安装包2、安装IDEA3、全局配置4、安装插件 1、下载安装包 IDEA官网下载最新IDEA。 上面的ULtimate是旗舰版&#xff0c;试用30天&#xff0c;之后是需要收费的&#xff0c;下面黑色区域的Community是社区版&#xff0c;功能不如旗舰版丰富&#xff0c;但是好在…

文件的二维码怎么做成?简单的3步生成二维码技巧

数字化时代的到来&#xff0c;用来将内容分享给其他人展示的方式也越来越多。其中二维码就是现在很流行的一种方式&#xff0c;将内容存入二维码后&#xff0c;其他人就可以扫描分享二维码来查看内容&#xff0c;那么文件生成二维码该怎么操作呢&#xff1f; 通过使用文件二维…

如何轻松实现Patreon订阅,获得独家内容

目录 1. 什么是Patreon&#xff1f; Patreon是一个非常流行的在线平台。 它支持内容创作者通过订阅的方式,直接从粉丝那里获取资金支持。无论是艺术家、音乐家,还是作家、视频制作人,都能在这里找到自己的粉丝群体。 简而言之,你可以通过订阅他们定期发布的独家内容,享受到更…

量子噪声与量子操作

由于量子不确定性和量子态的测量过程而引入的随机波动&#xff0c;量子噪声不可避免。 经典噪声 想象一下&#xff0c;一个比特存储在硬盘驱动器上&#xff0c; 它与普通计算机相连&#xff0c;该比特从状态0或1开始&#xff0c;经过长时间&#xff0c;散乱的磁场很可能会导致…

020 elasticsearch7.10.2 elasticsearch-head kibana安装

文章目录 全文检索流程ElasticSearch介绍ElasticSearch应用场景elasticsearch安装允许远程访问设置vm.max_map_count 的值 elasticsearch-head允许跨域 kibana 商品数量超千万&#xff0c;数据库无法使用索引 如何使用全文检索&#xff1a; 使用lucene&#xff0c;在java中唯一…

AI视频监控卫士:一键Docker简易安装,开源技术引领视频监控

AI视频监控卫士的主要应用场景&#xff1a; 我们决定开源的原因&#xff1a; 1. 灵活性与可定制性&#xff1a; 开源产品的代码对用户公开&#xff0c;允许开发者根据特定需求进行自定义和扩展。思通数科AI视频监控卫士作为开源项目&#xff0c;可以灵活适应不同企业或项目的需…

一些NLP代表性模型

&#xff08;一&#xff09;BERT 由Bidirectional Encoder Representations from Transformers的首字母组成&#xff0c;是encoder-only结构类型的代表。 模型分预训练和微调两步&#xff0c;预训练任务有两类&#xff1a;masked language model(MLM)、next sentence predict…